From 894b7cee54068933f6167acb34ab0863a8c42774 Mon Sep 17 00:00:00 2001
From: Jan Bader
Date: Sun, 5 Apr 2026 22:48:08 +0200
Subject: [PATCH] use persistent store and implement login

---
# Review notes. They sit in the free-text area after the three-dash line, so
# they are not part of the commit message or of any hunk.
#
# NOTE(review): this copy of the patch had its line breaks and indentation
# collapsed. Line structure, blank context lines and indentation below were
# reconstructed to match the hunk line counts -- verify the context lines
# against the real files before applying.
#
# What the patch does:
# - .gitignore / config.py: ignore `.browser_data/` and add
#   Config.BROWSER_DATA_DIR (env BROWSER_DATA_DIR, default "./.browser_data").
# - downloader.py: BILLING_URL now points at billing_card.htm.
# - download_all_invoices() creates the data dir, starts Chromium with
#   p.chromium.launch_persistent_context(user_data_dir=...), reuses
#   context.pages[0] when one exists, and closes the context instead of a
#   Browser. Stealth is applied by wrapping sync_playwright() in
#   stealth.use_sync(...) instead of calling apply_stealth_sync(context).
# - New helper _ensure_page(context, page): probes the page with page.title();
#   on any exception it opens a new page with the default timeout set to
#   Config.BROWSER_TIMEOUT and returns that instead.
# - _goto_with_retry, login, get_group_options and export_invoice_pdf now take
#   the BrowserContext and hand the (possibly replaced) Page back; the last two
#   return a (page, result) tuple. Every call site in this patch rebinds `page`.
# - login() is now multi-step: email -> submit -> password -> submit, then, if
#   `#code-field` is present, a TOTP code is read with input(), the
#   `#bz-redesign-switch-checkbox` box is checked when present and unchecked,
#   and the form is submitted again.
# - _wait_for_cloudflare() returns silently when page.title() raises.
#
# Points to confirm before merging:
# - NOTE(review): input() blocks on stdin. Confirm this is acceptable when the
#   tool runs unattended (BROWSER_HEADLESS defaults to "true").
# - NOTE(review): `#code-field` is tested once with count() after a fixed
#   time.sleep(2). If the field can render later than that, the TOTP step
#   would be skipped and only the "user_signin" URL check would catch it.
# - NOTE(review): judging by the visible hunks, the last retry in
#   _goto_with_retry either returns or re-raises, so the added trailing
#   `return page` looks reachable only when retries < 1 -- confirm intent.
# - NOTE(review): both page.title() probes catch bare Exception, so errors
#   other than "page closed" are treated the same way -- confirm that is wanted.
# - NOTE(review): the persistent profile directory presumably holds session
#   state from the login; it is git-ignored here, but confirm its location and
#   permissions are appropriate.
 .gitignore    |  1 +
 config.py     |  1 +
 downloader.py | 95 ++++++++++++++++++++++++++++++++++---------------
 3 files changed, 66 insertions(+), 31 deletions(-)

diff --git a/.gitignore b/.gitignore
index ffd3944..1e38177 100644
--- a/.gitignore
+++ b/.gitignore
@@ -50,6 +50,7 @@ htmlcov/
 
 # Playwright
 .playwright/
+.browser_data/
 .direnv/
 
 # Output
diff --git a/config.py b/config.py
index d5a07af..d0d452c 100644
--- a/config.py
+++ b/config.py
@@ -15,6 +15,7 @@ class Config:
     INVOICE_NOTES = os.getenv("INVOICE_NOTES", "")
 
     OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./invoices")
+    BROWSER_DATA_DIR = os.getenv("BROWSER_DATA_DIR", "./.browser_data")
     BROWSER_HEADLESS = os.getenv("BROWSER_HEADLESS", "true").lower() == "true"
     BROWSER_TIMEOUT = int(os.getenv("BROWSER_TIMEOUT", "60000"))
     LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
diff --git a/downloader.py b/downloader.py
index 5866c47..26c730c 100644
--- a/downloader.py
+++ b/downloader.py
@@ -3,7 +3,7 @@ import re
 import time
 from pathlib import Path
 
