Factorise duplicated code

This commit is contained in:
augustin64 2024-01-15 18:53:57 +01:00
parent d54419fd35
commit f43b1e1090
7 changed files with 93 additions and 96 deletions

View File

@ -5,14 +5,16 @@ Albums module
import os import os
import shutil import shutil
from uuid import uuid4 from uuid import uuid4
from typing import TypeVar
from flask import (Blueprint, abort, flash, redirect, render_template, from flask import (Blueprint, abort, flash, redirect, render_template,
request, session, current_app) request, session, current_app)
from .auth import login_required from .auth import login_required
from .db import get_db from .db import get_db
from .utils import User, Album, get_all_partitions, new_uuid, get_qrcode, format_uuid from .utils import User, Album
from . import search from . import search, utils
bp = Blueprint("albums", __name__, url_prefix="/albums") bp = Blueprint("albums", __name__, url_prefix="/albums")
@ -41,7 +43,7 @@ def search_page():
query = request.form["query"] query = request.form["query"]
nb_queries = abs(int(request.form["nb-queries"])) nb_queries = abs(int(request.form["nb-queries"]))
search.flush_cache() search.flush_cache()
partitions_local = search.local_search(query, get_all_partitions()) partitions_local = search.local_search(query, utils.get_all_partitions())
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
@ -73,8 +75,8 @@ def get_album(uuid):
album = Album(uuid=uuid) album = Album(uuid=uuid)
except LookupError: except LookupError:
try: try:
album = Album(uuid=format_uuid(uuid)) album = Album(uuid=utils.format_uuid(uuid))
return redirect(f"/albums/{format_uuid(uuid)}") return redirect(f"/albums/{utils.format_uuid(uuid)}")
except LookupError: except LookupError:
return abort(404) return abort(404)
@ -101,12 +103,12 @@ def qr_code(uuid):
""" """
Renvoie le QR Code d'un album Renvoie le QR Code d'un album
""" """
return get_qrcode(f"/albums/{uuid}") return utils.get_qrcode(f"/albums/{uuid}")
@bp.route("/create-album", methods=["POST"]) @bp.route("/create-album", methods=["POST"])
@login_required @login_required
def create_album(): def create_album_req():
""" """
Création d'un album Création d'un album
""" """
@ -118,31 +120,16 @@ def create_album():
error = "Un nom est requis. L'album n'a pas été créé" error = "Un nom est requis. L'album n'a pas été créé"
if error is None: if error is None:
while True: uuid = utils.create_album(name)
try: album = Album(uuid=uuid)
uuid = new_uuid() db.execute(
"""
db.execute( INSERT INTO contient_user (user_id, album_id)
""" VALUES (?, ?)
INSERT INTO album (uuid, name) """,
VALUES (?, ?) (session.get("user_id"), album.id),
""", )
(uuid, name), db.commit()
)
db.commit()
album = Album(uuid=uuid)
db.execute(
"""
INSERT INTO contient_user (user_id, album_id)
VALUES (?, ?)
""",
(session.get("user_id"), album.id),
)
db.commit()
break
except db.IntegrityError:
pass
if "response" in request.args and request.args["response"] == "json": if "response" in request.args and request.args["response"] == "json":
return { return {
@ -232,6 +219,13 @@ def add_partition(album_uuid):
""" """
Ajouter une partition à un album (par upload) Ajouter une partition à un album (par upload)
""" """
T = TypeVar("T")
def get_opt_string(dictionary: dict[T, str], key: T):
"""Renvoie '' si la clé n'existe pas dans le dictionnaire"""
if key in dictionary:
return dictionary[key]
return ""
db = get_db() db = get_db()
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
album = Album(uuid=album_uuid) album = Album(uuid=album_uuid)
@ -245,23 +239,22 @@ def add_partition(album_uuid):
if "name" not in request.form: if "name" not in request.form:
error = "Un titre est requis." error = "Un titre est requis."
elif "file" not in request.files and "partition-uuid" not in request.form:
error = "Aucun fichier n'a été fourni."
elif "file" not in request.files: elif "file" not in request.files:
if "partition-uuid" not in request.form: partition_type = "uuid"
error = "Aucun fichier n'a été fourni." search_uuid = request.form["partition-uuid"]
data = db.execute(
"""
SELECT * FROM search_results
WHERE uuid = ?
""",
(search_uuid,)
).fetchone()
if data is None:
error = "Les résultats de la recherche ont expiré."
else: else:
partition_type = "uuid" source = data["url"]
search_uuid = request.form["partition-uuid"]
data = db.execute(
"""
SELECT * FROM search_results
WHERE uuid = ?
""",
(search_uuid,)
).fetchone()
if data is None:
error = "Les résultats de la recherche ont expiré."
else:
source = data["url"]
else: else:
partition_type = "file" partition_type = "file"
@ -269,14 +262,8 @@ def add_partition(album_uuid):
flash(error) flash(error)
return redirect(request.referrer) return redirect(request.referrer)
if "author" in request.form: author = get_opt_string(request.form, "author")
author = request.form["author"] body = get_opt_string(request.form, "body")
else:
author = ""
if "body" in request.form:
body = request.form["body"]
else:
body = ""
while True: while True:
try: try:

View File

@ -78,6 +78,7 @@ def load_logged_in_user():
def create_user(username: str, password: str) -> Optional[str]: def create_user(username: str, password: str) -> Optional[str]:
"""Adds a new user to the database""" """Adds a new user to the database"""
error = None
if not username: if not username:
error = "Un nom d'utilisateur est requis." error = "Un nom d'utilisateur est requis."
elif not password: elif not password:
@ -95,9 +96,8 @@ def create_user(username: str, password: str) -> Optional[str]:
# The username was already taken, which caused the # The username was already taken, which caused the
# commit to fail. Show a validation error. # commit to fail. Show a validation error.
error = f"Le nom d'utilisateur {username} est déjà pris." error = f"Le nom d'utilisateur {username} est déjà pris."
if error is not None: return error # may be None
return error
@bp.route("/register", methods=("GET", "POST")) @bp.route("/register", methods=("GET", "POST"))

View File

@ -4,6 +4,7 @@ Classe Album
import os import os
from ..db import get_db from ..db import get_db
from ..utils import new_uuid
from .attachment import Attachment from .attachment import Attachment
@ -163,3 +164,26 @@ class Album():
(partition_uuid, self.id), (partition_uuid, self.id),
) )
db.commit() db.commit()
def create(name: str) -> str:
"""Créer un nouvel album"""
db = get_db()
while True:
try:
uuid = new_uuid()
db.execute(
"""
INSERT INTO album (uuid, name)
VALUES (?, ?)
""",
(uuid, name),
)
db.commit()
break
except db.IntegrityError:
pass
return uuid

