"""Flask blueprint for the book inventory: ISBN submission, editing, CSV export and SSE feed."""
import csv
import io
import json

from flask import (
    Blueprint,
    Response,
    current_app,
    flash,
    redirect,
    render_template,
    request,
    url_for,
)

from .book import Book
from . import isbn_db
from . import sse

bp = Blueprint("app", __name__, url_prefix="/app")
announcer = sse.MessageAnnouncer()

# Form fields that /update-book requires, in the order they are checked.
_BOOK_FORM_FIELDS = (
    "isbn",
    "count",
    "title",
    "author",
    "publisher",
    "publish_date",
)

# Metadata loaders tried in order by /submit-isbn, with the tag printed on success.
_METADATA_LOADERS = (
    ("google_books", "[Google]"),
    ("openlibrary", "[OpenLibrary]"),
)


def announce_book(book, msg_type: str = "add_book") -> None:
    """Broadcast a book event to every SSE listener.

    The payload is ``{"type": msg_type, "book": book.to_json()}``.

    Args:
        book: Object exposing ``to_json()``.
        msg_type: Must be one of ``"update_book"``, ``"add_book"`` or
            ``"delete_book"``.
    """
    msg = sse.format_sse({
        "type": msg_type,
        "book": book.to_json(),
    })
    announcer.announce(msg)


def _register_duplicate(book) -> str:
    """Record one more copy of an already stored book and announce the update."""
    # The call must run unconditionally: wrapping it in ``assert`` would drop
    # the database write entirely when Python runs with -O.
    result = isbn_db.add_book(book)
    if result != "duplicate":
        raise RuntimeError(
            f"expected 'duplicate' when re-adding ISBN {book.isbn}, got {result!r}"
        )
    # Re-read the book so the announced payload reflects the stored state.
    announce_book(isbn_db.get_book(book.isbn), msg_type="update_book")
    return f"{book.title} ajouté (plusieurs occurrences)"


def _load_metadata(book) -> bool:
    """Try each metadata loader in turn; return True once one of them succeeds."""
    for loader, tag in _METADATA_LOADERS:
        try:
            book.load(loader=loader, config=current_app.config)
        except KeyError:
            # This loader could not provide the book; try the next one.
            continue
        print("Got ", book, tag)
        return True
    return False


@bp.route("/submit-isbn")
def submit_isbn():
    """Add the book identified by the ``isbn`` query parameter.

    Returns a short user-facing message (in French) describing the outcome:
    usage hint, validation failure, duplicate registered, book added, or
    book stored without metadata because no loader found it.

    Raises:
        RuntimeError: If re-adding a book that is already stored does not
            report ``"duplicate"``.
    """
    isbn = request.args.get("isbn")
    if isbn is None:
        return "/submit-isbn?isbn=xxxxxx"
    if isbn == "":
        return "Pas d'ISBN spécifié"

    try:
        book = Book(isbn)
    except ValueError:
        return "Pas le bon format de QR code"

    try:
        stored_book = isbn_db.get_book(book.isbn)
    except IndexError:
        # Lookup failed, so the book is treated as new; go fetch its metadata.
        pass
    else:
        return _register_duplicate(stored_book)

    found = _load_metadata(book)
    # The book is stored and announced even without metadata, so that it can
    # be completed manually afterwards.
    isbn_db.add_book(book)
    announce_book(book)
    if not found:
        return "Pas trouvé, à rajouter manuellement"
    return f"{book.title} ajouté"


@bp.route("/web-submit-isbn")
def web_submit_isbn():
    """Run ``submit_isbn`` and show its message as a flash on the index page."""
    flash(submit_isbn())
    return redirect(url_for("app.index"))


@bp.route("/")
def index():
    """Render the index page listing every stored book."""
    return render_template("index.html", books=isbn_db.get_all_books())


@bp.route("/delete-book", methods=["POST"])
def delete_book():
    """Delete the book named by the ``isbn`` form field and notify listeners."""
    isbn = request.form.get("isbn")
    if isbn is None:
        return "missing isbn"

    isbn_db.delete_book(isbn)
    msg = sse.format_sse({
        "type": "delete_book",
        "book": {
            "isbn": isbn,
        },
    })
    announcer.announce(msg)
    return redirect(url_for("app.index"))


@bp.route("/update-book", methods=["POST"])
def update_book():
    """Overwrite a book's fields from the submitted form and notify listeners.

    Every field in ``_BOOK_FORM_FIELDS`` must be present. The literal string
    ``"None"`` is converted to ``None`` before use. A count that cannot be
    converted to an integer yields a 400 response.
    """
    if any(field not in request.form for field in _BOOK_FORM_FIELDS):
        return "missing an attribute"

    form_data = {
        field: None if request.form[field] == "None" else request.form[field]
        for field in _BOOK_FORM_FIELDS
    }

    try:
        count = int(form_data["count"])
    except (TypeError, ValueError):
        # TypeError: count was "None"; ValueError: count was not numeric.
        return "invalid count", 400

    book = Book(form_data["isbn"])
    book._manual_load(
        form_data["title"],
        publisher=form_data["publisher"],
        publish_date=form_data["publish_date"],
        author=form_data["author"],
        count=count,
    )
    isbn_db.update_book(book)
    announce_book(book, msg_type="update_book")
    return redirect(url_for("app.index"))


@bp.route("/export-csv")
def export_csv():
    """Return every stored book as a semicolon-separated CSV attachment."""
    buffer = io.StringIO()
    # csv.writer quotes fields containing ';', '"' or newlines, which plain
    # string concatenation would silently turn into broken rows.
    writer = csv.writer(buffer, delimiter=";", lineterminator="\n")
    writer.writerow(["ISBN", "Titre", "Auteur", "Éditeur", "Date", "Nombre"])
    for book in isbn_db.get_all_books():
        fields = (
            book.isbn,
            book.title,
            book.author,
            book.publisher,
            book.publish_date,
            book.count,
        )
        # str() keeps the previous rendering of every value (None -> "None").
        writer.writerow([str(value) for value in fields])

    # return as file with a good filename
    return current_app.response_class(
        buffer.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": "attachment;filename=books.csv"},
    )


@bp.route("/listen", methods=["GET"])
def listen():
    """Stream announced messages to the client as server-sent events."""

    def stream():
        messages = announcer.listen()  # returns a queue.Queue
        while True:
            # blocks until a new message arrives
            yield messages.get()

    return Response(stream(), mimetype="text/event-stream")


@bp.route("/ping")
def ping():
    """Announce a ``pong`` message to every listener and return an empty body."""
    msg = sse.format_sse(data={"type": "pong"})
    announcer.announce(msg=msg)
    return {}, 200