-from playwright.sync_api import sync_playwright, Page, Browser
+from playwright.sync_api import sync_playwright, Page, BrowserContext
 from playwright_stealth import Stealth
 
 from config import Config
@@ -11,16 +11,18 @@ from config import Config
 logger = logging.getLogger(__name__)
 
 BASE_URL = "https://secure.backblaze.com"
-BILLING_URL = f"{BASE_URL}/billing.htm"
+BILLING_URL = f"{BASE_URL}/billing_card.htm"
 
 CLOUDFLARE_WAIT_MAX = 60
 CLOUDFLARE_POLL_INTERVAL = 2
 
-
 def _wait_for_cloudflare(page: Page) -> None:
     elapsed = 0
     while elapsed < CLOUDFLARE_WAIT_MAX:
-        title = page.title()
+        try:
+            title = page.title()
+        except Exception:
+            return
         if "Just a moment" not in title:
             return
         logger.debug("Waiting for Cloudflare challenge... (%ds)", elapsed)
@@ -33,12 +35,24 @@ def _wait_for_cloudflare(page: Page) -> None:
     )
 
 
-def _goto_with_retry(page: Page, url: str, retries: int = 3) -> None:
+def _ensure_page(context: BrowserContext, page: Page) -> Page:
+    try:
+        page.title()
+        return page
+    except Exception:
+        logger.info("Page was closed, creating a new one")
+        new_page = context.new_page()
+        new_page.set_default_timeout(Config.BROWSER_TIMEOUT)
+        return new_page
+
+
+def _goto_with_retry(context: BrowserContext, page: Page, url: str, retries: int = 3) -> Page:
     for attempt in range(1, retries + 1):
+        page = _ensure_page(context, page)
         try:
             page.goto(url, wait_until="domcontentloaded")
             _wait_for_cloudflare(page)
-            return
+            return page
         except RuntimeError:
             raise
         except Exception as e:
@@ -46,27 +60,43 @@ def _goto_with_retry(page: Page, url: str, retries: int = 3) -> None:
             if attempt == retries:
                 raise
             time.sleep(2 * attempt)
+    return page
 
 
-def login(page: Page) -> None:
+def login(context: BrowserContext, page: Page) -> Page:
     logger.info("Logging in to Backblaze...")
-    _goto_with_retry(page, f"{BASE_URL}/user_signin.htm")
-    page.wait_for_selector("#email", timeout=30000)
-    page.fill("#email", Config.BACKBLAZE_EMAIL)
-    page.fill("#password", Config.BACKBLAZE_PASSWORD)
-    page.click("#submitButton")
+    page = _goto_with_retry(context, page, f"{BASE_URL}/user_signin.htm")
+    page.wait_for_selector("#email-field", timeout=30000)
+    page.fill("#email-field", Config.BACKBLAZE_EMAIL)
+    page.click("#submit-button")
+    page.wait_for_selector("#password-field", timeout=30000)
+    page.fill("#password-field", Config.BACKBLAZE_PASSWORD)
+    page.click("#submit-button")
     page.wait_for_load_state("domcontentloaded")
     time.sleep(2)
+
+    code_field = page.locator("#code-field")
+    if code_field.count() > 0:
+        code = input("Enter TOTP code: ").strip()
+        code_field.fill(code)
+        remember = page.locator("#bz-redesign-switch-checkbox")
+        if remember.count() > 0 and not remember.is_checked():
+            remember.check()
+        page.click("#submit-button")
+        page.wait_for_load_state("domcontentloaded")
+        time.sleep(2)
+
     if "user_signin" in page.url:
         raise RuntimeError("Login failed - check credentials")
     logger.info("Login successful")
+    return page
 
 
-def get_group_options(page: Page) -> list[dict]:
-    _goto_with_retry(page, BILLING_URL)
+def get_group_options(context: BrowserContext, page: Page) -> tuple[Page, list[dict]]:
+    page = _goto_with_retry(context, page, BILLING_URL)
     group_select = page.locator("select#groupSelection")
     if group_select.count() == 0:
