Update src/parallel

This commit is contained in:
augustin64 2022-05-21 18:09:01 +02:00
parent 511a522a34
commit 553cf4d0be
4 changed files with 322 additions and 68 deletions

View File

@ -3,70 +3,149 @@
Serveur Flask pour entraîner le réseau sur plusieurs machines en parallèle. Serveur Flask pour entraîner le réseau sur plusieurs machines en parallèle.
""" """
import os import os
import time
import random import random
import subprocess
from secrets import token_urlsafe from secrets import token_urlsafe
from threading import Thread
from flask import Flask, request, send_file from flask import Flask, request, send_from_directory, session
from clients import Client, clients from structures import clients, Client, NoMoreJobAvailableError, TryLaterError, Training
# Définitions de variables
DATASET = "mnist-train" DATASET = "mnist-train"
TEST_SET = "mnist-t10k"
SECRET = str(random.randint(1000, 10000)) SECRET = str(random.randint(1000, 10000))
CACHE = ".cache" CACHE = ".cache" # À remplacer avec un chemin absolu
BATCHS = 10
RESEAU = os.path.join(CACHE, "reseau.bin")
training = Training(BATCHS, DATASET, TEST_SET, CACHE)
os.makedirs(CACHE, exist_ok=True) os.makedirs(CACHE, exist_ok=True)
# On crée un réseau aléatoire si il n'existe pas encore
if not os.path.isfile(RESEAU):
if not os.path.isfile("out/main"):
subprocess.call(["make.sh", "main"])
subprocess.call
([
"out/main", "train",
"--epochs", "0",
"--images", "data/mnist/train-images-idx3-ubyte",
"--labels", "data/mnist/train-labels-idx1-ubyte",
"--out", RESEAU,
])
print(f" * Created {RESEAU}")
else:
print(f" * {RESEAU} already exists")
app = Flask(__name__) app = Flask(__name__)
# On définit une clé secrète pour pouvoir utiliser des cookies de session
app.config["SECRET_KEY"] = token_urlsafe(40)
print(f" * Secret: {SECRET}") print(f" * Secret: {SECRET}")
@app.route("/authenticate", methods = ['POST'])
@app.route("/authenticate", methods=["POST"])
def authenticate(): def authenticate():
""" """
Authentification d'un nouvel utilisateur Authentification d'un nouvel utilisateur
""" """
if not request.is_json: if not request.is_json:
return { return {"status": "request format is not json"}
"status":
"request format is not json"
}
content = request.get_json() content = request.get_json()
if content["secret"] != SECRET: if content["secret"] != SECRET:
return {"status": "invalid secret"} return {"status": "invalid secret"}
performance = content["performance"] token = token_urlsafe(30)
while token in clients.keys():
token = token_urlsafe(30) token = token_urlsafe(30)
while token in [client.token for client in clients]: clients[token] = Client(content["performance"], token)
token = token_urlsafe(30)
clients.append(Client(performance, token))
# On prépare la réponse du serveur
data = {} data = {}
data["token"] = token
data["nb_elem"] = performance
data["start"] = 0
data["dataset"] = DATASET
data["status"] = "ok" data["status"] = "ok"
data["dataset"] = training.dataset
session["token"] = token
try:
clients[token].get_job(training)
data["nb_elem"] = clients[token].performance
data["start"] = clients[token].start
data["instruction"] = "train"
except NoMoreJobAvailableError:
data["status"] = "Training already ended"
data["nb_elem"] = 0
data["start"] = 0
data["instruction"] = "stop"
except TryLaterError:
data["status"] = "Wait for next batch"
data["nb_elem"] = 0
data["start"] = 0
data["instruction"] = "sleep"
data["sleep_time"] = 1
return data return data
@app.route("/get_network") @app.route("/post_network", methods=["POST"])
def post_network():
"""
Applique le patch renvoyé dans le nouveau réseau
"""
token = session.get("token")
if not token in clients.keys():
return {"status": "token invalide"}
while training.is_patch_locked():
time.sleep(0.1)
request.files["file"].save(training.delta)
training.patch()
# Préparation de la réponse
data = {}
data["status"] = "ok"
data["dataset"] = training.dataset
try:
clients[token].get_job(training)
data["dataset"] = training.dataset
data["nb_elem"] = clients[token].performance
data["start"] = clients[token].start
data["instruction"] = "train"
except NoMoreJobAvailableError:
data["status"] = "Training already ended"
data["nb_elem"] = 0
data["start"] = 0
data["instruction"] = "stop"
except TryLaterError:
Thread(target=training.test_network()).start()
data["status"] = "Wait for next batch"
data["nb_elem"] = 0
data["start"] = 0
data["instruction"] = "sleep"
data["sleep_time"] = 1
return data
@app.route("/get_network", methods=["GET", "POST"])
def get_network(): def get_network():
""" """
Renvoie le réseau neuronal Renvoie le réseau neuronal
""" """
if not request.is_json: token = session.get("token")
return { if not token in clients.keys():
"status": return {"status": "token invalide"}
"request format is not json"
} if token not in clients.keys():
token = request.get_json()["token"] return {"status": "token invalide"}
if token not in [client.token for client in clients]:
return { return send_from_directory(directory=CACHE, path="reseau.bin")
"status":
"token invalide"
}
return send_file(
os.path.join(CACHE, "reseau.bin")
)

