Compare commits

...

3 Commits

Author SHA1 Message Date
d54419fd35 Add MAX_AGE config parameter 2024-01-13 12:44:28 +01:00
acaa8367d6 Fix more pylint warnings 2023-12-15 13:38:32 +01:00
11ddbf0169 Take advice from pylint 2023-12-15 11:36:34 +01:00
18 changed files with 314 additions and 281 deletions

5
.pylintrc Normal file
View File

@ -0,0 +1,5 @@
[BASIC]
good-names=db, v, v1, v2, f, id, i, j, k
[MESSAGES CONTROL]
disable=pointless-string-statement

View File

@ -15,4 +15,7 @@ MAX_ONLINE_QUERIES=3
DISABLE_REGISTER=False DISABLE_REGISTER=False
# Front URL of the application (for QRCodes generation) # Front URL of the application (for QRCodes generation)
BASE_URL="http://localhost:5000" BASE_URL="http://localhost:5000"
# Session expiration, in days
MAX_AGE=31

View File

@ -3,6 +3,7 @@
Main file Main file
""" """
import os import os
import datetime
import subprocess import subprocess
from flask import Flask, g, redirect, render_template, request, send_file, flash, session, abort from flask import Flask, g, redirect, render_template, request, send_file, flash, session, abort
@ -32,9 +33,10 @@ app.register_blueprint(partition.bp)
try: try:
result = subprocess.run(["git", "describe", "--tags"], stdout=subprocess.PIPE) result = subprocess.run(["git", "describe", "--tags"], stdout=subprocess.PIPE, check=True)
__version__ = result.stdout.decode('utf8') __version__ = result.stdout.decode('utf8')
except FileNotFoundError: # In case git not found, which would be strange except (FileNotFoundError, subprocess.CalledProcessError):
# In case git not found or any platform specific weird error
__version__ = "unknown" __version__ = "unknown"
@ -56,36 +58,20 @@ def add_user():
username = request.form["username"] username = request.form["username"]
password = request.form["password"] password = request.form["password"]
album_uuid = request.form["album_uuid"] album_uuid = request.form["album_uuid"]
db = get_db()
error = None
if not username: error = auth.create_user(username, password)
error = "Un nom d'utilisateur est requis."
elif not password:
error = "Un mot de passe est requis."
if error is None: if error is None:
# Success, go to the login page.
user = User(name=username)
try: try:
db.execute( if album_uuid != "":
"INSERT INTO user (username, password) VALUES (?, ?)", user.join_album(album_uuid)
(username, generate_password_hash(password)), flash(f"Utilisateur {username} créé")
) return redirect("/albums")
db.commit() except LookupError:
except db.IntegrityError: flash(f"Cet album n'existe pas. L'utilisateur {username} a été créé")
# The username was already taken, which caused the return redirect("/albums")
# commit to fail. Show a validation error.
error = f"Le nom d'utilisateur {username} est déjà pris."
else:
# Success, go to the login page.
user = User(name=username)
try:
if album_uuid != "":
user.join_album(album_uuid)
flash(f"Utilisateur {username} créé")
return redirect("/albums")
except LookupError:
flash(f"Cet album n'existe pas. L'utilisateur {username} a été créé")
return redirect("/albums")
flash(error) flash(error)
return render_template("auth/register.html", albums=get_all_albums(), user=current_user) return render_template("auth/register.html", albums=get_all_albums(), user=current_user)
@ -94,8 +80,11 @@ def add_user():
@app.route("/static/search-thumbnails/<uuid>.jpg") @app.route("/static/search-thumbnails/<uuid>.jpg")
@login_required @login_required
def search_thumbnail(uuid): def search_thumbnail(uuid):
"""
Renvoie l'apercu d'un résultat de recherche
"""
db = get_db() db = get_db()
partition = db.execute( part = db.execute(
""" """
SELECT uuid, url FROM search_results SELECT uuid, url FROM search_results
WHERE uuid = ? WHERE uuid = ?
@ -103,7 +92,7 @@ def search_thumbnail(uuid):
(uuid,) (uuid,)
).fetchone() ).fetchone()
if partition is None: if part is None:
abort(404) abort(404)
if not os.path.exists(os.path.join(app.static_folder, "search-thumbnails", f"{uuid}.jpg")): if not os.path.exists(os.path.join(app.static_folder, "search-thumbnails", f"{uuid}.jpg")):
os.system( os.system(
@ -117,11 +106,18 @@ def search_thumbnail(uuid):
return send_file(os.path.join(app.static_folder, "search-thumbnails", f"{uuid}.jpg")) return send_file(os.path.join(app.static_folder, "search-thumbnails", f"{uuid}.jpg"))
@app.before_request
def before_request():
"""Set cookie max age to 31 days"""
session.permanent = True
app.permanent_session_lifetime = datetime.timedelta(days=int(app.config["MAX_AGE"]))
@app.context_processor @app.context_processor
def inject_default_variables(): def inject_default_variables():
"""Inject the version number in the template variables"""
if __version__ == "unknown": if __version__ == "unknown":
return dict(version="") return {"version": ''}
return dict(version=__version__) return {"version": __version__}
@app.after_request @app.after_request

View File

@ -2,8 +2,7 @@
""" """
Admin Panel Admin Panel
""" """
import os from flask import Blueprint, render_template, session
from flask import Blueprint, abort, send_file, render_template, session
from .db import get_db from .db import get_db
from .auth import admin_required from .auth import admin_required
@ -15,6 +14,9 @@ bp = Blueprint("admin", __name__, url_prefix="/admin")
@bp.route("/") @bp.route("/")
@admin_required @admin_required
def index(): def index():
"""
Admin panel home page
"""
current_user = User(user_id=session.get("user_id")) current_user = User(user_id=session.get("user_id"))
current_user.get_albums() # We need to do that before we close the db current_user.get_albums() # We need to do that before we close the db
db = get_db() db = get_db()
@ -23,13 +25,13 @@ def index():
SELECT id FROM user SELECT id FROM user
""" """
) )
users = [User(user_id=u["id"]) for u in users_id] users = [User(user_id=user["id"]) for user in users_id]
for u in users: for user in users:
u.albums = u.get_albums() user.get_albums()
u.partitions = u.get_partitions() user.get_partitions()
return render_template( return render_template(
"admin/index.html", "admin/index.html",
users=users, users=users,
user=current_user user=current_user
) )

View File

