"""Download Backblaze billing invoices as PDF files by driving a browser.

The module logs in to the Backblaze web UI with Playwright, walks the billing
pages / groups / years it finds there and prints every invoice popup to a PDF
below ``Config.OUTPUT_DIR``.
"""

import logging
import re
import time
from collections import Counter
from pathlib import Path

from playwright.sync_api import sync_playwright, Page, BrowserContext, Locator
from playwright_stealth import Stealth

from config import Config

logger = logging.getLogger(__name__)

BASE_URL = "https://secure.backblaze.com"
BILLING_URL = f"{BASE_URL}/billing_card.htm"
CLOUDFLARE_WAIT_MAX = 60
CLOUDFLARE_POLL_INTERVAL = 2

# Selectors shared by several functions below.
_GROUP_SELECTOR = "select#groupSelection"
_YEAR_SELECTOR = "select#yearSelection, select#receiptDateId"


def _css_string(value: str) -> str:
    """Return *value* as a double-quoted CSS string literal.

    Backslashes and double quotes are escaped so the value can be embedded in
    an attribute selector without breaking out of the quotes.
    """
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _wait_for_cloudflare(page: Page) -> None:
    """Block until the page title no longer contains "Just a moment".

    The title is polled every ``CLOUDFLARE_POLL_INTERVAL`` seconds for at most
    ``CLOUDFLARE_WAIT_MAX`` seconds.

    Raises:
        RuntimeError: if the challenge title is still shown after the maximum
            wait.
    """
    elapsed = 0
    while elapsed < CLOUDFLARE_WAIT_MAX:
        try:
            title = page.title()
        except Exception:
            # Best effort: if the title cannot be read (for example because
            # the page is navigating or was closed) stop waiting and let the
            # caller carry on.
            return
        if "Just a moment" not in title:
            return
        logger.debug("Waiting for Cloudflare challenge... (%ds)", elapsed)
        time.sleep(CLOUDFLARE_POLL_INTERVAL)
        elapsed += CLOUDFLARE_POLL_INTERVAL
    raise RuntimeError(
        "Cloudflare challenge did not resolve after %ds. "
        "Try running with --no-headless so you can solve it manually, "
        "or set BROWSER_HEADLESS=false in .env" % CLOUDFLARE_WAIT_MAX
    )


def _ensure_page(context: BrowserContext, page: Page) -> Page:
    """Return *page* if it is still usable, otherwise a fresh page.

    Usability is probed with ``page.title()``; when that raises, a new page is
    opened in *context* with ``Config.BROWSER_TIMEOUT`` as default timeout.
    """
    try:
        page.title()
        return page
    except Exception:
        logger.info("Page was closed, creating a new one")
        new_page = context.new_page()
        new_page.set_default_timeout(Config.BROWSER_TIMEOUT)
        return new_page


def _goto_with_retry(
    context: BrowserContext, page: Page, url: str, retries: int = 3
) -> Page:
    """Navigate to *url*, retrying failed navigations with a growing delay.

    Args:
        context: Browser context used to open a replacement page if needed.
        page: Page to navigate; replaced when it is no longer usable.
        url: Target address.
        retries: Maximum number of navigation attempts.

    Returns:
        The page that was navigated (possibly a replacement for *page*).

    Raises:
        RuntimeError: propagated immediately (not retried); this is what
            ``_wait_for_cloudflare`` raises on timeout.
        Exception: the last navigation error once all attempts are used up.
    """
    for attempt in range(1, retries + 1):
        page = _ensure_page(context, page)
        try:
            page.goto(url, wait_until="domcontentloaded")
            _wait_for_cloudflare(page)
            return page
        except RuntimeError:
            raise
        except Exception as e:
            logger.warning(
                "Navigation attempt %d/%d failed for %s: %s",
                attempt, retries, url, e,
            )
            if attempt == retries:
                raise
            time.sleep(2 * attempt)
    # Only reached when ``retries`` is < 1, i.e. no attempt was made.
    return page


