implémentation de la recherche PC + du VNC + requirement.txt + joli logger + séparation en plus de fichiers

This commit is contained in:
piair 2024-01-18 16:35:17 +01:00
parent 48e201043b
commit 4c376acf17
8 changed files with 159 additions and 100 deletions

View File

@ -1,83 +1,32 @@
from playwright.sync_api import sync_playwright, expect, Page from playwright.sync_api import sync_playwright, expect, Page, BrowserContext
from playwright_stealth import stealth_sync from playwright_stealth import stealth_sync
from logging import * from pyvirtualdisplay.smartdisplay import SmartDisplay
from actions.checks import check_logged_in from actions.cards import daily_cards, more_cards, all_cards
from actions.quiz import play_quiz_2, play_quiz_4, play_quiz_8, play_poll, detect_quiz_type, play
from actions.login import login from actions.login import login
from actions.websearch import pc_search from actions.websearch import pc_search
from tools.config import create_display, start_browser
basicConfig(encoding='utf-8', level=DEBUG) from tools.logger import *
def daily_cards(page: Page): def routine(mail: str, pwd: str, vnc: bool = False) -> None:
for i in range(1, 4): name = mail.split("@")[0]
card = page.locator(f'#daily-sets > mee-card-group:nth-child(7) > div > mee-card:nth-child({i}) > div') # display = create_display(vnc)
with page.expect_popup() as page1_info: # display.start()
card.click() page, browser = start_browser(name)
page1 = page1_info.value login(page, mail, pwd)
stealth_sync(page1)
page1.wait_for_load_state('load')
try:
selector = page1.get_by_role("button", name="Accepter")
expect(selector).to_be_visible(timeout=10_000)
selector.click()
except AssertionError as e:
pass # The RGPD button should not be there in the first place
play(page1)
page1.close()
page.reload()
info(f'Carte {i} : {not "mee-icon-AddMedium" in card.inner_html()}')
def more_cards(page: Page):
c = 1
while c < 15: # todo I don't really know why it stops without raising any errors, but I guess it's fine
card = page.locator(f"#more-activities > div > mee-card:nth-child({c})")
try:
"mee-icon-AddMedium" in card.inner_html()
except AssertionError as e:
break
if "mee-icon-AddMedium" in card.inner_html():
info(f"Playing more cards {c}.")
with page.expect_popup() as page1_info:
card.click()
page1 = page1_info.value
stealth_sync(page1)
page1.wait_for_load_state('load')
play(page1)
page1.close()
page.reload()
info(f'Carte {c} : {not "mee-icon-AddMedium" in card.inner_html()}')
c += 1
def all_cards(page: Page):
# input("1")
page.goto("https://rewards.bing.com")
page.wait_for_load_state('load')
daily_cards(page)
more_cards(page)
def routine(page: Page) -> None:
all_cards(page) all_cards(page)
pc_search(page) pc_search(page)
# display.stop()
with sync_playwright() as p:
browser = p.firefox.launch_persistent_context("./data/", headless=False)
page = browser.new_page()
stealth_sync(page)
login(page, input("mail ? "), input("password ? "))
all_cards(page)
browser.close() browser.close()
routine("EMAIL", "PWD", True)
""" """
TODO : TODO :
Fidelity management. Fidelity management.
Daily search Daily search mobile
custom start custom start
vnc --vnc-- Should work, but not tested with WSL.
mobile search
""" """

61
V7/actions/cards.py Normal file
View File