@ -6,8 +6,8 @@ import os
import shutil import shutil
from uuid import uuid4 from uuid import uuid4
from flask import (Blueprint, abort, flash, redirect, render_template, request, from flask import (Blueprint, abort, flash, redirect, render_template,
send_file, session, current_app) request, session, current_app)
from .auth import login_required from .auth import login_required
from .db import get_db from .db import get_db
@ -20,13 +20,10 @@ bp = Blueprint("albums", __name__, url_prefix="/albums")
@bp.route("/") @bp.route("/")
@login_required @login_required
def index(): def index():
"""
Albums home page
"""
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
albums = user.get_albums()
if user.access_level == 1:
max_queries = 10
else:
max_queries = current_app.config["MAX_ONLINE_QUERIES"]
return render_template("albums/index.html", user=user) return render_template("albums/index.html", user=user)
@ -34,6 +31,9 @@ def index():
@bp.route("/search", methods=["POST"]) @bp.route("/search", methods=["POST"])
@login_required @login_required
def search_page(): def search_page():
"""
Résultats de recherche
"""
if "query" not in request.form or request.form["query"] == "": if "query" not in request.form or request.form["query"] == "":
flash("Aucun terme de recherche spécifié.") flash("Aucun terme de recherche spécifié.")
return redirect("/albums") return redirect("/albums")
@ -65,7 +65,7 @@ def search_page():
) )
@bp.route("/<uuid>") @bp.route("/<uuid>")
def album(uuid): def get_album(uuid):
""" """
Album page Album page
""" """
@ -98,14 +98,18 @@ def album(uuid):
@bp.route("/<uuid>/qr") @bp.route("/<uuid>/qr")
def qr_code(uuid): def qr_code(uuid):
"""
Renvoie le QR Code d'un album
"""
return get_qrcode(f"/albums/{uuid}") return get_qrcode(f"/albums/{uuid}")
@bp.route("/create-album", methods=["POST"]) @bp.route("/create-album", methods=["POST"])
@login_required @login_required
def create_album(): def create_album():
current_user = User(user_id=session.get("user_id")) """
Création d'un album
"""
name = request.form["name"] name = request.form["name"]
db = get_db() db = get_db()
error = None error = None
@ -145,8 +149,7 @@ def create_album():
"status": "ok", "status": "ok",
"uuid": uuid "uuid": uuid
} }
else: return redirect(f"/albums/{uuid}")
return redirect(f"/albums/{uuid}")
flash(error) flash(error)
return redirect(request.referrer) return redirect(request.referrer)
@ -155,6 +158,9 @@ def create_album():
@bp.route("/<uuid>/join") @bp.route("/<uuid>/join")
@login_required @login_required
def join_album(uuid): def join_album(uuid):
"""
Rejoindre un album
"""
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
try: try:
user.join_album(uuid) user.join_album(uuid)
@ -169,6 +175,9 @@ def join_album(uuid):
@bp.route("/<uuid>/quit") @bp.route("/<uuid>/quit")
@login_required @login_required
def quit_album(uuid): def quit_album(uuid):
"""
Quitter un album
"""
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
album = Album(uuid=uuid) album = Album(uuid=uuid)
users = album.get_users() users = album.get_users()
@ -182,26 +191,28 @@ def quit_album(uuid):
user.quit_album(uuid) user.quit_album(uuid)
flash("Album quitté.") flash("Album quitté.")
return redirect(f"/albums") return redirect("/albums")
@bp.route("/<uuid>/delete", methods=["GET", "POST"]) @bp.route("/<uuid>/delete", methods=["GET", "POST"])
@login_required @login_required
def delete_album(uuid): def delete_album(uuid):
db = get_db() """
Supprimer un album
"""
album = Album(uuid=uuid) album = Album(uuid=uuid)
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
if request.method == "GET": if request.method == "GET":
return render_template("albums/delete-album.html", album=album, user=user) return render_template("albums/delete-album.html", album=album, user=user)
error = None error = None
users = album.get_users() users = album.get_users()
if len(users) > 1: if len(users) > 1:
error = "Vous n'êtes pas seul dans cet album." error = "Vous n'êtes pas seul dans cet album."
elif len(users) == 1 and users[0]["id"] != user.id: elif len(users) == 1 and users[0]["id"] != user.id:
error = "Vous ne possédez pas cet album." error = "Vous ne possédez pas cet album."
if user.access_level == 1: if user.access_level == 1:
error = None error = None
@ -218,6 +229,9 @@ def delete_album(uuid):
@bp.route("/<album_uuid>/add-partition", methods=["POST"]) @bp.route("/<album_uuid>/add-partition", methods=["POST"])
@login_required @login_required
def add_partition(album_uuid): def add_partition(album_uuid):
"""
Ajouter une partition à un album (par upload)
"""
db = get_db() db = get_db()
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
album = Album(uuid=album_uuid) album = Album(uuid=album_uuid)
@ -281,7 +295,10 @@ def add_partition(album_uuid):
file = request.files["file"] file = request.files["file"]
file.save(f"partitioncloud/partitions/{partition_uuid}.pdf") file.save(f"partitioncloud/partitions/{partition_uuid}.pdf")
else: else:
shutil.copyfile(f"partitioncloud/search-partitions/{search_uuid}.pdf", f"partitioncloud/partitions/{partition_uuid}.pdf") shutil.copyfile(
f"partitioncloud/search-partitions/{search_uuid}.pdf",
f"partitioncloud/partitions/{partition_uuid}.pdf"
)
os.system( os.system(
f'/usr/bin/convert -thumbnail\ f'/usr/bin/convert -thumbnail\
@ -290,14 +307,6 @@ def add_partition(album_uuid):
partitioncloud/partitions/{partition_uuid}.pdf[0] \ partitioncloud/partitions/{partition_uuid}.pdf[0] \
partitioncloud/static/thumbnails/{partition_uuid}.jpg' partitioncloud/static/thumbnails/{partition_uuid}.jpg'
) )
album_id = db.execute(
"""
SELECT id FROM album
WHERE uuid = ?
""",
(album.uuid,)
).fetchone()["id"]
db.commit() db.commit()
album.add_partition(partition_uuid) album.add_partition(partition_uuid)
@ -311,14 +320,16 @@ def add_partition(album_uuid):
"status": "ok", "status": "ok",
"uuid": partition_uuid "uuid": partition_uuid
} }
else: flash(f"Partition {request.form['name']} ajoutée")
flash(f"Partition {request.form['name']} ajoutée") return redirect(f"/albums/{album.uuid}")
return redirect(f"/albums/{album.uuid}")
@bp.route("/add-partition", methods=["POST"]) @bp.route("/add-partition", methods=["POST"])
@login_required @login_required
def add_partition_from_search(): def add_partition_from_search():
"""
Ajout d'une partition (depuis la recherche)
"""
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
error = None error = None
@ -330,7 +341,7 @@ def add_partition_from_search():
error = "Il est nécessaire de spécifier un type de partition." error = "Il est nécessaire de spécifier un type de partition."
elif (not user.is_participant(request.form["album-uuid"])) and (user.access_level != 1): elif (not user.is_participant(request.form["album-uuid"])) and (user.access_level != 1):
error = "Vous ne participez pas à cet album." error = "Vous ne participez pas à cet album."
if error is not None: if error is not None:
flash(error) flash(error)
return redirect("/albums") return redirect("/albums")
@ -355,7 +366,7 @@ def add_partition_from_search():
return redirect(f"/albums/{album.uuid}") return redirect(f"/albums/{album.uuid}")
elif request.form["partition-type"] == "online_search": if request.form["partition-type"] == "online_search":
return render_template( return render_template(
"albums/add-partition.html", "albums/add-partition.html",
album=album, album=album,
@ -363,6 +374,5 @@ def add_partition_from_search():
user=user user=user
) )
else: flash("Type de partition inconnu.")
flash("Type de partition inconnu.") return redirect("/albums")
return redirect("/albums")