def login(context: BrowserContext, page: Page) -> Page:
    """Sign in to Backblaze with the credentials from ``Config``.

    The sign-in form is skipped when the sign-in URL redirects elsewhere or
    the e-mail field is absent. If a code field appears after the password
    step, a TOTP code is read interactively from stdin.

    Returns:
        The page used for the session (possibly a replacement for *page*).

    Raises:
        RuntimeError: if the browser is still on the sign-in page afterwards.
    """
    logger.info("Logging in to Backblaze...")
    page = _goto_with_retry(context, page, f"{BASE_URL}/user_signin.htm")

    if "user_signin" not in page.url:
        logger.info("Already authenticated; skipping login form.")
        return page

    if page.locator("#email-field").count() == 0:
        logger.info("Login form not present; assuming authenticated session.")
        return page

    page.wait_for_selector("#email-field", timeout=30000)
    page.fill("#email-field", Config.BACKBLAZE_EMAIL)
    page.click("#submit-button")

    page.wait_for_selector("#password-field", timeout=30000)
    page.fill("#password-field", Config.BACKBLAZE_PASSWORD)
    page.click("#submit-button")

    page.wait_for_load_state("domcontentloaded")
    time.sleep(5)

    code_field = page.locator("#code-field")
    if code_field.count() > 0:
        code = input("Enter TOTP code: ").strip()
        code_field.fill(code)
        remember = page.locator("#bz-redesign-switch-checkbox")
        if remember.count() > 0 and not remember.is_checked():
            remember.check()
        page.click("#submit-button")
        page.wait_for_load_state("domcontentloaded")
        time.sleep(5)

    if "user_signin" in page.url:
        raise RuntimeError("Login failed - check credentials")

    logger.info("Login successful")
    return page


def get_group_options(
    context: BrowserContext, page: Page, billing_url: str = BILLING_URL
) -> tuple[Page, list[dict]]:
    """Load *billing_url* and list the entries of the group selector.

    Returns:
        ``(page, groups)`` where each group is ``{"value": ..., "label": ...}``.
        A single ``{"value": "", "label": "default"}`` entry is returned when
        the page has no group selector or the selector yields no usable option.
    """
    default_groups = [{"value": "", "label": "default"}]

    page = _goto_with_retry(context, page, billing_url)
    group_select = page.locator(_GROUP_SELECTOR)
    if group_select.count() == 0:
        return page, default_groups

    groups = []
    for opt in group_select.locator("option").all():
        val = opt.get_attribute("value") or ""
        label = opt.inner_text().strip()
        if val or label:
            groups.append({"value": val, "label": label})
    return page, (groups if groups else default_groups)


def get_year_options(page: Page) -> list[str]:
    """Return the selectable values of the year selector on *page*.

    An option without a ``value`` attribute contributes its visible text.
    Options whose value is empty (typically a placeholder entry) are skipped,
    because they cannot be selected by value and would not produce a usable
    directory name. An empty list is returned when there is no year selector.
    """
    page.wait_for_load_state("domcontentloaded")
    year_select = page.locator(_YEAR_SELECTOR).first
    if year_select.count() == 0:
        return []

    years = []
    for opt in year_select.locator("option").all():
        value = opt.get_attribute("value")
        if value is None:
            # No value attribute: fall back to the option's visible text.
            value = opt.inner_text().strip()
        if value:
            years.append(value)
    return years


def get_invoice_links(page: Page) -> list[dict]:
    """Collect the invoices referenced by anchors on *page*.

    Every anchor carrying a non-empty ``data-reference-object-id`` yields one
    entry with the keys ``url``, ``label``, ``reference_id`` and
    ``open_via_popup``. Entries are de-duplicated by reference id; anchors
    with an empty id are logged and ignored.
    """
    links = []
    seen = set()

    def _add_link(url: str, label: str, **meta) -> None:
        if not url:
            return
        dedupe_key = meta.get("reference_id") or url
        if dedupe_key in seen:
            return
        seen.add(dedupe_key)
        entry = {"url": url, "label": label}
        entry.update(meta)
        links.append(entry)

    print_links = page.locator(
        "a.no-print[data-reference-object-id], a[data-reference-object-id]"
    ).all()
    for anchor in print_links:
        ref_id = anchor.get_attribute("data-reference-object-id") or ""
        label = anchor.inner_text().strip() or ref_id
        if ref_id:
            _add_link(
                f"{BASE_URL}/billing_print_invoice.htm",
                label,
                reference_id=ref_id,
                open_via_popup=True,
            )
            continue
        logger.warning("Invoice link missing reference id")

    return links


