From acaa8367d674600c369a67cb8380452c21f05bed Mon Sep 17 00:00:00 2001 From: augustin64 Date: Fri, 15 Dec 2023 13:38:32 +0100 Subject: [PATCH] Fix more pylint warnings --- .pylintrc | 5 + partitioncloud/__init__.py | 36 ++--- partitioncloud/modules/admin.py | 8 +- partitioncloud/modules/albums.py | 2 +- partitioncloud/modules/auth.py | 54 +++---- partitioncloud/modules/classes/partition.py | 2 +- partitioncloud/modules/groupe.py | 6 +- partitioncloud/modules/partition.py | 4 +- partitioncloud/modules/search.py | 2 +- partitioncloud/modules/utils.py | 2 +- scripts/hooks/utils.py | 8 +- scripts/hooks/v1.py | 58 ++++---- scripts/migration.py | 149 ++++++++++++-------- 13 files changed, 189 insertions(+), 147 deletions(-) create mode 100644 .pylintrc diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..7868fe5 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,5 @@ +[BASIC] +good-names=db, v, v1, v2, f, id, i, j, k + +[MESSAGES CONTROL] +disable=pointless-string-statement \ No newline at end of file diff --git a/partitioncloud/__init__.py b/partitioncloud/__init__.py index 0f6a8c6..ee05b3b 100644 --- a/partitioncloud/__init__.py +++ b/partitioncloud/__init__.py @@ -57,36 +57,20 @@ def add_user(): username = request.form["username"] password = request.form["password"] album_uuid = request.form["album_uuid"] - db = get_db() - error = None - if not username: - error = "Un nom d'utilisateur est requis." - elif not password: - error = "Un mot de passe est requis." + error = auth.create_user(username, password) if error is None: + # Success, go to the login page. + user = User(name=username) try: - db.execute( - "INSERT INTO user (username, password) VALUES (?, ?)", - (username, generate_password_hash(password)), - ) - db.commit() - except db.IntegrityError: - # The username was already taken, which caused the - # commit to fail. Show a validation error. - error = f"Le nom d'utilisateur {username} est déjà pris." - else: - # Success, go to the login page. - user = User(name=username) - try: - if album_uuid != "": - user.join_album(album_uuid) - flash(f"Utilisateur {username} créé") - return redirect("/albums") - except LookupError: - flash(f"Cet album n'existe pas. L'utilisateur {username} a été créé") - return redirect("/albums") + if album_uuid != "": + user.join_album(album_uuid) + flash(f"Utilisateur {username} créé") + return redirect("/albums") + except LookupError: + flash(f"Cet album n'existe pas. L'utilisateur {username} a été créé") + return redirect("/albums") flash(error) return render_template("auth/register.html", albums=get_all_albums(), user=current_user) diff --git a/partitioncloud/modules/admin.py b/partitioncloud/modules/admin.py index 00bd856..05ccae9 100644 --- a/partitioncloud/modules/admin.py +++ b/partitioncloud/modules/admin.py @@ -25,10 +25,10 @@ def index(): SELECT id FROM user """ ) - users = [User(user_id=u["id"]) for u in users_id] - for u in users: - u.albums = u.get_albums() - u.partitions = u.get_partitions() + users = [User(user_id=user["id"]) for user in users_id] + for user in users: + user.get_albums() + user.get_partitions() return render_template( "admin/index.html", diff --git a/partitioncloud/modules/albums.py b/partitioncloud/modules/albums.py index 53705a2..d8d7407 100644 --- a/partitioncloud/modules/albums.py +++ b/partitioncloud/modules/albums.py @@ -65,7 +65,7 @@ def search_page(): ) @bp.route("/") -def album(uuid): +def get_album(uuid): """ Album page """ diff --git a/partitioncloud/modules/auth.py b/partitioncloud/modules/auth.py index ccc37cc..1cb392c 100644 --- a/partitioncloud/modules/auth.py +++ b/partitioncloud/modules/auth.py @@ -3,6 +3,7 @@ Authentification module """ import functools +from typing import Optional from flask import (Blueprint, flash, g, redirect, render_template, request, session, url_for, current_app) @@ -75,6 +76,30 @@ def load_logged_in_user(): ) +def create_user(username: str, password: str) -> Optional[str]: + """Adds a new user to the database""" + if not username: + error = "Un nom d'utilisateur est requis." + elif not password: + error = "Un mot de passe est requis." + + try: + db = get_db() + + db.execute( + "INSERT INTO user (username, password) VALUES (?, ?)", + (username, generate_password_hash(password)), + ) + db.commit() + except db.IntegrityError: + # The username was already taken, which caused the + # commit to fail. Show a validation error. + error = f"Le nom d'utilisateur {username} est déjà pris." + + if error is not None: + return error + + @bp.route("/register", methods=("GET", "POST")) @anon_required def register(): @@ -89,32 +114,13 @@ def register(): if request.method == "POST": username = request.form["username"] password = request.form["password"] - db = get_db() - error = None - if not username: - error = "Un nom d'utilisateur est requis." - elif not password: - error = "Un mot de passe est requis." + error = create_user(username, password) - if error is None: - try: - db.execute( - "INSERT INTO user (username, password) VALUES (?, ?)", - (username, generate_password_hash(password)), - ) - db.commit() - flash(f"Utilisateur {username} créé avec succès. Vous pouvez vous connecter.") - except db.IntegrityError: - # The username was already taken, which caused the - # commit to fail. Show a validation error. - error = f"Le nom d'utilisateur {username} est déjà pris. \ - Vous souhaitez peut-être vous connecter" - else: - # Success, go to the login page. - return redirect(url_for("auth.login")) - - flash(error) + if error is not None: + flash(error) + else: + flash("Utilisateur créé avec succès. Vous pouvez vous connecter.") return render_template("auth/register.html") diff --git a/partitioncloud/modules/classes/partition.py b/partitioncloud/modules/classes/partition.py index 7caa75c..9815f91 100644 --- a/partitioncloud/modules/classes/partition.py +++ b/partitioncloud/modules/classes/partition.py @@ -60,7 +60,7 @@ class Partition(): def update(self, name=None, author="", body=""): if name is None: - raise Exception("name cannot be None") + raise ValueError("name cannot be None") db = get_db() db.execute( diff --git a/partitioncloud/modules/groupe.py b/partitioncloud/modules/groupe.py index f9bf315..decb8d9 100644 --- a/partitioncloud/modules/groupe.py +++ b/partitioncloud/modules/groupe.py @@ -18,7 +18,7 @@ def index(): @bp.route("/") -def groupe(uuid): +def get_groupe(uuid): """ Groupe page """ @@ -221,7 +221,7 @@ def create_album(groupe_uuid): @bp.route("//") -def album(groupe_uuid, album_uuid): +def get_album(groupe_uuid, album_uuid): """ Album page """ @@ -246,7 +246,7 @@ def album(groupe_uuid, album_uuid): user = User(user_id=session.get("user_id")) # List of users without duplicate - users_id = list(set([i["id"] for i in album.get_users()+groupe.get_users()])) + users_id = list({i["id"] for i in album.get_users()+groupe.get_users()}) album.users = [User(user_id=id) for id in users_id] partitions = album.get_partitions() diff --git a/partitioncloud/modules/partition.py b/partitioncloud/modules/partition.py index 1d5dbe7..e55ad7e 100644 --- a/partitioncloud/modules/partition.py +++ b/partitioncloud/modules/partition.py @@ -14,7 +14,7 @@ from .utils import get_all_partitions, User, Partition, Attachment bp = Blueprint("partition", __name__, url_prefix="/partition") @bp.route("/") -def partition(uuid): +def get_partition(uuid): try: partition = Partition(uuid=uuid) except LookupError: @@ -107,7 +107,7 @@ def add_attachment(uuid): @bp.route("/attachment/.") -def attachment(uuid, filetype): +def get_attachment(uuid, filetype): try: attachment = Attachment(uuid=uuid) except LookupError: diff --git a/partitioncloud/modules/search.py b/partitioncloud/modules/search.py index 6126572..c2f6b3f 100644 --- a/partitioncloud/modules/search.py +++ b/partitioncloud/modules/search.py @@ -57,7 +57,7 @@ def download_search_result(element): urllib.request.urlretrieve(url, f"partitioncloud/search-partitions/{uuid}.pdf") except (urllib.error.HTTPError, urllib.error.URLError): - with open(f"partitioncloud/search-partitions/{uuid}.pdf", 'a', encoding="utf8") as f: + with open(f"partitioncloud/search-partitions/{uuid}.pdf", 'a', encoding="utf8") as _: pass # Create empty file diff --git a/partitioncloud/modules/utils.py b/partitioncloud/modules/utils.py index e370df6..ff225af 100644 --- a/partitioncloud/modules/utils.py +++ b/partitioncloud/modules/utils.py @@ -4,8 +4,8 @@ import random import string import qrcode -from .db import get_db from flask import current_app, send_file +from .db import get_db def new_uuid(): diff --git a/scripts/hooks/utils.py b/scripts/hooks/utils.py index 93dce86..d095752 100644 --- a/scripts/hooks/utils.py +++ b/scripts/hooks/utils.py @@ -2,6 +2,7 @@ import random import string import sqlite3 + def run_sqlite_command(*args): """Run a command against the database""" con = sqlite3.connect("instance/partitioncloud.sqlite") @@ -10,6 +11,7 @@ def run_sqlite_command(*args): con.commit() con.close() + def get_sqlite_data(*args): """Get data from the db""" con = sqlite3.connect("instance/partitioncloud.sqlite") @@ -19,8 +21,12 @@ def get_sqlite_data(*args): con.close() return new_data + def new_uuid(): - return ''.join([random.choice(string.ascii_uppercase + string.digits) for _ in range(6)]) + return "".join( + [random.choice(string.ascii_uppercase + string.digits) for _ in range(6)] + ) + def format_uuid(uuid): """Format old uuid4 format""" diff --git a/scripts/hooks/v1.py b/scripts/hooks/v1.py index b0dfcc7..1a5613b 100644 --- a/scripts/hooks/v1.py +++ b/scripts/hooks/v1.py @@ -6,10 +6,11 @@ from colorama import Fore, Style """ v1.3.* """ + + def add_source(): - utils.run_sqlite_command( - "ALTER TABLE partition ADD source TEXT DEFAULT 'unknown'" - ) + utils.run_sqlite_command("ALTER TABLE partition ADD source TEXT DEFAULT 'unknown'") + def add_groupes(): utils.run_sqlite_command( @@ -35,6 +36,7 @@ def add_groupes(): );""" ) + def add_attachments(): os.makedirs("partitioncloud/attachments", exist_ok=True) utils.run_sqlite_command( @@ -47,6 +49,7 @@ def add_attachments(): );""" ) + def install_colorama(): os.system("pip install colorama -qq") @@ -54,19 +57,17 @@ def install_colorama(): """ v1.4.* """ + + def mass_rename(): """Rename all albums & groupes to use a shorter uuid""" albums = utils.get_sqlite_data("SELECT * FROM album") groupes = utils.get_sqlite_data("SELECT * FROM groupe") - utils.run_sqlite_command( - "ALTER TABLE groupe RENAME TO _groupe_old" - ) - utils.run_sqlite_command( - "ALTER TABLE album RENAME TO _album_old" - ) + utils.run_sqlite_command("ALTER TABLE groupe RENAME TO _groupe_old") + utils.run_sqlite_command("ALTER TABLE album RENAME TO _album_old") - utils.run_sqlite_command( # Add UNIQUE constraint & change uuid length + utils.run_sqlite_command( # Add UNIQUE constraint & change uuid length """CREATE TABLE groupe ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, @@ -74,7 +75,7 @@ def mass_rename(): );""" ) - utils.run_sqlite_command( # Change uuid length + utils.run_sqlite_command( # Change uuid length """CREATE TABLE album ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, @@ -89,17 +90,20 @@ def mass_rename(): INSERT INTO album (id, name, uuid) VALUES (?, ?, ?) """, - (album[0], album[1], utils.format_uuid(album[2])) + (album[0], album[1], utils.format_uuid(album[2])), ) except sqlite3.IntegrityError: uuid = utils.new_uuid() - print(f"{Fore.RED}Collision on {album[1]}{Style.RESET_ALL} ({album[2][:10]} renaming to {uuid})") + print( + f"{Fore.RED}Collision on {album[1]}{Style.RESET_ALL} \ + ({album[2][:10]} renaming to {uuid})" + ) utils.run_sqlite_command( """ INSERT INTO album (id, name, uuid) VALUES (?, ?, ?) """, - (album[0], album[1], uuid) + (album[0], album[1], uuid), ) for groupe in groupes: @@ -109,28 +113,32 @@ def mass_rename(): INSERT INTO groupe (id, name, uuid) VALUES (?, ?, ?) """, - (groupe[0], groupe[1], utils.format_uuid(groupe[2])) + (groupe[0], groupe[1], utils.format_uuid(groupe[2])), ) except sqlite3.IntegrityError: uuid = utils.new_uuid() - print(f"{Fore.RED}Collision on {groupe[1]}{Style.RESET_ALL} ({groupe[2][:10]} renaming to {uuid})") + print( + f"{Fore.RED}Collision on {groupe[1]}{Style.RESET_ALL} \ + ({groupe[2][:10]} renaming to {uuid})" + ) utils.run_sqlite_command( """ INSERT INTO groupe (id, name, uuid) VALUES (?, ?, ?) """, - (groupe[0], groupe[1], uuid) + (groupe[0], groupe[1], uuid), ) - utils.run_sqlite_command( - "DROP TABLE _groupe_old" - ) - utils.run_sqlite_command( - "DROP TABLE _album_old" - ) + utils.run_sqlite_command("DROP TABLE _groupe_old") + utils.run_sqlite_command("DROP TABLE _album_old") + def base_url_parameter_added(): - print(f"{Style.BRIGHT}{Fore.YELLOW}The parameter BASE_URL has been added, reference your front url in it{Style.RESET_ALL}") + print( + f"{Style.BRIGHT}{Fore.YELLOW}The parameter BASE_URL has been added, \ + reference your front url in it{Style.RESET_ALL}" + ) + def install_qrcode(): - os.system("pip install qrcode -qq") \ No newline at end of file + os.system("pip install qrcode -qq") diff --git a/scripts/migration.py b/scripts/migration.py index 00540b3..096aa73 100644 --- a/scripts/migration.py +++ b/scripts/migration.py @@ -1,18 +1,18 @@ #!/usr/bin/python3 import os +import sys import shutil import argparse from functools import cmp_to_key -from distutils.dir_util import copy_tree from colorama import Fore, Style -from hooks import v1 +from hooks import v1 as v1_hooks def get_version(v: str) -> (int, int, int): """Returns a tuple (major, minor, patch from the string v{major}.{minor}.{patch})""" - assert v[0] == 'v' # Check if the version is correctly formatted - return tuple(map(int, v[1:].split('.'))) + assert v[0] == "v" # Check if the version is correctly formatted + return tuple(map(int, v[1:].split("."))) def is_newer(v1: str, v2: str) -> bool: @@ -21,30 +21,24 @@ def is_newer(v1: str, v2: str) -> bool: hooks = [ - ("v1.3.0", [ - ("add SOURCE column", v1.add_source) - ]), - ("v1.2.0", [ - ("create groupe structure", v1.add_groupes) - ]), - ("v1.3.0", [ - ("create attachment table", v1.add_attachments) - ]), - ("v1.3.3", [ - ("Install colorama", v1.install_colorama) - ]), - ("v1.4.0", [ - ("Change all albums & groupes uuids", v1.mass_rename), - ("Warn new parameter", v1.base_url_parameter_added) - ]), - ("v1.4.1", [ - ("Install qrcode", v1.install_qrcode) - ]), + ("v1.3.0", [("add SOURCE column", v1_hooks.add_source)]), + ("v1.2.0", [("create groupe structure", v1_hooks.add_groupes)]), + ("v1.3.0", [("create attachment table", v1_hooks.add_attachments)]), + ("v1.3.3", [("Install colorama", v1_hooks.install_colorama)]), + ( + "v1.4.0", + [ + ("Change all albums & groupes uuids", v1_hooks.mass_rename), + ("Warn new parameter", v1_hooks.base_url_parameter_added), + ], + ), + ("v1.4.1", [("Install qrcode", v1_hooks.install_qrcode)]), ] def get_hooks(current, target): """Returns a list of hooks needed to migrate""" + def compare(v1: str, v2: str): if is_newer(v2[0], v1[0]): return -1 @@ -54,7 +48,9 @@ def get_hooks(current, target): applied_hooks = [] for hook in hooks: - if is_newer(hook[0], current) and (target == hook[0] or is_newer(target, hook[0])): + if is_newer(hook[0], current) and ( + target == hook[0] or is_newer(target, hook[0]) + ): applied_hooks.append(hook) return sorted(applied_hooks, key=cmp_to_key(compare)) @@ -62,43 +58,53 @@ def get_hooks(current, target): def backup_instance(version, verbose=True): """Backs up current instance in backups/{version}""" - def print_verbose(*args): + + def print_verbose(*f_args): if verbose: - print(*args) + print(*f_args) print_verbose("\nBacking up current instance") dest = os.path.join("backups", version) if os.path.exists(dest): print(f"{Fore.RED}Backup directory already exists{Style.RESET_ALL}") - exit(1) + sys.exit(1) os.makedirs(dest) paths = [ ("instance", os.path.join(dest, "instance")), - (os.path.join("partitioncloud", "partitions"), os.path.join(dest, "partitions")), - (os.path.join("partitioncloud", "attachments"), os.path.join(dest, "attachments")), - (os.path.join("partitioncloud", "search-partitions"), os.path.join(dest, "search-partitions")) + ( + os.path.join("partitioncloud", "partitions"), + os.path.join(dest, "partitions"), + ), + ( + os.path.join("partitioncloud", "attachments"), + os.path.join(dest, "attachments"), + ), + ( + os.path.join("partitioncloud", "search-partitions"), + os.path.join(dest, "search-partitions"), + ), ] for src, dst in paths: if os.path.exists(src): print_verbose(f"\tBacking up {src}") - copy_tree(src, dst) + shutil.copy_tree(src, dst) -def print_hooks(hooks): - for hook in hooks: +def print_hooks(hooks_list): + for hook in hooks_list: print(f"- {Fore.BLUE}{hook[0]}{Style.RESET_ALL}:") - for subhook in hook[1]: - print("\t", subhook[0]) + for sub_hook in hook[1]: + print("\t", sub_hook[0]) -def apply_hooks(hooks): - for hook in hooks: +def apply_hooks(hooks_list): + for hook in hooks_list: print(f"Migrating to {Fore.BLUE}{hook[0]}{Style.RESET_ALL}:") - for subhook in hook[1]: - print(f"\tApplying '{subhook[0]}'") - subhook[1]() + for sub_hook in hook[1]: + print(f"\tApplying '{sub_hook[0]}'") + sub_hook[1]() def migrate(current, target, skip_backup=False, prog_name="scripts/migration.py"): @@ -110,19 +116,24 @@ def migrate(current, target, skip_backup=False, prog_name="scripts/migration.py" applied_hooks = get_hooks(current, target) if len(applied_hooks) == 0: print(f"{Fore.GREEN}No hook to apply{Style.RESET_ALL}") - exit(0) + sys.exit(0) print("The following hooks will be applied:") print_hooks(applied_hooks) if input("Apply these hooks ? [y/N] ") != "y": print(f"{Fore.RED}Aborting !{Style.RESET_ALL}") - exit(1) + sys.exit(1) if not skip_backup: backup_instance(current) - print(f"Instance backed up in {Style.BRIGHT}backups/{current}{Style.RESET_ALL}\n") - print(f"If something goes wrong, recover with {Style.BRIGHT}{Fore.BLUE}{prog_name} --restore {current}{Style.RESET_ALL}") + print( + f"Instance backed up in {Style.BRIGHT}backups/{current}{Style.RESET_ALL}\n" + ) + print( + f"If something goes wrong, recover with {Style.BRIGHT}{Fore.BLUE}{prog_name}\ + --restore {current}{Style.RESET_ALL}" + ) else: print("Skipping automatic backup") @@ -131,17 +142,31 @@ def migrate(current, target, skip_backup=False, prog_name="scripts/migration.py" def restore(version): - if input("Do you really want to restore from backup ? Your current data will be deleted [y/N] ") != "y": + if ( + input( + "Do you really want to restore from backup ? Your current data will be deleted [y/N] " + ) + != "y" + ): print(f"{Fore.RED}Aborting !{Style.RESET_ALL}") - exit(1) + sys.exit(1) dest = os.path.join("backups", version) print(f"Restoring from {dest}") paths = [ ("instance", os.path.join(dest, "instance")), - (os.path.join("partitioncloud", "partitions"), os.path.join(dest, "partitions")), - (os.path.join("partitioncloud", "attachments"), os.path.join(dest, "attachments")), - (os.path.join("partitioncloud", "search-partitions"), os.path.join(dest, "search-partitions")) + ( + os.path.join("partitioncloud", "partitions"), + os.path.join(dest, "partitions"), + ), + ( + os.path.join("partitioncloud", "attachments"), + os.path.join(dest, "attachments"), + ), + ( + os.path.join("partitioncloud", "search-partitions"), + os.path.join(dest, "search-partitions"), + ), ] for src, dst in paths: if os.path.exists(src): @@ -149,24 +174,32 @@ def restore(version): if os.path.exists(dst): print(f"\tRestoring {src}") - copy_tree(dst, src) + shutil.copy_tree(dst, src) else: - print(f"\t{Fore.RED}No available backup for {src}, deleting current content to avoid any conflict{Style.RESET_ALL}") + print( + f"\t{Fore.RED}No available backup for {src}, \ + deleting current content to avoid any conflict{Style.RESET_ALL}" + ) if __name__ == "__main__": parser = argparse.ArgumentParser( - prog='PartitionCloud Migration tool', - description='Helps you migrate from one version to another') + prog="PartitionCloud Migration tool", + description="Helps you migrate from one version to another", + ) - parser.add_argument('-c', '--current', help="current version (vx.y.z)") - parser.add_argument('-t', '--target', help="target version (vx.y.z)") - parser.add_argument('-s', '--skip-backup', action='store_true') - parser.add_argument('-r', '--restore', help='restore from specific version backup, will not apply any hook (vx.y.z)') + parser.add_argument("-c", "--current", help="current version (vx.y.z)") + parser.add_argument("-t", "--target", help="target version (vx.y.z)") + parser.add_argument("-s", "--skip-backup", action="store_true") + parser.add_argument( + "-r", + "--restore", + help="restore from specific version backup, will not apply any hook (vx.y.z)", + ) args = parser.parse_args() if args.restore is None: migrate(args.current, args.target, skip_backup=args.skip_backup) else: - restore(args.restore) \ No newline at end of file + restore(args.restore)