View File

@ -3,19 +3,11 @@
Authentification module Authentification module
""" """
import functools import functools
from typing import Optional
from flask import (Blueprint, flash, g, redirect, render_template,
request, session, url_for, current_app)
from flask import (
Blueprint,
flash,
g,
redirect,
render_template,
request,
session,
url_for,
flash,
current_app
)
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash, generate_password_hash
from .db import get_db from .db import get_db
@ -84,6 +76,30 @@ def load_logged_in_user():
) )
def create_user(username: str, password: str) -> Optional[str]:
"""Adds a new user to the database"""
if not username:
error = "Un nom d'utilisateur est requis."
elif not password:
error = "Un mot de passe est requis."
try:
db = get_db()
db.execute(
"INSERT INTO user (username, password) VALUES (?, ?)",
(username, generate_password_hash(password)),
)
db.commit()
except db.IntegrityError:
# The username was already taken, which caused the
# commit to fail. Show a validation error.
error = f"Le nom d'utilisateur {username} est déjà pris."
if error is not None:
return error
@bp.route("/register", methods=("GET", "POST")) @bp.route("/register", methods=("GET", "POST"))
@anon_required @anon_required
def register(): def register():
@ -98,31 +114,13 @@ def register():
if request.method == "POST": if request.method == "POST":
username = request.form["username"] username = request.form["username"]
password = request.form["password"] password = request.form["password"]
db = get_db()
error = None
if not username: error = create_user(username, password)
error = "Un nom d'utilisateur est requis."
elif not password:
error = "Un mot de passe est requis."
if error is None: if error is not None:
try: flash(error)
db.execute( else:
"INSERT INTO user (username, password) VALUES (?, ?)", flash("Utilisateur créé avec succès. Vous pouvez vous connecter.")
(username, generate_password_hash(password)),
)
db.commit()
flash(f"Utilisateur {username} créé avec succès. Vous pouvez vous connecter.")
except db.IntegrityError:
# The username was already taken, which caused the
# commit to fail. Show a validation error.
error = f"Le nom d'utilisateur {username} est déjà pris. Vous souhaitez peut-être vous connecter"
else:
# Success, go to the login page.
return redirect(url_for("auth.login"))
flash(error)
return render_template("auth/register.html") return render_template("auth/register.html")

View File

@ -1,3 +1,6 @@
"""
Classe Album
"""
import os import os
from ..db import get_db from ..db import get_db
@ -37,7 +40,7 @@ class Album():
else: else:
raise LookupError raise LookupError
self.users = None self.users = None
@ -131,7 +134,7 @@ class Album():
os.remove(f"partitioncloud/partitions/{partition['uuid']}.pdf") os.remove(f"partitioncloud/partitions/{partition['uuid']}.pdf")
if os.path.exists(f"partitioncloud/static/thumbnails/{partition['uuid']}.jpg"): if os.path.exists(f"partitioncloud/static/thumbnails/{partition['uuid']}.jpg"):
os.remove(f"partitioncloud/static/thumbnails/{partition['uuid']}.jpg") os.remove(f"partitioncloud/static/thumbnails/{partition['uuid']}.jpg")
partitions = db.execute( partitions = db.execute(
""" """
DELETE FROM partition DELETE FROM partition

View File

@ -1,7 +1,5 @@
import os import os
from flask import current_app
from ..db import get_db from ..db import get_db
@ -41,8 +39,8 @@ class Attachment():
(self.uuid,) (self.uuid,)
) )
db.commit() db.commit()
os.remove(f"partitioncloud/attachments/{self.uuid}.{self.filetype}") os.remove(f"partitioncloud/attachments/{self.uuid}.{self.filetype}")
def __repr__(self): def __repr__(self):
return f"{self.name}.{self.filetype}" return f"{self.name}.{self.filetype}"

View File