-        return [{"value": "", "label": "default"}]
+        return page, [{"value": "", "label": "default"}]
     options = group_select.locator("option").all()
     groups = []
     for opt in options:
@@ -74,7 +104,7 @@ def get_group_options(page: Page) -> list[dict]:
         label = opt.inner_text().strip()
         if val or label:
             groups.append({"value": val, "label": label})
-    return groups if groups else [{"value": "", "label": "default"}]
+    return page, (groups if groups else [{"value": "", "label": "default"}])
 
 
 def get_year_options(page: Page) -> list[str]:
@@ -177,8 +207,8 @@ def sanitize_filename(name: str) -> str:
     return name.strip('_')
 
 
-def export_invoice_pdf(page: Page, invoice_url: str, output_path: Path) -> Path:
-    _goto_with_retry(page, invoice_url)
+def export_invoice_pdf(context: BrowserContext, page: Page, invoice_url: str, output_path: Path) -> tuple[Page, Path]:
+    page = _goto_with_retry(context, page, invoice_url)
     time.sleep(1)
 
     fill_invoice_fields(page)
@@ -186,7 +216,7 @@ def export_invoice_pdf(page: Page, invoice_url: str, output_path: Path) -> Path:
 
     page.pdf(path=str(output_path), format="A4", print_background=True)
     logger.info("Saved: %s", output_path)
-    return output_path
+    return page, output_path
 
 
 def download_all_invoices() -> list[Path]:
@@ -197,21 +227,23 @@ def download_all_invoices() -> list[Path]:
 
     stealth = Stealth()
 
-    with sync_playwright() as p:
-        browser = p.chromium.launch(
+    data_dir = Path(Config.BROWSER_DATA_DIR).resolve()
+    data_dir.mkdir(parents=True, exist_ok=True)
+
+    with stealth.use_sync(sync_playwright()) as p:
+        context = p.chromium.launch_persistent_context(
+            user_data_dir=str(data_dir),
             headless=Config.BROWSER_HEADLESS,
             args=["--disable-blink-features=AutomationControlled"],
         )
-        context = browser.new_context()
-        stealth.apply_stealth_sync(context)
-        page = context.new_page()
+        page = context.pages[0] if context.pages else context.new_page()
         page.set_default_timeout(Config.BROWSER_TIMEOUT)
 
-        login(page)
+        page = login(context, page)
 
-        _goto_with_retry(page, BILLING_URL)
+        page = _goto_with_retry(context, page, BILLING_URL)
 
-        groups = get_group_options(page)
+        page, groups = get_group_options(context, page)
         logger.info("Found %d group(s): %s", len(groups), [g["label"] for g in groups])
 
         for group in groups:
@@ -219,7 +251,7 @@ def download_all_invoices() -> list[Path]:
             group_dir = output_dir / group_label if len(groups) > 1 else output_dir
 
             if group["value"]:
-                _goto_with_retry(page, BILLING_URL)
+                page = _goto_with_retry(context, page, BILLING_URL)
                 group_select = page.locator("select#groupSelection")
                 if group_select.count() > 0:
                     group_select.select_option(value=group["value"])
@@ -262,12 +294,13 @@ def download_all_invoices() -> list[Path]:
                         continue
 
                     try:
-                        saved.append(export_invoice_pdf(page, invoice["url"], pdf_path))
+                        page, path = export_invoice_pdf(context, page, invoice["url"], pdf_path)
+                        saved.append(path)
                     except Exception:
                         logger.exception("Failed to export: %s", invoice["url"])
 
                 if year != "all":
-                    _goto_with_retry(page, BILLING_URL)
+                    page = _goto_with_retry(context, page, BILLING_URL)
                     if group["value"]:
                         group_select = page.locator("select#groupSelection")
                         if group_select.count() > 0:
@@ -275,6 +308,6 @@ def download_all_invoices() -> list[Path]:
                             page.wait_for_load_state("domcontentloaded")
                             time.sleep(1)
 
-        browser.close()
+        context.close()
 
     return saved