use persistent store and implement login

This commit is contained in:
Jan Bader
2026-04-05 22:48:08 +02:00
parent d01776e2ab
commit 894b7cee54
3 changed files with 66 additions and 31 deletions

1
.gitignore vendored
View File

@@ -50,6 +50,7 @@ htmlcov/
# Playwright # Playwright
.playwright/ .playwright/
.browser_data/
.direnv/ .direnv/
# Output # Output

View File

@@ -15,6 +15,7 @@ class Config:
INVOICE_NOTES = os.getenv("INVOICE_NOTES", "") INVOICE_NOTES = os.getenv("INVOICE_NOTES", "")
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./invoices") OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./invoices")
BROWSER_DATA_DIR = os.getenv("BROWSER_DATA_DIR", "./.browser_data")
BROWSER_HEADLESS = os.getenv("BROWSER_HEADLESS", "true").lower() == "true" BROWSER_HEADLESS = os.getenv("BROWSER_HEADLESS", "true").lower() == "true"
BROWSER_TIMEOUT = int(os.getenv("BROWSER_TIMEOUT", "60000")) BROWSER_TIMEOUT = int(os.getenv("BROWSER_TIMEOUT", "60000"))
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

View File

@@ -3,7 +3,7 @@ import re
import time import time
from pathlib import Path from pathlib import Path
from playwright.sync_api import sync_playwright, Page, Browser from playwright.sync_api import sync_playwright, Page, BrowserContext
from playwright_stealth import Stealth from playwright_stealth import Stealth
from config import Config from config import Config
@@ -11,16 +11,18 @@ from config import Config
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
BASE_URL = "https://secure.backblaze.com" BASE_URL = "https://secure.backblaze.com"
BILLING_URL = f"{BASE_URL}/billing.htm" BILLING_URL = f"{BASE_URL}/billing_card.htm"
CLOUDFLARE_WAIT_MAX = 60 CLOUDFLARE_WAIT_MAX = 60
CLOUDFLARE_POLL_INTERVAL = 2 CLOUDFLARE_POLL_INTERVAL = 2
def _wait_for_cloudflare(page: Page) -> None: def _wait_for_cloudflare(page: Page) -> None:
elapsed = 0 elapsed = 0
while elapsed < CLOUDFLARE_WAIT_MAX: while elapsed < CLOUDFLARE_WAIT_MAX:
title = page.title() try:
title = page.title()
except Exception:
return
if "Just a moment" not in title: if "Just a moment" not in title:
return return
logger.debug("Waiting for Cloudflare challenge... (%ds)", elapsed) logger.debug("Waiting for Cloudflare challenge... (%ds)", elapsed)
@@ -33,12 +35,24 @@ def _wait_for_cloudflare(page: Page) -> None:
) )
def _ensure_page(context: BrowserContext, page: Page) -> Page:
    """Return a usable page, opening a fresh one if ``page`` no longer responds.

    The page is probed by asking for its title. If that call raises for any
    reason, the page is treated as closed and a replacement is created from
    ``context`` with the configured default timeout applied.

    Args:
        context: Browser context used to open the replacement page.
        page: The page the caller is currently working with.

    Returns:
        ``page`` itself when the probe succeeds, otherwise the new page.
    """
    try:
        # The title value is irrelevant; the call is only a liveness probe.
        page.title()
    except Exception:
        logger.info("Page was closed, creating a new one")
    else:
        return page

    replacement = context.new_page()
    replacement.set_default_timeout(Config.BROWSER_TIMEOUT)
    return replacement