@ -1,5 +1,4 @@
import os import os
from flask import current_app
from ..db import get_db from ..db import get_db
from .user import User from .user import User
@ -42,7 +41,7 @@ class Partition():
(self.uuid,) (self.uuid,)
) )
db.commit() db.commit()
os.remove(f"partitioncloud/partitions/{self.uuid}.pdf") os.remove(f"partitioncloud/partitions/{self.uuid}.pdf")
if os.path.exists(f"partitioncloud/static/thumbnails/{self.uuid}.jpg"): if os.path.exists(f"partitioncloud/static/thumbnails/{self.uuid}.jpg"):
os.remove(f"partitioncloud/static/thumbnails/{self.uuid}.jpg") os.remove(f"partitioncloud/static/thumbnails/{self.uuid}.jpg")
@ -61,7 +60,7 @@ class Partition():
def update(self, name=None, author="", body=""): def update(self, name=None, author="", body=""):
if name is None: if name is None:
return Exception("name cannot be None") raise ValueError("name cannot be None")
db = get_db() db = get_db()
db.execute( db.execute(

View File

@ -37,7 +37,7 @@ class User():
if self.id is None and self.username is None: if self.id is None and self.username is None:
self.username = "" self.username = ""
self.access_level = -1 self.access_level = -1
else: else:
if self.id is not None: if self.id is not None:
data = db.execute( data = db.execute(
@ -55,7 +55,7 @@ class User():
""", """,
(self.username,) (self.username,)
).fetchone() ).fetchone()
self.id = data["id"] self.id = data["id"]
self.username = data["username"] self.username = data["username"]
self.access_level = data["access_level"] self.access_level = data["access_level"]
@ -68,7 +68,7 @@ class User():
def is_participant(self, album_uuid, exclude_groupe=False): def is_participant(self, album_uuid, exclude_groupe=False):
db = get_db() db = get_db()
return (len(db.execute( # Is participant directly in the album return (len(db.execute( # Is participant directly in the album
""" """
SELECT album.id FROM album SELECT album.id FROM album
@ -77,7 +77,7 @@ class User():
WHERE user.id = ? AND album.uuid = ? WHERE user.id = ? AND album.uuid = ?
""", """,
(self.id, album_uuid) (self.id, album_uuid)
).fetchall()) == 1 or ).fetchall()) == 1 or
# Is participant in a group that has this album # Is participant in a group that has this album
((not exclude_groupe) and (len(db.execute( ((not exclude_groupe) and (len(db.execute(
""" """
@ -165,7 +165,7 @@ class User():
(self.id,), (self.id,),
).fetchall() ).fetchall()
return self.partitions return self.partitions
def join_album(self, album_uuid): def join_album(self, album_uuid):
db = get_db() db = get_db()
album = Album(uuid=album_uuid) album = Album(uuid=album_uuid)
@ -226,5 +226,4 @@ class User():
if len(colors) == 0: if len(colors) == 0:
integer = hash(self.username) % 16777215 integer = hash(self.username) % 16777215
return "#" + str(hex(integer))[2:] return "#" + str(hex(integer))[2:]
else: return f"var({colors[hash(self.username) %len(colors)]})"
return f"var({colors[hash(self.username) %len(colors)]})"

View File

@ -16,13 +16,3 @@ def get_db():
g.db.row_factory = sqlite3.Row g.db.row_factory = sqlite3.Row
return g.db return g.db
def close_db(e=None):
"""If this request connected to the database, close the
connection.
"""
db = g.pop("db", None)
if db is not None:
db.close()

View File

@ -2,15 +2,12 @@
""" """
Groupe module Groupe module
""" """
import os from flask import (Blueprint, abort, flash, redirect, render_template,
request, session)
from flask import (Blueprint, abort, flash, redirect, render_template, request,
send_file, session, current_app)
from .auth import login_required from .auth import login_required
from .db import get_db from .db import get_db
from .utils import User, Album, get_all_partitions, Groupe, new_uuid, get_qrcode, format_uuid from .utils import User, Album, Groupe, new_uuid, get_qrcode, format_uuid
from . import search
bp = Blueprint("groupe", __name__, url_prefix="/groupe") bp = Blueprint("groupe", __name__, url_prefix="/groupe")
@ -21,7 +18,7 @@ def index():
@bp.route("/<uuid>") @bp.route("/<uuid>")
def groupe(uuid): def get_groupe(uuid):
""" """
Groupe page Groupe page
""" """
@ -37,7 +34,7 @@ def groupe(uuid):
groupe.users = [User(user_id=i["id"]) for i in groupe.get_users()] groupe.users = [User(user_id=i["id"]) for i in groupe.get_users()]
groupe.get_albums() groupe.get_albums()
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
if user.id is None: if user.id is None:
# On ne propose pas aux gens non connectés de rejoindre l'album # On ne propose pas aux gens non connectés de rejoindre l'album
not_participant = False not_participant = False
@ -61,8 +58,6 @@ def album_qr_code(uuid):
@bp.route("/create-groupe", methods=["POST"]) @bp.route("/create-groupe", methods=["POST"])
@login_required @login_required
def create_groupe(): def create_groupe():
current_user = User(user_id=session.get("user_id"))
name = request.form["name"] name = request.form["name"]
db = get_db() db = get_db()
error = None error = None
@ -102,8 +97,7 @@ def create_groupe():
"status": "ok", "status": "ok",
"uuid": uuid "uuid": uuid
} }
else: return redirect(f"/groupe/{uuid}")
return redirect(f"/groupe/{uuid}")
flash(error) flash(error)
return redirect(request.referrer) return redirect(request.referrer)
@ -139,21 +133,20 @@ def quit_groupe(uuid):
user.quit_groupe(groupe.uuid) user.quit_groupe(groupe.uuid)
flash("Groupe quitté.") flash("Groupe quitté.")
return redirect(f"/albums") return redirect("/albums")
@bp.route("/<uuid>/delete", methods=["POST"]) @bp.route("/<uuid>/delete", methods=["POST"])
@login_required @login_required
def delete_groupe(uuid): def delete_groupe(uuid):
db = get_db()
groupe = Groupe(uuid=uuid) groupe = Groupe(uuid=uuid)
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
error = None error = None
users = groupe.get_users() users = groupe.get_users()
if len(users) > 1: if len(users) > 1:
error = "Vous n'êtes pas seul dans ce groupe." error = "Vous n'êtes pas seul dans ce groupe."
if user.access_level == 1 or user.id not in groupe.get_admins(): if user.access_level == 1 or user.id not in groupe.get_admins():
error = None error = None
@ -220,8 +213,7 @@ def create_album(groupe_uuid):
"status": "ok", "status": "ok",
"uuid": uuid "uuid": uuid
} }
else: return redirect(f"/groupe/{groupe.uuid}/{uuid}")
return redirect(f"/groupe/{groupe.uuid}/{uuid}")
flash(error) flash(error)
return redirect(request.referrer) return redirect(request.referrer)
@ -229,7 +221,7 @@ def create_album(groupe_uuid):
@bp.route("/<groupe_uuid>/<album_uuid>") @bp.route("/<groupe_uuid>/<album_uuid>")
def album(groupe_uuid, album_uuid): def get_album(groupe_uuid, album_uuid):
""" """
Album page Album page
""" """
@ -254,7 +246,7 @@ def album(groupe_uuid, album_uuid):
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
# List of users without duplicate # List of users without duplicate
users_id = list(set([i["id"] for i in album.get_users()+groupe.get_users()])) users_id = list({i["id"] for i in album.get_users()+groupe.get_users()})
album.users = [User(user_id=id) for id in users_id] album.users = [User(user_id=id) for id in users_id]
partitions = album.get_partitions() partitions = album.get_partitions()
@ -277,4 +269,4 @@ def album(groupe_uuid, album_uuid):
@bp.route("/<groupe_uuid>/<album_uuid>/qr") @bp.route("/<groupe_uuid>/<album_uuid>/qr")
def groupe_qr_code(groupe_uuid, album_uuid): def groupe_qr_code(groupe_uuid, album_uuid):
return get_qrcode(f"/groupe/{groupe_uuid}/{album_uuid}") return get_qrcode(f"/groupe/{groupe_uuid}/{album_uuid}")

View File

@ -14,14 +14,13 @@ from .utils import get_all_partitions, User, Partition, Attachment
bp = Blueprint("partition", __name__, url_prefix="/partition") bp = Blueprint("partition", __name__, url_prefix="/partition")
@bp.route("/<uuid>") @bp.route("/<uuid>")
def partition(uuid): def get_partition(uuid):
db = get_db()
try: try:
partition = Partition(uuid=uuid) partition = Partition(uuid=uuid)
except LookupError: except LookupError:
abort(404) abort(404)
return send_file( return send_file(
os.path.join("partitions", f"{uuid}.pdf"), os.path.join("partitions", f"{uuid}.pdf"),
download_name = f"{partition.name}.pdf" download_name = f"{partition.name}.pdf"
@ -29,12 +28,11 @@ def partition(uuid):
@bp.route("/<uuid>/attachments") @bp.route("/<uuid>/attachments")
def attachments(uuid): def attachments(uuid):
db = get_db()
try: try:
partition = Partition(uuid=uuid) partition = Partition(uuid=uuid)
except LookupError: except LookupError:
abort(404) abort(404)
partition.load_attachments() partition.load_attachments()
return render_template( return render_template(
"partition/attachments.html", "partition/attachments.html",
@ -65,10 +63,10 @@ def add_attachment(uuid):
name = ".".join(request.files["file"].filename.split(".")[:-1]) name = ".".join(request.files["file"].filename.split(".")[:-1])
else: else:
name = request.form["name"] name = request.form["name"]
if name == "": if name == "":
error = "Pas de nom de fichier" error = "Pas de nom de fichier"
else: else:
filename = request.files["file"].filename filename = request.files["file"].filename
ext = filename.split(".")[-1] ext = filename.split(".")[-1]
@ -97,7 +95,7 @@ def add_attachment(uuid):
break break
except db.IntegrityError: except db.IntegrityError:
pass pass
if "response" in request.args and request.args["response"] == "json": if "response" in request.args and request.args["response"] == "json":
@ -105,31 +103,28 @@ def add_attachment(uuid):
"status": "ok", "status": "ok",
"uuid": attachment_uuid "uuid": attachment_uuid
} }
else: return redirect(f"/partition/{partition.uuid}/attachments")
return redirect(f"/partition/{partition.uuid}/attachments")
@bp.route("/attachment/<uuid>.<filetype>") @bp.route("/attachment/<uuid>.<filetype>")
def attachment(uuid, filetype): def get_attachment(uuid, filetype):
db = get_db()
try: try:
attachment = Attachment(uuid=uuid) attachment = Attachment(uuid=uuid)
except LookupError: except LookupError:
abort(404) abort(404)
assert filetype == attachment.filetype assert filetype == attachment.filetype
return send_file( return send_file(
os.path.join("attachments", f"{uuid}.{attachment.filetype}"), os.path.join("attachments", f"{uuid}.{attachment.filetype}"),
download_name = f"{attachment.name}.{attachment.filetype}" download_name = f"{attachment.name}.{attachment.filetype}"
) )
@bp.route("/<uuid>/edit", methods=["GET", "POST"]) @bp.route("/<uuid>/edit", methods=["GET", "POST"])
@login_required @login_required
def edit(uuid): def edit(uuid):
db = get_db()
try: try:
partition = Partition(uuid=uuid) partition = Partition(uuid=uuid)
except LookupError: except LookupError:
@ -155,7 +150,7 @@ def edit(uuid):
if error is not None: if error is not None:
flash(error) flash(error)
return redirect(f"/partition/{ uuid }/edit") return redirect(f"/partition/{ uuid }/edit")
partition.update( partition.update(
name=request.form["name"], name=request.form["name"],
author=request.form["author"], author=request.form["author"],
@ -169,7 +164,6 @@ def edit(uuid):
@bp.route("/<uuid>/details", methods=["GET", "POST"]) @bp.route("/<uuid>/details", methods=["GET", "POST"])
@admin_required @admin_required
def details(uuid): def details(uuid):
db = get_db()
try: try:
partition = Partition(uuid=uuid) partition = Partition(uuid=uuid)
except LookupError: except LookupError:
@ -202,7 +196,7 @@ def details(uuid):
if error is not None: if error is not None:
flash(error) flash(error)
return redirect(f"/partition/{ uuid }/details") return redirect(f"/partition/{ uuid }/details")
partition.update( partition.update(
name=request.form["name"], name=request.form["name"],
author=request.form["author"], author=request.form["author"],
@ -260,4 +254,4 @@ def partition_search(uuid):
def index(): def index():
partitions = get_all_partitions() partitions = get_all_partitions()
user = User(user_id=session.get("user_id")) user = User(user_id=session.get("user_id"))
return render_template("admin/partitions.html", partitions=partitions, user=user) return render_template("admin/partitions.html", partitions=partitions, user=user)

View File

@ -39,7 +39,7 @@ def local_search(query, partitions):
score_partitions = [(score_attribution(partition), partition) for partition in partitions] score_partitions = [(score_attribution(partition), partition) for partition in partitions]
score_partitions.sort(key=lambda x: x[0], reverse=True) score_partitions.sort(key=lambda x: x[0], reverse=True)
selection = [] selection = []
for score, partition in score_partitions[:5]: for score, partition in score_partitions[:5]:
if score > 0: if score > 0:
@ -56,8 +56,8 @@ def download_search_result(element):
try: try:
urllib.request.urlretrieve(url, f"partitioncloud/search-partitions/{uuid}.pdf") urllib.request.urlretrieve(url, f"partitioncloud/search-partitions/{uuid}.pdf")
except (urllib.error.HTTPError, urllib.error.URLError) as e: except (urllib.error.HTTPError, urllib.error.URLError):
with open(f"partitioncloud/search-partitions/{uuid}.pdf",'a') as f: with open(f"partitioncloud/search-partitions/{uuid}.pdf", 'a', encoding="utf8") as _:
pass # Create empty file pass # Create empty file
@ -112,7 +112,6 @@ def online_search(query, num_queries):
thread.join() thread.join()
for element in partitions: for element in partitions:
pass
uuid = element["uuid"] uuid = element["uuid"]
url = element["url"] url = element["url"]
if os.stat(f"partitioncloud/search-partitions/{uuid}.pdf").st_size == 0: if os.stat(f"partitioncloud/search-partitions/{uuid}.pdf").st_size == 0:
@ -140,7 +139,7 @@ def flush_cache():
db = get_db() db = get_db()
expired_cache = db.execute( expired_cache = db.execute(
""" """
SELECT uuid FROM search_results SELECT uuid FROM search_results
WHERE creation_time <= datetime('now', '-15 minutes', 'localtime') WHERE creation_time <= datetime('now', '-15 minutes', 'localtime')
""" """
).fetchall() ).fetchall()
@ -161,4 +160,4 @@ def flush_cache():
WHERE creation_time <= datetime('now', '-15 minutes', 'localtime') WHERE creation_time <= datetime('now', '-15 minutes', 'localtime')
""" """
) )
db.commit() db.commit()

View File

@ -1,12 +1,11 @@
#!/usr/bin/python3 #!/usr/bin/python3
import os
import io import io
import random import random
import string import string
import qrcode import qrcode
from .db import get_db
from flask import current_app, send_file from flask import current_app, send_file
from .db import get_db
def new_uuid(): def new_uuid():
@ -71,4 +70,4 @@ def get_all_albums():
"name": a["name"], "name": a["name"],
"uuid": a["uuid"] "uuid": a["uuid"]
} for a in albums } for a in albums
] ]

View File

@ -2,6 +2,7 @@ import random
import string import string
import sqlite3 import sqlite3
def run_sqlite_command(*args): def run_sqlite_command(*args):
"""Run a command against the database""" """Run a command against the database"""
con = sqlite3.connect("instance/partitioncloud.sqlite") con = sqlite3.connect("instance/partitioncloud.sqlite")
@ -10,18 +11,23 @@ def run_sqlite_command(*args):
con.commit() con.commit()
con.close() con.close()
def get_sqlite_data(*args): def get_sqlite_data(*args):
"""Get data from the db""" """Get data from the db"""
con = sqlite3.connect("instance/partitioncloud.sqlite") con = sqlite3.connect("instance/partitioncloud.sqlite")
cur = con.cursor() cur = con.cursor()
data = cur.execute(*args) data = cur.execute(*args)
new_data = [i for i in data] new_data = list(data)
con.close() con.close()
return new_data return new_data
def new_uuid(): def new_uuid():
return ''.join([random.choice(string.ascii_uppercase + string.digits) for _ in range(6)]) return "".join(
[random.choice(string.ascii_uppercase + string.digits) for _ in range(6)]
)
def format_uuid(uuid): def format_uuid(uuid):
"""Format old uuid4 format""" """Format old uuid4 format"""
return uuid.upper()[:6] return uuid.upper()[:6]

View File

@ -6,10 +6,11 @@ from colorama import Fore, Style
""" """
v1.3.* v1.3.*
""" """
def add_source(): def add_source():
utils.run_sqlite_command( utils.run_sqlite_command("ALTER TABLE partition ADD source TEXT DEFAULT 'unknown'")
"ALTER TABLE partition ADD source TEXT DEFAULT 'unknown'"
)
def add_groupes(): def add_groupes():
utils.run_sqlite_command( utils.run_sqlite_command(
@ -35,6 +36,7 @@ def add_groupes():
);""" );"""
) )
def add_attachments(): def add_attachments():
os.makedirs("partitioncloud/attachments", exist_ok=True) os.makedirs("partitioncloud/attachments", exist_ok=True)
utils.run_sqlite_command( utils.run_sqlite_command(
@ -47,6 +49,7 @@ def add_attachments():
);""" );"""
) )
def install_colorama(): def install_colorama():
os.system("pip install colorama -qq") os.system("pip install colorama -qq")
@ -54,19 +57,17 @@ def install_colorama():
""" """
v1.4.* v1.4.*
""" """
def mass_rename(): def mass_rename():
"""Rename all albums & groupes to use a shorter uuid""" """Rename all albums & groupes to use a shorter uuid"""
albums = utils.get_sqlite_data("SELECT * FROM album") albums = utils.get_sqlite_data("SELECT * FROM album")
groupes = utils.get_sqlite_data("SELECT * FROM groupe") groupes = utils.get_sqlite_data("SELECT * FROM groupe")
utils.run_sqlite_command( utils.run_sqlite_command("ALTER TABLE groupe RENAME TO _groupe_old")
"ALTER TABLE groupe RENAME TO _groupe_old" utils.run_sqlite_command("ALTER TABLE album RENAME TO _album_old")
)
utils.run_sqlite_command(
"ALTER TABLE album RENAME TO _album_old"
)
utils.run_sqlite_command( # Add UNIQUE constraint & change uuid length utils.run_sqlite_command( # Add UNIQUE constraint & change uuid length
"""CREATE TABLE groupe ( """CREATE TABLE groupe (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
name TEXT NOT NULL, name TEXT NOT NULL,
@ -74,7 +75,7 @@ def mass_rename():
);""" );"""
) )
utils.run_sqlite_command( # Change uuid length utils.run_sqlite_command( # Change uuid length
"""CREATE TABLE album ( """CREATE TABLE album (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
name TEXT NOT NULL, name TEXT NOT NULL,
@ -89,19 +90,22 @@ def mass_rename():
INSERT INTO album (id, name, uuid) INSERT INTO album (id, name, uuid)
VALUES (?, ?, ?) VALUES (?, ?, ?)
""", """,
(album[0], album[1], utils.format_uuid(album[2])) (album[0], album[1], utils.format_uuid(album[2])),
) )
except sqlite3.IntegrityError: except sqlite3.IntegrityError:
uuid = new_uuid() uuid = utils.new_uuid()
print(f"{Fore.RED}Collision on {album[1]}{Style.RESET_ALL} ({album[2][:10]} renaming to {uuid})") print(
f"{Fore.RED}Collision on {album[1]}{Style.RESET_ALL} \
({album[2][:10]} renaming to {uuid})"
)
utils.run_sqlite_command( utils.run_sqlite_command(
""" """
INSERT INTO album (id, name, uuid) INSERT INTO album (id, name, uuid)
VALUES (?, ?, ?) VALUES (?, ?, ?)
""", """,
(album[0], album[1], uuid) (album[0], album[1], uuid),
) )
for groupe in groupes: for groupe in groupes:
try: try:
utils.run_sqlite_command( utils.run_sqlite_command(
@ -109,28 +113,32 @@ def mass_rename():
INSERT INTO groupe (id, name, uuid) INSERT INTO groupe (id, name, uuid)
VALUES (?, ?, ?) VALUES (?, ?, ?)
""", """,
(groupe[0], groupe[1], utils.format_uuid(groupe[2])) (groupe[0], groupe[1], utils.format_uuid(groupe[2])),
) )
except sqlite3.IntegrityError: except sqlite3.IntegrityError:
uuid = new_uuid() uuid = utils.new_uuid()
print(f"{Fore.RED}Collision on {groupe[1]}{Style.RESET_ALL} ({groupe[2][:10]} renaming to {uuid})") print(
f"{Fore.RED}Collision on {groupe[1]}{Style.RESET_ALL} \
({groupe[2][:10]} renaming to {uuid})"
)
utils.run_sqlite_command( utils.run_sqlite_command(
""" """
INSERT INTO groupe (id, name, uuid) INSERT INTO groupe (id, name, uuid)
VALUES (?, ?, ?) VALUES (?, ?, ?)
""", """,
(groupe[0], groupe[1], uuid) (groupe[0], groupe[1], uuid),
) )
utils.run_sqlite_command( utils.run_sqlite_command("DROP TABLE _groupe_old")
"DROP TABLE _groupe_old" utils.run_sqlite_command("DROP TABLE _album_old")
)
utils.run_sqlite_command(
"DROP TABLE _album_old"
)
def base_url_parameter_added(): def base_url_parameter_added():
print(f"{Style.BRIGHT}{Fore.YELLOW}The parameter BASE_URL has been added, reference your front url in it{Style.RESET_ALL}") print(
f"{Style.BRIGHT}{Fore.YELLOW}The parameter BASE_URL has been added, \
reference your front url in it{Style.RESET_ALL}"
)
def install_qrcode(): def install_qrcode():
os.system("pip install qrcode -qq") os.system("pip install qrcode -qq")