View File

@ -5,15 +5,20 @@ Client se connectant au serveur Flask afin de fournir de la puissance de calcul.
import json import json
import os import os
import sys import sys
import time
import subprocess
import psutil import psutil
import requests import requests
CACHE = ".cache" # Définition de constantes
CACHE = ".cache" # Replace with an absolute path
DELTA = os.path.join(CACHE, "delta_shared.bin") DELTA = os.path.join(CACHE, "delta_shared.bin")
RESEAU = os.path.join(CACHE, "reseau_shared.bin") RESEAU = os.path.join(CACHE, "reseau_shared.bin")
SECRET = input("SECRET : ") SECRET = input("SECRET : ")
HOST = input("HOST : ") HOST = input("HOST : ")
session = requests.Session()
os.makedirs(CACHE, exist_ok=True) os.makedirs(CACHE, exist_ok=True)
@ -23,7 +28,7 @@ def get_performance():
""" """
cores = os.cpu_count() cores = os.cpu_count()
max_freq = psutil.cpu_freq()[2] max_freq = psutil.cpu_freq()[2]
return int(cores * max_freq * 0.01) return int(cores * max_freq * 0.5)
def authenticate(): def authenticate():
@ -31,54 +36,106 @@ def authenticate():
S'inscrit en tant que client auprès du serveur S'inscrit en tant que client auprès du serveur
""" """
performance = get_performance() performance = get_performance()
data = { data = {"performance": performance, "secret": SECRET}
"performance":performance, # Les données d'identification seront ensuite stockées dans un cookie de l'objet session
"secret": SECRET req = session.post(f"http://{HOST}/authenticate", json=data)
}
req = requests.post(
f"http://{HOST}/authenticate",
json=data
)
data = json.loads(req.text) data = json.loads(req.text)
if data["status"] != "ok": if data["status"] != "ok":
print("authentication error:", data["status"]) print("error in authenticate():", data["status"])
sys.exit(1) sys.exit(1)
else: else:
return data return data
def download_network(token): def download_network():
""" """
Récupère le réseau depuis le serveur Récupère le réseau depuis le serveur
""" """
data = {"token": token} with session.get(f"http://{HOST}/get_network", stream=True) as req:
with requests.get(f"http://{HOST}/get_network", stream=True, json=data) as req:
req.raise_for_status() req.raise_for_status()
with open(os.path.join(CACHE, RESEAU), "wb") as file: with open(RESEAU, "wb") as file:
for chunk in req.iter_content(chunk_size=8192): for chunk in req.iter_content(chunk_size=8192):
file.write(chunk) file.write(chunk)
def send_delta_network(continue_=False):
"""
Envoie le réseau différentiel et obéit aux instructions suivantes
"""
with open(DELTA, "rb") as file:
files = {"file": file}
req = session.post(f"http://{HOST}/post_network", files=files)
req_data = json.loads(req.text)
# Actions à effectuer en fonction de la réponse
if "instruction" not in req_data.keys():
print(req_data["status"])
raise NotImplementedError
if req_data["instruction"] == "sleep":
print(f"Sleeping {req_data['sleep_time']}s.")
time.sleep(req_data["sleep_time"])
send_delta_network(continue_=continue_)
elif req_data["instruction"] == "stop":
print(req_data["status"])
print("Shutting down.")
elif req_data["instruction"] == "train":
download_network()
train_shared(req_data["dataset"], req_data["start"], req_data["nb_elem"])
else:
json.dumps(req_data)
raise NotImplementedError
def train_shared(dataset, start, nb_elem, epochs=1, out=DELTA): def train_shared(dataset, start, nb_elem, epochs=1, out=DELTA):
""" """
Entraînement du réseau Entraînement du réseau
""" """
# Utiliser un dictionnaire serait plus efficace et plus propre
if dataset == "mnist-train":
images = "data/mnist/train-images-idx3-ubyte"
labels = "data/mnist/train-labels-idx1-ubyte"
elif dataset == "mnist-t10k":
images = "data/mnist/t10k-images-idx3-ubyte"
labels = "data/mnist/t10k-labels-idx1-ubyte"
else:
print(f"Dataset {dataset} not implemented yet")
raise NotImplementedError raise NotImplementedError
# On compile out/main si il n'existe pas encore
if not os.path.isfile("out/main"):
subprocess.call(["make.sh", "main"])
# Entraînement du réseau
subprocess.call(
[
"out/main", "train",
"--epochs", str(epochs),
"--images", images,
"--labels", labels,
"--recover", RESEAU,
"--delta", out,
"--nb-images", str(nb_elem),
"--start", str(start),
],
stdout=subprocess.DEVNULL,
)
return send_delta_network(continue_=True)
def __main__(): def __main__():
data = authenticate() data = authenticate()
token = data["token"]
dataset = data["dataset"] dataset = data["dataset"]
start = data["start"] start = data["start"]
nb_elem = data["nb_elem"] nb_elem = data["nb_elem"]
download_network(token) download_network()
# train_shared s'appelle récursivement sur lui même jusqu'à la fin du programme
while True:
train_shared(dataset, start, nb_elem, epochs=1, out=DELTA) train_shared(dataset, start, nb_elem, epochs=1, out=DELTA)