def _goto_with_retry(context: BrowserContext, page: Page, url: str, retries: int = 3) -> Page:
for attempt in range(1, retries + 1): for attempt in range(1, retries + 1):
page = _ensure_page(context, page)
try: try:
page.goto(url, wait_until="domcontentloaded") page.goto(url, wait_until="domcontentloaded")
_wait_for_cloudflare(page) _wait_for_cloudflare(page)
return return page
except RuntimeError: except RuntimeError:
raise raise
except Exception as e: except Exception as e:
@@ -46,27 +60,43 @@ def _goto_with_retry(page: Page, url: str, retries: int = 3) -> None:
if attempt == retries: if attempt == retries:
raise raise
time.sleep(2 * attempt) time.sleep(2 * attempt)
return page
def _wait_after_submit(page: Page) -> None:
    """Wait for the DOM to load after a form submit, then pause briefly."""
    page.wait_for_load_state("domcontentloaded")
    # Fixed grace period after the load event before inspecting the page.
    time.sleep(2)


def login(context: BrowserContext, page: Page) -> Page:
    """Sign in to Backblaze through the staged sign-in form.

    Navigates to the sign-in page, submits the e-mail and then the password
    (each step waits for its own field to appear), and, if a ``#code-field``
    input is present afterwards, prompts on stdin for a TOTP code and submits
    it as well.

    Args:
        context: Browser context, forwarded to ``_goto_with_retry``.
        page: Page to drive; the navigation helper may hand back a different
            page, which is the one used from then on.

    Returns:
        The page that was actually used for the login.

    Raises:
        RuntimeError: If the URL still contains ``user_signin`` after all
            steps have been submitted.
    """
    submit_button = "#submit-button"
    field_timeout_ms = 30000

    logger.info("Logging in to Backblaze...")
    signin_url = f"{BASE_URL}/user_signin.htm"
    page = _goto_with_retry(context, page, signin_url)

    # Each credential is its own form step: wait for the field, fill it,
    # submit, and only then look for the next field.
    credential_steps = (
        ("#email-field", Config.BACKBLAZE_EMAIL),
        ("#password-field", Config.BACKBLAZE_PASSWORD),
    )
    for field_selector, field_value in credential_steps:
        page.wait_for_selector(field_selector, timeout=field_timeout_ms)
        page.fill(field_selector, field_value)
        page.click(submit_button)
    _wait_after_submit(page)

    totp_input = page.locator("#code-field")
    if totp_input.count() > 0:
        totp_code = input("Enter TOTP code: ").strip()
        totp_input.fill(totp_code)

        # Presumably the "remember this device" switch; only touched when it
        # exists and is not already checked.
        remember_switch = page.locator("#bz-redesign-switch-checkbox")
        if remember_switch.count() > 0 and not remember_switch.is_checked():
            remember_switch.check()

        page.click(submit_button)
        _wait_after_submit(page)

    # Still being on the sign-in URL is the only failure signal checked here.
    if "user_signin" in page.url:
        raise RuntimeError("Login failed - check credentials")

    logger.info("Login successful")
    return page