def _set_control_value(control: Locator, value: str) -> None:
    """Put *value* into a form control: by label for selects, by fill otherwise."""
    tag = control.evaluate("el => el.tagName.toLowerCase()")
    if tag == "select":
        control.select_option(label=value)
    else:
        control.fill(value)


def fill_invoice_fields(page: Page) -> None:
    """Fill the editable invoice fields on *page* from ``Config``.

    Two passes are made:

    1. ``company`` / ``notes`` are looked up through a list of id/name
       selectors; the first visible, enabled, non-hidden match is filled.
    2. Labels containing certain texts (VAT, Tax, Document Type, ...) are
       searched and the control named by the label's ``for`` attribute is
       filled.

    Settings whose configured value is empty are skipped.
    """
    fields = {
        "company": Config.INVOICE_COMPANY,
        "notes": Config.INVOICE_NOTES,
    }
    # Alternative ``name`` attribute values tried right after the plain id.
    name_overrides = {"company": "Company", "notes": "Other"}

    for field_id, value in fields.items():
        if not value:
            continue
        name_override = name_overrides.get(field_id)
        selectors = [
            f"#{field_id}",
            f"input[name='{field_id}']",
            f"textarea[name='{field_id}']",
            f"select[name='{field_id}']",
            f"input[id*='{field_id}' i]",
            f"textarea[id*='{field_id}' i]",
            f"select[id*='{field_id}' i]",
            f"input[name*='{field_id}' i]",
            f"textarea[name*='{field_id}' i]",
            f"select[name*='{field_id}' i]",
        ]
        if name_override:
            selectors.insert(1, f"input[name='{name_override}']")
            selectors.insert(2, f"textarea[name='{name_override}']")
            selectors.insert(3, f"select[name='{name_override}']")

        for selector in selectors:
            el = page.locator(selector).first
            if el.count() == 0:
                continue
            input_type = (el.get_attribute("type") or "").lower()
            if input_type == "hidden":
                continue
            if not el.is_visible() or not el.is_enabled():
                continue
            _set_control_value(el, value)
            logger.info("Filled field %s", field_id)
            break

    labelled_values = [
        ("VAT", Config.INVOICE_VAT_ID),
        ("Tax", Config.INVOICE_VAT_ID),
        ("Document Type", Config.INVOICE_DOCUMENT_TYPE),
        ("Type", Config.INVOICE_DOCUMENT_TYPE),
        ("Company", Config.INVOICE_COMPANY),
        ("Notes", Config.INVOICE_NOTES),
        ("Note", Config.INVOICE_NOTES),
    ]
    for label_text, value in labelled_values:
        if not value:
            continue
        labels = page.locator(f"label:has-text('{label_text}')").all()
        for label in labels:
            for_attr = label.get_attribute("for")
            if not for_attr:
                continue
            # A quoted attribute selector stays valid for ids that are not
            # legal in "#id" form (leading digit, ".", ":" ...); ``.first``
            # keeps the lookup unambiguous if the id is duplicated.
            target = page.locator(f"[id={_css_string(for_attr)}]").first
            if target.count() == 0:
                continue
            _set_control_value(target, value)
            logger.info("Filled labeled field '%s' -> #%s", label_text, for_attr)
            break


def sanitize_filename(name: str) -> str:
    """Make *name* usable as a file name.

    The characters ``<>:"/\\|?*`` and runs of whitespace are replaced by
    underscores; leading and trailing underscores are stripped. The result
    may be empty.
    """
    name = re.sub(r'[<>:"/\\|?*]', '_', name)
    name = re.sub(r'\s+', '_', name)
    return name.strip('_')


