isbn-sort/isbn_sort/app.py

135 lines
3.5 KiB
Python
Raw Normal View History

2024-04-05 17:47:15 +02:00
from flask import render_template, redirect, request, flash, Blueprint, current_app, url_for, Response
2024-04-05 17:24:02 +02:00
import json
from .book import Book
from . import isbn_db
2024-04-05 17:47:15 +02:00
from . import sse
2024-04-05 17:24:02 +02:00
bp = Blueprint("app", __name__, url_prefix="/app")
2024-04-05 17:47:15 +02:00
announcer = sse.MessageAnnouncer()
def announce_book(book, msg_type: str="add_book") -> None:
"""
msg_type must either be "update_book", "add_book" or "delete_book"
"""
msg = sse.format_sse({
"type": msg_type,
"book": book.to_json()
})
announcer.announce(msg)
2024-04-05 17:24:02 +02:00
@bp.route("/submit-isbn")
def submit_isbn():
if "isbn" not in request.args:
return "/submit-isbn?isbn=xxxxxx"
if request.args["isbn"] == "":
return "Pas d'ISBN spécifié"
try:
book = Book(request.args["isbn"])
except ValueError:
return "Pas le bon format de QR code"
try:
book.load()
except KeyError:
isbn_db.add_book(book)
2024-04-05 17:47:15 +02:00
announce_book(book)
2024-04-05 17:24:02 +02:00
return "Pas trouvé, à rajouter manuellement"
print("Got ", book)
if isbn_db.add_book(book) == "duplicate":
2024-04-05 18:22:55 +02:00
announce_book(isbn_db.get_book(book.isbn), msg_type="update_book")
2024-04-05 17:24:02 +02:00
return f"{book.title} ajouté (plusieurs occurrences)"
2024-04-05 17:47:15 +02:00
announce_book(book)
2024-04-05 17:24:02 +02:00
return f"{book.title} ajouté"
2024-04-05 17:47:15 +02:00
2024-04-05 17:24:02 +02:00
@bp.route("/web-submit-isbn")
def web_submit_isbn():
flash(submit_isbn())
return redirect(url_for("app.index"))
2024-04-05 17:47:15 +02:00
2024-04-05 17:24:02 +02:00
@bp.route("/")
def index():
return render_template("index.html", books=isbn_db.get_all_books())
@bp.route("/delete-book", methods=["POST"])
def delete_book():
if "isbn" not in request.form:
return "missing isbn"
isbn_db.delete_book(request.form["isbn"])
2024-04-05 17:47:15 +02:00
msg = sse.format_sse({
"type": "delete_book",
"book": {
"isbn": request.form["isbn"]
}
})
announcer.announce(msg)
2024-04-05 17:24:02 +02:00
return redirect(url_for("app.index"))
2024-04-05 17:47:15 +02:00
2024-04-05 17:24:02 +02:00
@bp.route("/update-book", methods=["POST"])
def update_book():
attributes = ["isbn", "count", "title", "author", "publisher", "publish_date"]
if True in [i not in request.form for i in attributes]:
return "missing an attribute"
form_data = request.form.copy()
for attribute in attributes:
if form_data[attribute] == "None":
form_data[attribute] = None
book = Book(form_data["isbn"])
book._manual_load(
form_data["title"],
publisher=form_data["publisher"],
publish_date=form_data["publish_date"],
author=form_data["author"],
count=int(form_data["count"])
)
isbn_db.update_book(book)
2024-04-05 17:47:15 +02:00
announce_book(book, msg_type="update_book")
2024-04-05 17:24:02 +02:00
return redirect(url_for("app.index"))
2024-04-05 17:47:15 +02:00
2024-04-05 17:24:02 +02:00
@bp.route("/export-csv")
def export_csv():
books = isbn_db.get_all_books()
csv = "ISBN;Titre;Auteur;Éditeur;Date;Nombre\n"
for book in books:
csv += f"{book.isbn};{book.title};{book.author};{book.publisher};{book.publish_date};{book.count}\n"
# return as file with a good filename
return current_app.response_class(
csv,
mimetype="text/csv",
headers={"Content-Disposition": "attachment;filename=books.csv"}
2024-04-05 17:47:15 +02:00
)
@bp.route('/listen', methods=['GET'])
def listen():
def stream():
messages = announcer.listen() # returns a queue.Queue
while True:
msg = messages.get() # blocks until a new message arrives
yield msg
return Response(stream(), mimetype='text/event-stream')
@bp.route('/ping')
def ping():
msg = sse.format_sse(data={"type": "pong"})
announcer.announce(msg=msg)
return {}, 200