View File

@ -7,7 +7,8 @@ from flask import (Blueprint, abort, flash, redirect, render_template,
from .auth import login_required from .auth import login_required
from .db import get_db from .db import get_db
from .utils import User, Album, Groupe, new_uuid, get_qrcode, format_uuid from .utils import User, Album, Groupe
from . import utils
bp = Blueprint("groupe", __name__, url_prefix="/groupe") bp = Blueprint("groupe", __name__, url_prefix="/groupe")
@ -26,8 +27,8 @@ def get_groupe(uuid):
groupe = Groupe(uuid=uuid) groupe = Groupe(uuid=uuid)
except LookupError: except LookupError:
try: try:
groupe = Groupe(uuid=format_uuid(uuid)) groupe = Groupe(uuid=utils.format_uuid(uuid))
return redirect(f"/groupe/{format_uuid(uuid)}") return redirect(f"/groupe/{utils.format_uuid(uuid)}")
except LookupError: except LookupError:
return abort(404) return abort(404)
@ -51,7 +52,7 @@ def get_groupe(uuid):
@bp.route("/<uuid>/qr") @bp.route("/<uuid>/qr")
def album_qr_code(uuid): def album_qr_code(uuid):
return get_qrcode(f"/groupe/{uuid}") return utils.get_qrcode(f"/groupe/{uuid}")
@ -68,7 +69,7 @@ def create_groupe():
if error is None: if error is None:
while True: while True:
try: try:
uuid = new_uuid() uuid = utils.new_uuid()
db.execute( db.execute(
""" """
@ -162,7 +163,7 @@ def delete_groupe(uuid):
@bp.route("/<groupe_uuid>/create-album", methods=["POST"]) @bp.route("/<groupe_uuid>/create-album", methods=["POST"])
@login_required @login_required
def create_album(groupe_uuid): def create_album_req(groupe_uuid):
try: try:
groupe = Groupe(uuid=groupe_uuid) groupe = Groupe(uuid=groupe_uuid)
except LookupError: except LookupError:
@ -181,32 +182,17 @@ def create_album(groupe_uuid):
error ="Vous n'êtes pas administrateur de ce groupe" error ="Vous n'êtes pas administrateur de ce groupe"
if error is None: if error is None:
while True: uuid = utils.create_album(name)
try: album = Album(uuid=uuid)
uuid = new_uuid()
db.execute( db.execute(
""" """
INSERT INTO album (uuid, name) INSERT INTO groupe_contient_album (groupe_id, album_id)
VALUES (?, ?) VALUES (?, ?)
""", """,
(uuid, name), (groupe.id, album.id)
) )
db.commit() db.commit()
album = Album(uuid=uuid)
db.execute(
"""
INSERT INTO groupe_contient_album (groupe_id, album_id)
VALUES (?, ?)
""",
(groupe.id, album.id)
)
db.commit()
break
except db.IntegrityError:
pass
if "response" in request.args and request.args["response"] == "json": if "response" in request.args and request.args["response"] == "json":
return { return {
@ -229,14 +215,14 @@ def get_album(groupe_uuid, album_uuid):
groupe = Groupe(uuid=groupe_uuid) groupe = Groupe(uuid=groupe_uuid)
except LookupError: except LookupError:
try: try:
groupe = Groupe(uuid=format_uuid(groupe_uuid)) groupe = Groupe(uuid=utils.format_uuid(groupe_uuid))
return redirect(f"/groupe/{format_uuid(groupe_uuid)}/{album_uuid}") return redirect(f"/groupe/{utils.format_uuid(groupe_uuid)}/{album_uuid}")
except LookupError: except LookupError:
return abort(404) return abort(404)
album_list = [a for a in groupe.get_albums() if a.uuid == album_uuid] album_list = [a for a in groupe.get_albums() if a.uuid == album_uuid]
if len(album_list) == 0: if len(album_list) == 0:
album_uuid = format_uuid(album_uuid) album_uuid = utils.format_uuid(album_uuid)
album_list = [a for a in groupe.get_albums() if a.uuid == album_uuid] album_list = [a for a in groupe.get_albums() if a.uuid == album_uuid]
if len(album_list) != 0: if len(album_list) != 0:
return redirect(f"/groupe/{groupe_uuid}/{album_uuid}") return redirect(f"/groupe/{groupe_uuid}/{album_uuid}")
@ -269,4 +255,4 @@ def get_album(groupe_uuid, album_uuid):
@bp.route("/<groupe_uuid>/<album_uuid>/qr") @bp.route("/<groupe_uuid>/<album_uuid>/qr")
def groupe_qr_code(groupe_uuid, album_uuid): def groupe_qr_code(groupe_uuid, album_uuid):
return get_qrcode(f"/groupe/{groupe_uuid}/{album_uuid}") return utils.get_qrcode(f"/groupe/{groupe_uuid}/{album_uuid}")

View File

@ -44,7 +44,6 @@ def attachments(uuid):
@bp.route("/<uuid>/add-attachment", methods=["POST"]) @bp.route("/<uuid>/add-attachment", methods=["POST"])
@login_required @login_required
def add_attachment(uuid): def add_attachment(uuid):
db = get_db()
try: try:
partition = Partition(uuid=uuid) partition = Partition(uuid=uuid)
except LookupError: except LookupError:
@ -66,7 +65,6 @@ def add_attachment(uuid):
if name == "": if name == "":
error = "Pas de nom de fichier" error = "Pas de nom de fichier"
else: else:
filename = request.files["file"].filename filename = request.files["file"].filename
ext = filename.split(".")[-1] ext = filename.split(".")[-1]
@ -81,6 +79,7 @@ def add_attachment(uuid):
try: try:
attachment_uuid = str(uuid4()) attachment_uuid = str(uuid4())
db = get_db()
db.execute( db.execute(
""" """
INSERT INTO attachments (uuid, name, filetype, partition_uuid, user_id) INSERT INTO attachments (uuid, name, filetype, partition_uuid, user_id)

View File

@ -111,7 +111,7 @@ def online_search(query, num_queries):
for thread in threads: for thread in threads:
thread.join() thread.join()
for element in partitions: for element in partitions.copy():
uuid = element["uuid"] uuid = element["uuid"]
url = element["url"] url = element["url"]
if os.stat(f"partitioncloud/search-partitions/{uuid}.pdf").st_size == 0: if os.stat(f"partitioncloud/search-partitions/{uuid}.pdf").st_size == 0:

View File

@ -29,6 +29,7 @@ from .classes.album import Album
from .classes.groupe import Groupe from .classes.groupe import Groupe
from .classes.partition import Partition from .classes.partition import Partition
from .classes.attachment import Attachment from .classes.attachment import Attachment
from .classes.album import create as create_album
def get_all_partitions(): def get_all_partitions():