isbn-sort/isbn_sort/app.py
2024-05-01 19:57:01 +02:00

155 lines
4.1 KiB
Python

from flask import render_template, redirect, request, flash, Blueprint, current_app, url_for, Response
import json
from .book import Book
from . import isbn_db
from . import sse
bp = Blueprint("app", __name__, url_prefix="/app")
announcer = sse.MessageAnnouncer()
def announce_book(book, msg_type: str="add_book") -> None:
"""
msg_type must either be "update_book", "add_book" or "delete_book"
"""
msg = sse.format_sse({
"type": msg_type,
"book": book.to_json()
})
announcer.announce(msg)
@bp.route("/submit-isbn")
def submit_isbn():
if "isbn" not in request.args:
return "/submit-isbn?isbn=xxxxxx"
if request.args["isbn"] == "":
return "Pas d'ISBN spécifié"
try:
book = Book(request.args["isbn"])
except ValueError:
return "Pas le bon format de QR code"
try:
book = isbn_db.get_book(book.isbn)
assert isbn_db.add_book(book) == "duplicate" # duplicate
announce_book(isbn_db.get_book(book.isbn), msg_type="update_book")
return f"{book.title} ajouté (plusieurs occurrences)"
except IndexError:
pass
if "no-search" in request.args:
isbn_db.add_book(book)
announce_book(book)
return "À rajouter manuellement"
try:
book.load(loader="google_books", config=current_app.config)
print("Got ", book, "[Google]")
except KeyError:
try:
book.load(loader="openlibrary", config=current_app.config)
print("Got ", book, "[OpenLibrary]")
except KeyError:
isbn_db.add_book(book)
announce_book(book)
return "Pas trouvé, à rajouter manuellement"
isbn_db.add_book(book)
announce_book(book)
return f"{book.title} ajouté"
@bp.route("/web-submit-isbn")
def web_submit_isbn():
flash(submit_isbn())
return redirect(url_for("app.index"))
@bp.route("/")
def index():
return render_template(
"index.html",
books=isbn_db.get_all_books(),
categories=isbn_db.get_categories()
)
@bp.route("/delete-book", methods=["POST"])
def delete_book():
if "isbn" not in request.form:
return "missing isbn"
isbn_db.delete_book(request.form["isbn"])
msg = sse.format_sse({
"type": "delete_book",
"book": {
"isbn": request.form["isbn"]
}
})
announcer.announce(msg)
return redirect(url_for("app.index"))
@bp.route("/update-book", methods=["POST"])
def update_book():
attributes = ["isbn", "count", "title", "author", "publisher", "publish_date", "category"]
if True in [i not in request.form for i in attributes]:
return "missing an attribute"
form_data = request.form.copy()
for attribute in attributes:
if form_data[attribute] == "None":
form_data[attribute] = None
book = Book(form_data["isbn"])
book._manual_load(
form_data["title"],
publisher=form_data["publisher"],
publish_date=form_data["publish_date"],
author=form_data["author"],
count=int(form_data["count"]),
category=form_data["category"]
)
isbn_db.update_book(book)
announce_book(book, msg_type="update_book")
return redirect(url_for("app.index"))
@bp.route("/export-csv")
def export_csv():
books = isbn_db.get_all_books()
csv = "ISBN;Titre;Auteur;Éditeur;Date;Nombre\n"
for book in books:
csv += f"{book.isbn};{book.title};{book.author};{book.publisher};{book.publish_date};{book.count}\n"
# return as file with a good filename
return current_app.response_class(
csv,
mimetype="text/csv",
headers={"Content-Disposition": "attachment;filename=books.csv"}
)
@bp.route('/listen', methods=['GET'])
def listen():
def stream():
messages = announcer.listen() # returns a queue.Queue
while True:
msg = messages.get() # blocks until a new message arrives
yield msg
return Response(stream(), mimetype='text/event-stream')
@bp.route('/ping')
def ping():
msg = sse.format_sse(data={"type": "pong"})
announcer.announce(msg=msg)
return {}, 200