def export_invoice_pdf(
    context: BrowserContext,
    page: Page,
    invoice: dict,
    output_path: Path,
    billing_url: str = BILLING_URL,
) -> tuple[Page, Path]:
    """Open one invoice in its popup and save it as a PDF.

    The billing page is reloaded, the invoice popup is opened either by
    clicking the anchor carrying the invoice's reference id or, when no such
    anchor is present, by invoking the page's ``printPayment`` function. The
    configured invoice fields are filled and the popup is printed to
    *output_path* (A4, with backgrounds).

    Args:
        context: Browser context, used to recover a closed page.
        page: Main page; reused for navigation.
        invoice: Entry as produced by ``get_invoice_links``; must contain a
            non-empty ``reference_id``.
        output_path: Destination file. Its directory must already exist.
        billing_url: Billing page from which the popup is opened.

    Returns:
        ``(page, output_path)``.

    Raises:
        RuntimeError: if *invoice* has no reference id.
    """
    if not invoice.get("reference_id"):
        raise RuntimeError(
            "Invoice reference id is required to open the invoice popup"
        )

    page = _goto_with_retry(context, page, billing_url)

    reference_id = invoice["reference_id"]
    selector = f"a[data-reference-object-id={_css_string(str(reference_id))}]"
    anchor = page.locator(selector).first
    if anchor.count() > 0:
        anchor.scroll_into_view_if_needed()
        with page.expect_popup() as popup_info:
            anchor.click()
    else:
        with page.expect_popup() as popup_info:
            page.evaluate("printPayment", reference_id)
    invoice_page = popup_info.value

    try:
        invoice_page.wait_for_load_state("domcontentloaded")
        time.sleep(1)

        fill_invoice_fields(invoice_page)
        time.sleep(0.5)

        # NOTE(review): Playwright documents PDF generation as a headless
        # Chromium feature - confirm it works with BROWSER_HEADLESS=false.
        invoice_page.pdf(path=str(output_path), format="A4", print_background=True)
        logger.info("Saved: %s", output_path)
    finally:
        # Close the popup even when filling or printing failed, so failed
        # exports do not pile up open pages.
        if invoice_page is not page:
            invoice_page.close()

    return page, output_path


def _invoice_file_stems(invoices: list[dict]) -> list[str]:
    """Return one file name stem per invoice, in the same order.

    The stem is the sanitized label (or ``invoice_<n>`` when that is empty).
    When several invoices share a stem, each of them gets its sanitized
    reference id appended so they do not overwrite or shadow each other.
    """
    base_stems = [
        sanitize_filename(invoice["label"]) or f"invoice_{idx + 1}"
        for idx, invoice in enumerate(invoices)
    ]
    occurrences = Counter(base_stems)

    stems = []
    for stem, invoice in zip(base_stems, invoices):
        if occurrences[stem] > 1:
            ref = sanitize_filename(str(invoice.get("reference_id") or ""))
            if ref and ref != stem:
                stem = f"{stem}_{ref}"
        stems.append(stem)
    return stems


def _open_billing_for_group(
    context: BrowserContext, page: Page, billing_url: str, group_value: str
) -> Page:
    """Reload *billing_url* and, if *group_value* is set, select that group."""
    page = _goto_with_retry(context, page, billing_url)
    if group_value:
        group_select = page.locator(_GROUP_SELECTOR)
        if group_select.count() > 0:
            group_select.select_option(value=group_value)
            page.wait_for_load_state("domcontentloaded")
            time.sleep(1)
    return page


def _export_invoices(
    context: BrowserContext,
    page: Page,
    invoices: list[dict],
    target_dir: Path,
    billing_url: str,
) -> tuple[Page, list[Path]]:
    """Export *invoices* into *target_dir*, skipping files that already exist.

    A failing export is logged and does not stop the remaining ones.
    Returns the (possibly replaced) page and the paths that exist afterwards.
    """
    paths = []
    for invoice, stem in zip(invoices, _invoice_file_stems(invoices)):
        pdf_path = target_dir / f"{stem}.pdf"
        if pdf_path.exists():
            logger.info("Skipping (exists): %s", pdf_path)
            paths.append(pdf_path)
            continue
        try:
            page, path = export_invoice_pdf(
                context, page, invoice, pdf_path, billing_url=billing_url,
            )
            paths.append(path)
        except Exception:
            # The URL is the same for every invoice, so the reference id is
            # logged as well to identify which one failed.
            logger.exception(
                "Failed to export: %s (reference id %s)",
                invoice["url"], invoice.get("reference_id"),
            )
    return page, paths