View File

@ -1,18 +1,18 @@
#!/usr/bin/python3 #!/usr/bin/python3
import os import os
import sys
import shutil import shutil
import argparse import argparse
from functools import cmp_to_key from functools import cmp_to_key
from distutils.dir_util import copy_tree
from colorama import Fore, Style from colorama import Fore, Style
from hooks import v1 from hooks import v1 as v1_hooks
def get_version(v: str) -> (int, int, int): def get_version(v: str) -> (int, int, int):
"""Returns a tuple (major, minor, patch from the string v{major}.{minor}.{patch})""" """Returns a tuple (major, minor, patch from the string v{major}.{minor}.{patch})"""
assert (v[0] == 'v') # Check if the version is correctly formatted assert v[0] == "v" # Check if the version is correctly formatted
return tuple(map(int, v[1:].split('.'))) return tuple(map(int, v[1:].split(".")))
def is_newer(v1: str, v2: str) -> bool: def is_newer(v1: str, v2: str) -> bool:
@ -21,41 +21,36 @@ def is_newer(v1: str, v2: str) -> bool:
hooks = [ hooks = [
("v1.3.0", [ ("v1.3.0", [("add SOURCE column", v1_hooks.add_source)]),
("add SOURCE column", v1.add_source) ("v1.2.0", [("create groupe structure", v1_hooks.add_groupes)]),
]), ("v1.3.0", [("create attachment table", v1_hooks.add_attachments)]),
("v1.2.0", [ ("v1.3.3", [("Install colorama", v1_hooks.install_colorama)]),
("create groupe structure", v1.add_groupes) (
]), "v1.4.0",
("v1.3.0", [ [
("create attachment table", v1.add_attachments) ("Change all albums & groupes uuids", v1_hooks.mass_rename),
]), ("Warn new parameter", v1_hooks.base_url_parameter_added),
("v1.3.3", [ ],
("Install colorama", v1.install_colorama) ),
]), ("v1.4.1", [("Install qrcode", v1_hooks.install_qrcode)]),
("v1.4.0", [
("Change all albums & groupes uuids", v1.mass_rename),
("Warn new parameter", v1.base_url_parameter_added)
]),
("v1.4.1", [
("Install qrcode", v1.install_qrcode)
]),
] ]
def get_hooks(current, target): def get_hooks(current, target):
"""Returns a list of hooks needed to migrate""" """Returns a list of hooks needed to migrate"""
def compare(v1: str, v2: str): def compare(v1: str, v2: str):
if is_newer(v2[0], v1[0]): if is_newer(v2[0], v1[0]):
return -1 return -1
elif is_newer(v1[0], v2[0]): if is_newer(v1[0], v2[0]):
return 1 return 1
else: return 0
return 0
applied_hooks = [] applied_hooks = []
for hook in hooks: for hook in hooks:
if is_newer(hook[0], current) and (target == hook[0] or is_newer(target, hook[0])): if is_newer(hook[0], current) and (
target == hook[0] or is_newer(target, hook[0])
):
applied_hooks.append(hook) applied_hooks.append(hook)
return sorted(applied_hooks, key=cmp_to_key(compare)) return sorted(applied_hooks, key=cmp_to_key(compare))
@ -63,67 +58,82 @@ def get_hooks(current, target):
def backup_instance(version, verbose=True): def backup_instance(version, verbose=True):
"""Backs up current instance in backups/{version}""" """Backs up current instance in backups/{version}"""
def print_verbose(*args):
def print_verbose(*f_args):
if verbose: if verbose:
print(*args) print(*f_args)
print_verbose("\nBacking up current instance") print_verbose("\nBacking up current instance")
dest = os.path.join("backups", version) dest = os.path.join("backups", version)
if os.path.exists(dest): if os.path.exists(dest):
print(f"{Fore.RED}Backup directory already exists{Style.RESET_ALL}") print(f"{Fore.RED}Backup directory already exists{Style.RESET_ALL}")
exit(1) sys.exit(1)
os.makedirs(dest) os.makedirs(dest)
paths = [ paths = [
("instance", os.path.join(dest, "instance")), ("instance", os.path.join(dest, "instance")),
(os.path.join("partitioncloud", "partitions"), os.path.join(dest, "partitions")), (
(os.path.join("partitioncloud", "attachments"), os.path.join(dest, "attachments")), os.path.join("partitioncloud", "partitions"),
(os.path.join("partitioncloud", "search-partitions"), os.path.join(dest, "search-partitions")) os.path.join(dest, "partitions"),
),
(
os.path.join("partitioncloud", "attachments"),
os.path.join(dest, "attachments"),
),
(
os.path.join("partitioncloud", "search-partitions"),
os.path.join(dest, "search-partitions"),
),
] ]
for src, dst in paths: for src, dst in paths:
if os.path.exists(src): if os.path.exists(src):
print_verbose(f"\tBacking up {src}") print_verbose(f"\tBacking up {src}")
copy_tree(src, dst) shutil.copy_tree(src, dst)
def print_hooks(hooks): def print_hooks(hooks_list):
for hook in hooks: for hook in hooks_list:
print(f"- {Fore.BLUE}{hook[0]}{Style.RESET_ALL}:") print(f"- {Fore.BLUE}{hook[0]}{Style.RESET_ALL}:")
for subhook in hook[1]: for sub_hook in hook[1]:
print("\t", subhook[0]) print("\t", sub_hook[0])
def apply_hooks(hooks): def apply_hooks(hooks_list):
for hook in hooks: for hook in hooks_list:
print(f"Migrating to {Fore.BLUE}{hook[0]}{Style.RESET_ALL}:") print(f"Migrating to {Fore.BLUE}{hook[0]}{Style.RESET_ALL}:")
for subhook in hook[1]: for sub_hook in hook[1]:
print(f"\tApplying '{subhook[0]}'") print(f"\tApplying '{sub_hook[0]}'")
subhook[1]() sub_hook[1]()
def migrate(current, target, skip_backup=False, prog_name="scripts/migration.py"): def migrate(current, target, skip_backup=False, prog_name="scripts/migration.py"):
"""""" """"""
print(f"Trying to migrate from {args.current} to {args.target}") print(f"Trying to migrate from {current} to {target}")
assert is_newer(args.target, args.current) assert is_newer(target, current)
applied_hooks = get_hooks(args.current, args.target) applied_hooks = get_hooks(current, target)
if (len(applied_hooks) == 0): if len(applied_hooks) == 0:
print(f"{Fore.GREEN}No hook to apply{Style.RESET_ALL}") print(f"{Fore.GREEN}No hook to apply{Style.RESET_ALL}")
exit(0) sys.exit(0)
print("The following hooks will be applied:") print("The following hooks will be applied:")
print_hooks(applied_hooks) print_hooks(applied_hooks)
if input("Apply these hooks ? [y/N] ") != "y": if input("Apply these hooks ? [y/N] ") != "y":
print(f"{Fore.RED}Aborting !{Style.RESET_ALL}") print(f"{Fore.RED}Aborting !{Style.RESET_ALL}")
exit(1) sys.exit(1)
if not skip_backup: if not skip_backup:
backup_instance(current) backup_instance(current)
print(f"Instance backed up in {Style.BRIGHT}backups/{current}{Style.RESET_ALL}\n") print(
print(f"If something goes wrong, recover with {Style.BRIGHT}{Fore.BLUE}{prog_name} --restore {current}{Style.RESET_ALL}") f"Instance backed up in {Style.BRIGHT}backups/{current}{Style.RESET_ALL}\n"
)
print(
f"If something goes wrong, recover with {Style.BRIGHT}{Fore.BLUE}{prog_name}\
--restore {current}{Style.RESET_ALL}"
)
else: else:
print("Skipping automatic backup") print("Skipping automatic backup")
@ -132,17 +142,31 @@ def migrate(current, target, skip_backup=False, prog_name="scripts/migration.py"
def restore(version): def restore(version):
if input("Do you really want to restore from backup ? Your current data will be deleted [y/N] ") != "y": if (
input(
"Do you really want to restore from backup ? Your current data will be deleted [y/N] "
)
!= "y"
):
print(f"{Fore.RED}Aborting !{Style.RESET_ALL}") print(f"{Fore.RED}Aborting !{Style.RESET_ALL}")
exit(1) sys.exit(1)
dest = os.path.join("backups", version) dest = os.path.join("backups", version)
print(f"Restoring from {dest}") print(f"Restoring from {dest}")
paths = [ paths = [
("instance", os.path.join(dest, "instance")), ("instance", os.path.join(dest, "instance")),
(os.path.join("partitioncloud", "partitions"), os.path.join(dest, "partitions")), (
(os.path.join("partitioncloud", "attachments"), os.path.join(dest, "attachments")), os.path.join("partitioncloud", "partitions"),
(os.path.join("partitioncloud", "search-partitions"), os.path.join(dest, "search-partitions")) os.path.join(dest, "partitions"),
),
(
os.path.join("partitioncloud", "attachments"),
os.path.join(dest, "attachments"),
),
(
os.path.join("partitioncloud", "search-partitions"),
os.path.join(dest, "search-partitions"),
),
] ]
for src, dst in paths: for src, dst in paths:
if os.path.exists(src): if os.path.exists(src):
@ -150,24 +174,32 @@ def restore(version):
if os.path.exists(dst): if os.path.exists(dst):
print(f"\tRestoring {src}") print(f"\tRestoring {src}")
copy_tree(dst, src) shutil.copy_tree(dst, src)
else: else:
print(f"\t{Fore.RED}No available backup for {src}, deleting current content to avoid any conflict{Style.RESET_ALL}") print(
f"\t{Fore.RED}No available backup for {src}, \
deleting current content to avoid any conflict{Style.RESET_ALL}"
)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='PartitionCloud Migration tool', prog="PartitionCloud Migration tool",
description='Helps you migrate from one version to another') description="Helps you migrate from one version to another",
)
parser.add_argument('-c', '--current', help="current version (vx.y.z)") parser.add_argument("-c", "--current", help="current version (vx.y.z)")
parser.add_argument('-t', '--target', help="target version (vx.y.z)") parser.add_argument("-t", "--target", help="target version (vx.y.z)")
parser.add_argument('-s', '--skip-backup', action='store_true') parser.add_argument("-s", "--skip-backup", action="store_true")
parser.add_argument('-r', '--restore', help='restore from specific version backup, will not apply any hook (vx.y.z)') parser.add_argument(
"-r",
"--restore",
help="restore from specific version backup, will not apply any hook (vx.y.z)",
)
args = parser.parse_args() args = parser.parse_args()
if args.restore is None: if args.restore is None:
migrate(args.current, args.target, skip_backup=args.skip_backup) migrate(args.current, args.target, skip_backup=args.skip_backup)
else: else:
restore(args.restore) restore(args.restore)