@ -0,0 +1,61 @@
from playwright.sync_api import sync_playwright, expect, Page, BrowserContext
from playwright_stealth import stealth_sync
from actions.quiz import play
from tools.logger import *
from tools.logger import *
def all_cards(page: Page):
# input("1")
page.goto("https://rewards.bing.com")
page.wait_for_load_state('load')
daily_cards(page)
more_cards(page)
def daily_cards(page: Page):
for i in range(1, 4):
card = page.locator(f'#daily-sets > mee-card-group:nth-child(7) > div > mee-card:nth-child({i}) > div')
if "mee-icon-AddMedium" not in card.inner_html():
info(f'Daily Card {i} : {"Validated" if "mee-icon-AddMedium" not in card.inner_html() else "Failure"}')
continue
with page.expect_popup() as page1_info:
card.click()
page1 = page1_info.value
stealth_sync(page1)
page1.wait_for_load_state('load')
try:
selector = page1.get_by_role("button", name="Accepter")
expect(selector).to_be_visible(timeout=10_000)
selector.click()
except AssertionError as e:
pass # The RGPD button should not be there in the first place
play(page1)
page1.close()
page.reload()
info(f'Daily Card {i} : {"Validated" if "mee-icon-AddMedium" not in card.inner_html() else "Failure"}')
def more_cards(page: Page):
c = 1
while c < 15: # todo I don't really know why it stops without raising any errors, but I guess it's fine
card = page.locator(f"#more-activities > div > mee-card:nth-child({c})")
try:
expect(card).to_be_visible(timeout=10_000)
except Exception as e: # The card probably does not exist
debug(e)
break
if "mee-icon-AddMedium" in card.inner_html():
info(f"Playing more cards {c}.")
with page.expect_popup() as page1_info:
card.click()
page1 = page1_info.value
stealth_sync(page1)
page1.wait_for_load_state('load')
play(page1)
page1.close()
page.reload()
info(f'More Card {c} : {"Validated" if "mee-icon-AddMedium" not in card.inner_html() else "Failure"}')
c += 1

View File

@ -31,17 +31,19 @@ def detect_quiz_type(page: Page) -> int:
info("Detecting quiz type.") info("Detecting quiz type.")
if "bt_PollRadio" in page.content(): if "bt_PollRadio" in page.content():
return 1 return 1
elif "rqQuestionState" in page.content():
# The quiz is already started
return page.content().count("rqAnswerOption") - 1
else: else:
try: try:
# RGPD # RGPD
selector = page.locator("rqStartQuiz") selector = page.locator("#rqStartQuiz")
expect(selector).to_be_visible(timeout=10_000) expect(selector).to_be_visible(timeout=10_000)
selector.click() selector.click()
return page.content().count("rqAnswerOption") - 1 return page.content().count("rqAnswerOption") - 1
except AssertionError as e: except AssertionError as e:
if "rqQuestionState" in page.content():
# The quiz is already started
warning("Detected via recovery mode.")
return page.content().count("rqAnswerOption") - 1
return 0 return 0
@ -59,13 +61,14 @@ def play_quiz_8(page: Page):
element = page.locator(f'#rqAnswerOption{i - 1}') element = page.locator(f'#rqAnswerOption{i - 1}')
# todo can probably be optimised using filter and has_text # todo can probably be optimised using filter and has_text
if 'iscorrectoption="True"' in element.evaluate("el => el.outerHTML"): if 'iscorrectoption="True"' in element.evaluate("el => el.outerHTML"):
correct_answers.append(f'rqAnswerOption{i - 1}') correct_answers.append(f'#rqAnswerOption{i - 1}')
shuffle(correct_answers) shuffle(correct_answers)
for answer_id in correct_answers: for answer_id in correct_answers:
page.locator(answer_id).click() page.locator(answer_id).click()
page.wait_for_timeout(1000) page.wait_for_timeout(1000)
page.wait_for_timeout(3000) # todo check if there is a better method than hardcoded timout page.wait_for_timeout(3000) # todo check if there is a better method than hardcoded timout
page.wait_for_timeout(3000) # todo check if there is a better method than hardcoded timout
def play_quiz_4(page: Page): def play_quiz_4(page: Page):
@ -106,6 +109,7 @@ def play_quiz_2(page: Page):
page.locator(f"#rqAnswerOption{correct_answer_value}").click() page.locator(f"#rqAnswerOption{correct_answer_value}").click()
page.wait_for_timeout(2000) page.wait_for_timeout(2000)
info("Quiz 2 successful.") info("Quiz 2 successful.")
page.wait_for_timeout(3000) # todo check if there is a better method than hardcoded timout
def play_poll(page: Page): def play_poll(page: Page):
@ -113,3 +117,4 @@ def play_poll(page: Page):
answer_elem = page.locator(f"#btoption{choice([0, 1])}") answer_elem = page.locator(f"#btoption{choice([0, 1])}")
answer_elem.click() answer_elem.click()
info("Poll successful.") info("Poll successful.")
page.wait_for_timeout(3000) # todo check if there is a better method than hardcoded timout