def download_all_invoices() -> list[Path]:
    """Download every invoice found on the billing pages.

    Validates the configuration, starts a persistent Chromium context in
    ``Config.BROWSER_DATA_DIR``, logs in, and for each billing page, group and
    year exports the listed invoices below ``Config.OUTPUT_DIR`` (one
    sub-directory per group when there are several groups, one per year when
    years are offered). The function prompts on stdin when login fails or a
    listing is empty.

    Returns:
        Paths of all invoice PDFs that were written or already present.
    """
    Config.validate()
    output_dir = Path(Config.OUTPUT_DIR)
    output_dir.mkdir(parents=True, exist_ok=True)
    saved = []

    stealth = Stealth()
    data_dir = Path(Config.BROWSER_DATA_DIR).resolve()
    data_dir.mkdir(parents=True, exist_ok=True)

    with stealth.use_sync(sync_playwright()) as p:
        context = p.chromium.launch_persistent_context(
            user_data_dir=str(data_dir),
            headless=Config.BROWSER_HEADLESS,
            args=["--disable-blink-features=AutomationControlled"],
        )
        try:
            page = context.pages[0] if context.pages else context.new_page()
            page.set_default_timeout(Config.BROWSER_TIMEOUT)

            try:
                page = login(context, page)
            except Exception:
                logger.exception(
                    "Login failed. Browser will remain open for inspection."
                )
                input("Login failed. Inspect the browser, then press Enter to close it...")
                raise

            billing_pages = ["b2", "groups"]
            for billing_page in billing_pages:
                if billing_page == "b2":
                    billing_url = BILLING_URL
                else:
                    billing_url = f"{BILLING_URL}?billing_page={billing_page}"

                # get_group_options() performs the navigation itself.
                page, groups = get_group_options(
                    context, page, billing_url=billing_url
                )
                logger.info(
                    "Found %d group(s) on billing page '%s': %s",
                    len(groups), billing_page, [g["label"] for g in groups],
                )

                for group_index, group in enumerate(groups):
                    group_label = sanitize_filename(group["label"])
                    group_dir = (
                        output_dir / group_label if len(groups) > 1 else output_dir
                    )

                    # The first group can reuse the freshly loaded page when it
                    # needs no selection; every other group starts from a
                    # reload so it never inherits the previous group's state.
                    if group["value"] or group_index > 0:
                        page = _open_billing_for_group(
                            context, page, billing_url, group["value"]
                        )

                    years = get_year_options(page)
                    if not years:
                        years = ["all"]
                    logger.info("Group '%s' - years: %s", group["label"], years)

                    for year in years:
                        if year != "all":
                            # Sanitize so a value from the page can never
                            # escape or replace the group directory.
                            year_name = sanitize_filename(year)
                            year_dir = group_dir / year_name if year_name else group_dir

                            year_select = page.locator(_YEAR_SELECTOR).first
                            if year_select.count() > 0:
                                with page.expect_navigation(wait_until="domcontentloaded"):
                                    year_select.select_option(value=year)
                                time.sleep(1)
                        else:
                            year_dir = group_dir

                        invoices = get_invoice_links(page)
                        logger.info(
                            "Group '%s', Year '%s' - found %d invoice(s)",
                            group["label"], year, len(invoices),
                        )

                        if not invoices:
                            logger.warning(
                                "No invoices found for group '%s', year '%s'. Browser will remain open for inspection.",
                                group["label"], year,
                            )
                            input("No invoices found. Inspect the browser, then press Enter to continue...")
                            continue

                        year_dir.mkdir(parents=True, exist_ok=True)
                        page, exported = _export_invoices(
                            context, page, invoices, year_dir, billing_url
                        )
                        saved.extend(exported)

                        if year != "all":
                            # Reset the listing before the next year is chosen.
                            page = _open_billing_for_group(
                                context, page, billing_url, group["value"]
                            )
        finally:
            # Always release the browser, including on login or export errors.
            context.close()

    return saved