def get_group_options(page: Page) -> list[dict]: def get_group_options(context: BrowserContext, page: Page) -> tuple[Page, list[dict]]:
_goto_with_retry(page, BILLING_URL) page = _goto_with_retry(context, page, BILLING_URL)
group_select = page.locator("select#groupSelection") group_select = page.locator("select#groupSelection")
if group_select.count() == 0: if group_select.count() == 0:
return [{"value": "", "label": "default"}] return page, [{"value": "", "label": "default"}]
options = group_select.locator("option").all() options = group_select.locator("option").all()
groups = [] groups = []
for opt in options: for opt in options:
@@ -74,7 +104,7 @@ def get_group_options(page: Page) -> list[dict]:
label = opt.inner_text().strip() label = opt.inner_text().strip()
if val or label: if val or label:
groups.append({"value": val, "label": label}) groups.append({"value": val, "label": label})
return groups if groups else [{"value": "", "label": "default"}] return page, (groups if groups else [{"value": "", "label": "default"}])
def get_year_options(page: Page) -> list[str]: def get_year_options(page: Page) -> list[str]:
@@ -177,8 +207,8 @@ def sanitize_filename(name: str) -> str:
return name.strip('_') return name.strip('_')
def export_invoice_pdf(page: Page, invoice_url: str, output_path: Path) -> Path: def export_invoice_pdf(context: BrowserContext, page: Page, invoice_url: str, output_path: Path) -> tuple[Page, Path]:
_goto_with_retry(page, invoice_url) page = _goto_with_retry(context, page, invoice_url)
time.sleep(1) time.sleep(1)
fill_invoice_fields(page) fill_invoice_fields(page)
@@ -186,7 +216,7 @@ def export_invoice_pdf(page: Page, invoice_url: str, output_path: Path) -> Path:
page.pdf(path=str(output_path), format="A4", print_background=True) page.pdf(path=str(output_path), format="A4", print_background=True)
logger.info("Saved: %s", output_path) logger.info("Saved: %s", output_path)
return output_path return page, output_path
def download_all_invoices() -> list[Path]: def download_all_invoices() -> list[Path]:
@@ -197,21 +227,23 @@ def download_all_invoices() -> list[Path]:
stealth = Stealth() stealth = Stealth()
with sync_playwright() as p: data_dir = Path(Config.BROWSER_DATA_DIR).resolve()
browser = p.chromium.launch( data_dir.mkdir(parents=True, exist_ok=True)
with stealth.use_sync(sync_playwright()) as p:
context = p.chromium.launch_persistent_context(
user_data_dir=str(data_dir),
headless=Config.BROWSER_HEADLESS, headless=Config.BROWSER_HEADLESS,
args=["--disable-blink-features=AutomationControlled"], args=["--disable-blink-features=AutomationControlled"],
) )
context = browser.new_context() page = context.pages[0] if context.pages else context.new_page()
stealth.apply_stealth_sync(context)
page = context.new_page()
page.set_default_timeout(Config.BROWSER_TIMEOUT) page.set_default_timeout(Config.BROWSER_TIMEOUT)
login(page) page = login(context, page)
_goto_with_retry(page, BILLING_URL) page = _goto_with_retry(context, page, BILLING_URL)
groups = get_group_options(page) page, groups = get_group_options(context, page)
logger.info("Found %d group(s): %s", len(groups), [g["label"] for g in groups]) logger.info("Found %d group(s): %s", len(groups), [g["label"] for g in groups])
for group in groups: for group in groups:
@@ -219,7 +251,7 @@ def download_all_invoices() -> list[Path]:
group_dir = output_dir / group_label if len(groups) > 1 else output_dir group_dir = output_dir / group_label if len(groups) > 1 else output_dir
if group["value"]: if group["value"]:
_goto_with_retry(page, BILLING_URL) page = _goto_with_retry(context, page, BILLING_URL)
group_select = page.locator("select#groupSelection") group_select = page.locator("select#groupSelection")
if group_select.count() > 0: if group_select.count() > 0:
group_select.select_option(value=group["value"]) group_select.select_option(value=group["value"])
@@ -262,12 +294,13 @@ def download_all_invoices() -> list[Path]:
continue continue
try: try:
saved.append(export_invoice_pdf(page, invoice["url"], pdf_path)) page, path = export_invoice_pdf(context, page, invoice["url"], pdf_path)
saved.append(path)
except Exception: except Exception:
logger.exception("Failed to export: %s", invoice["url"]) logger.exception("Failed to export: %s", invoice["url"])
if year != "all": if year != "all":
_goto_with_retry(page, BILLING_URL) page = _goto_with_retry(context, page, BILLING_URL)
if group["value"]: if group["value"]:
group_select = page.locator("select#groupSelection") group_select = page.locator("select#groupSelection")
if group_select.count() > 0: if group_select.count() > 0:
@@ -275,6 +308,6 @@ def download_all_invoices() -> list[Path]:
page.wait_for_load_state("domcontentloaded") page.wait_for_load_state("domcontentloaded")
time.sleep(1) time.sleep(1)
browser.close() context.close()
return saved return saved