From 1cbe80ac00de5d86c0f2f66c6def70337e9efdc6 Mon Sep 17 00:00:00 2001 From: Jan Bader Date: Tue, 7 Apr 2026 23:35:38 +0200 Subject: [PATCH] fetch b2 & groups and fetch all years --- downloader.py | 153 ++++++++++++++++++++++++++++---------------------- 1 file changed, 87 insertions(+), 66 deletions(-) diff --git a/downloader.py b/downloader.py index 3e13f91..14a3742 100644 --- a/downloader.py +++ b/downloader.py @@ -92,8 +92,8 @@ def login(context: BrowserContext, page: Page) -> Page: return page -def get_group_options(context: BrowserContext, page: Page) -> tuple[Page, list[dict]]: - page = _goto_with_retry(context, page, BILLING_URL) +def get_group_options(context: BrowserContext, page: Page, billing_url: str = BILLING_URL) -> tuple[Page, list[dict]]: + page = _goto_with_retry(context, page, billing_url) group_select = page.locator("select#groupSelection") if group_select.count() == 0: return page, [{"value": "", "label": "default"}] @@ -109,7 +109,7 @@ def get_group_options(context: BrowserContext, page: Page) -> tuple[Page, list[d def get_year_options(page: Page) -> list[str]: page.wait_for_load_state("domcontentloaded") - year_select = page.locator("select#yearSelection") + year_select = page.locator("select#yearSelection, select#receiptDateId").first if year_select.count() == 0: return [] options = year_select.locator("option").all() @@ -252,10 +252,16 @@ def sanitize_filename(name: str) -> str: return name.strip('_') -def export_invoice_pdf(context: BrowserContext, page: Page, invoice: dict, output_path: Path) -> tuple[Page, Path]: +def export_invoice_pdf( + context: BrowserContext, + page: Page, + invoice: dict, + output_path: Path, + billing_url: str = BILLING_URL, +) -> tuple[Page, Path]: invoice_page = None if invoice.get("open_via_popup") and invoice.get("reference_id"): - page = _goto_with_retry(context, page, BILLING_URL) + page = _goto_with_retry(context, page, billing_url) selector = f"a[data-reference-object-id='{invoice['reference_id']}']" anchor = page.locator(selector).first if anchor.count() == 0: @@ -310,79 +316,94 @@ def download_all_invoices() -> list[Path]: input("Login failed. Inspect the browser, then press Enter to close it...") raise - page = _goto_with_retry(context, page, BILLING_URL) + billing_pages = ["b2", "groups"] - page, groups = get_group_options(context, page) - logger.info("Found %d group(s): %s", len(groups), [g["label"] for g in groups]) + for billing_page in billing_pages: + billing_url = f"{BILLING_URL}?billing_page={billing_page}" + page = _goto_with_retry(context, page, billing_url) - for group in groups: - group_label = sanitize_filename(group["label"]) - group_dir = output_dir / group_label if len(groups) > 1 else output_dir + page, groups = get_group_options(context, page, billing_url=billing_url) + logger.info( + "Found %d group(s) on billing page '%s': %s", + len(groups), + billing_page, + [g["label"] for g in groups], + ) - if group["value"]: - page = _goto_with_retry(context, page, BILLING_URL) - group_select = page.locator("select#groupSelection") - if group_select.count() > 0: - group_select.select_option(value=group["value"]) - page.wait_for_load_state("domcontentloaded") - time.sleep(1) + for group in groups: + group_label = sanitize_filename(group["label"]) + group_dir = output_dir / group_label if len(groups) > 1 else output_dir - years = get_year_options(page) - if not years: - years = ["all"] - logger.info("Group '%s' - years: %s", group["label"], years) - - for year in years: - year_dir = group_dir / year if year != "all" else group_dir - - if year != "all": - year_select = page.locator("select#yearSelection") - if year_select.count() > 0: - year_select.select_option(value=year) + if group["value"]: + page = _goto_with_retry(context, page, billing_url) + group_select = page.locator("select#groupSelection") + if group_select.count() > 0: + group_select.select_option(value=group["value"]) page.wait_for_load_state("domcontentloaded") time.sleep(1) - invoices = get_invoice_links(page) - logger.info( - "Group '%s', Year '%s' - found %d invoice(s)", - group["label"], year, len(invoices), - ) + years = get_year_options(page) + if not years: + years = ["all"] + logger.info("Group '%s' - years: %s", group["label"], years) - if not invoices: - logger.warning( - "No invoices found for group '%s', year '%s'. Browser will remain open for inspection.", - group["label"], - year, - ) - input("No invoices found. Inspect the browser, then press Enter to continue...") - continue + for year in years: + year_dir = group_dir / year if year != "all" else group_dir - year_dir.mkdir(parents=True, exist_ok=True) - - for idx, invoice in enumerate(invoices): - label = sanitize_filename(invoice["label"]) or f"invoice_{idx + 1}" - pdf_path = year_dir / f"{label}.pdf" - - if pdf_path.exists(): - logger.info("Skipping (exists): %s", pdf_path) - saved.append(pdf_path) - continue - - try: - page, path = export_invoice_pdf(context, page, invoice, pdf_path) - saved.append(path) - except Exception: - logger.exception("Failed to export: %s", invoice["url"]) - - if year != "all": - page = _goto_with_retry(context, page, BILLING_URL) - if group["value"]: - group_select = page.locator("select#groupSelection") - if group_select.count() > 0: - group_select.select_option(value=group["value"]) + if year != "all": + year_select = page.locator("select#yearSelection, select#receiptDateId").first + if year_select.count() > 0: + year_select.select_option(value=year) page.wait_for_load_state("domcontentloaded") time.sleep(1) + invoices = get_invoice_links(page) + logger.info( + "Group '%s', Year '%s' - found %d invoice(s)", + group["label"], year, len(invoices), + ) + + if not invoices: + logger.warning( + "No invoices found for group '%s', year '%s'. Browser will remain open for inspection.", + group["label"], + year, + ) + input("No invoices found. Inspect the browser, then press Enter to continue...") + continue + + year_dir.mkdir(parents=True, exist_ok=True) + + for idx, invoice in enumerate(invoices): + label = sanitize_filename(invoice["label"]) or f"invoice_{idx + 1}" + pdf_path = year_dir / f"{label}.pdf" + + if pdf_path.exists(): + logger.info("Skipping (exists): %s", pdf_path) + saved.append(pdf_path) + continue + + try: + page, path = export_invoice_pdf( + context, + page, + invoice, + pdf_path, + billing_url=billing_url, + ) + saved.append(path) + except Exception: + logger.exception("Failed to export: %s", invoice["url"]) + + if year != "all": + page = _goto_with_retry(context, page, billing_url) + if group["value"]: + group_select = page.locator("select#groupSelection") + if group_select.count() > 0: + group_select.select_option(value=group["value"]) + page.wait_for_load_state("domcontentloaded") + time.sleep(1) + context.close() return saved