View File

@ -21,3 +21,5 @@ def pc_search(page: Page) -> None:
page.get_by_label("Enter your search here -").click() page.get_by_label("Enter your search here -").click()
page.get_by_label("Enter your search here -").fill(word) page.get_by_label("Enter your search here -").fill(word)
page.get_by_label("Enter your search here -").press("Enter") page.get_by_label("Enter your search here -").press("Enter")
page.wait_for_load_state("load")
page.wait_for_timeout(3000) # todo check if there is a better method than hardcoded timout

View File

@ -1,27 +0,0 @@
from playwright.sync_api import Page
from discord import Colour, Embed, File, RequestsWebhookAdapter, Webhook
class Logger:
def __init__(self, link):
self.wh = Webhook.from_url(link, adapter=RequestsWebhookAdapter())
def error(self, page: Page, error: str):
with open("page.html", "w") as f:
try:
f.write(page.content())
except Exception as e:
f.write(f"the driver has closed or crashed. Can't access page content\n{e}")
embed = Embed(
title="An Error has occurred",
description=str(error),
colour=Colour.red(),
)
try:
page.screenshot(path="screenshot.png")
except Exception as e:
with open("screenshot.png", "w") as f:
f.write(f"Can't take screenshot\n{e}")
file = File("screenshot.png")
embed.set_image(url="attachment://screenshot.png")
self.wh.send(embed=embed, username="error", file=file)

4
V7/requirement.txt Normal file
View File

@ -0,0 +1,4 @@
playwright
pyvirtualdisplay
playwright_stealth
pillow

18
V7/tools/config.py Normal file
View File

@ -0,0 +1,18 @@
from playwright.sync_api import sync_playwright, expect, Page, BrowserContext
from playwright_stealth import stealth_sync
from pyvirtualdisplay.smartdisplay import SmartDisplay
def create_display(vnc=False) -> SmartDisplay:
if vnc:
return SmartDisplay(backend="xvnc", size=(1920, 1080), rfbport=2345, color_depth=24)
return SmartDisplay(size=(1920, 1080))
def start_browser(name: str) -> (Page, BrowserContext):
p = sync_playwright().start()
browser = p.firefox.launch_persistent_context(f"./data/{name}/", headless=False, args=["--start-maximised"],
no_viewport=True)
page = browser.new_page()
stealth_sync(page)
return page, browser

47
V7/tools/logger.py Normal file
View File

@ -0,0 +1,47 @@
import logging
# ANSI escape codes for colors
COLOR_CODES = {
'RESET': '\033[0m',
'BOLD': '\033[1m',
'RED': '\033[31m',
'GREEN': '\033[32m',
'YELLOW': '\033[33m',
'BLUE': '\033[34m',
}
# Define colors for each log level
LOG_COLORS = {
'DEBUG': COLOR_CODES['BLUE'],
'INFO': COLOR_CODES['GREEN'],
'WARNING': COLOR_CODES['YELLOW'],
'ERROR': COLOR_CODES['RED'],
'CRITICAL': COLOR_CODES['BOLD'] + COLOR_CODES['RED'],
}
# Create a formatter with colors
class ColoredFormatter(logging.Formatter):
def format(self, record):
log_level = record.levelname
record.levelname = f"{LOG_COLORS.get(log_level, '')}{record.levelname}{COLOR_CODES['RESET']}"
return super().format(record)
# Set up the root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Create a console handler and set the formatter
ch = logging.StreamHandler()
ch.setFormatter(ColoredFormatter('%(levelname)s: %(message)s'))
# Add the console handler to the root logger
root_logger.addHandler(ch)
# Define log level functions
debug = root_logger.debug
info = root_logger.info
warning = root_logger.warning
error = root_logger.error
critical = root_logger.critical