View File

@ -1,15 +0,0 @@
#!/usr/bin/python3
"""
Description des clients se connectant au serveur.
"""
class Client():
"""
Classe client
"""
def __init__(self, performance, token):
self.performance = performance
self.token = token
clients = []

133
src/parallel/structures.py Normal file
View File

@ -0,0 +1,133 @@
#!/usr/bin/python3
"""
Description des structures.
"""
import os
import time
import subprocess
class NoMoreJobAvailableError(Exception):
"""Entraînement du réseau fini"""
pass
class TryLaterError(Exception):
"""Batch fini, réessayer plus tard"""
pass
class Client:
"""
Description d'un client se connectant au serveur
"""
def __init__(self, performance, token):
self.performance = performance
self.token = token
self.start = 0
self.nb_images = 0
def get_job(self, training):
"""
Donne un travail au client
"""
if training.nb_images == training.cur_image:
if training.batchs == training.cur_batch:
raise NoMoreJobAvailableError
raise TryLaterError
self.start = training.cur_image
self.nb_images = min(training.nb_images - training.cur_image, self.performance)
training.cur_image += self.nb_images
clients = {}
class Training:
"""
Classe training
"""
def __init__(self, batchs, dataset, test_set, cache):
# Définition de variables
self.batchs = batchs
self.cur_batch = 1
self.cur_image = 0
self.dataset = dataset
self.test_set = test_set
self.cache = cache
self.reseau = os.path.join(self.cache, "reseau.bin")
self.delta = os.path.join(self.cache, "delta.bin")
# Définition des chemins et données relatives à chaque set de données
# TODO: implémenter plus proprement avec un dictionnaire ou même un fichier datasets.json
if self.dataset == "mnist-train":
self.nb_images = 60000
elif self.dataset == "mnist-t10k":
self.nb_images = 10000
else:
raise NotImplementedError
if self.test_set == "mnist-train":
self.test_images = "data/mnist/train-images-idx3-ubyte"
self.test_labels = "data/mnist/train-labels-idx1-ubyte"
elif self.test_set == "mnist-t10k":
self.test_images = "data/mnist/t10k-images-idx3-ubyte"
self.test_labels = "data/mnist/t10k-labels-idx1-ubyte"
else:
print(f"{self.test_set} test dataset unknown.")
raise NotImplementedError
# On supprime le fichier de lock qui permet de
# ne pas écrire en même temps plusieurs fois sur le fichier reseau.bin
if os.path.isfile(self.reseau + ".lock"):
os.remove(self.reseau + ".lock")
def test_network(self):
"""
Teste les performances du réseau avant le batch suivant
"""
if not os.path.isfile("out/main"):
subprocess.call(["make.sh", "main"])
subprocess.call(
[
"out/main", "test",
"--images", self.test_images,
"--labels", self.test_labels,
"--modele", self.reseau
])
self.cur_batch += 1
self.cur_image = 0
def patch(self):
"""
Applique un patch au réseau
"""
# On attend que le lock se libère puis on patch le réseau
while self.is_patch_locked():
time.sleep(0.1)
with open(self.reseau + ".lock", "w", encoding="utf8") as file:
file.write("")
if not os.path.isfile("out/main"):
subprocess.call(["make.sh", "utils"])
subprocess.call
([
"out/utils", "patch-network",
"--network", self.reseau,
"--delta", self.delta,
])
os.remove(self.reseau + ".lock")
def is_patch_locked(self):
"""
Petit raccourci pour vérifier si le lock est présent
"""
return os.path.isfile(self.reseau + ".lock")