Fetch the "b2" and "groups" billing pages, and fetch invoices for all years

This commit is contained in:
Jan Bader
2026-04-07 23:35:38 +02:00
parent 03bb94db2d
commit 1cbe80ac00

View File

@@ -92,8 +92,8 @@ def login(context: BrowserContext, page: Page) -> Page:
return page
def get_group_options(context: BrowserContext, page: Page) -> tuple[Page, list[dict]]:
page = _goto_with_retry(context, page, BILLING_URL)
def get_group_options(context: BrowserContext, page: Page, billing_url: str = BILLING_URL) -> tuple[Page, list[dict]]:
page = _goto_with_retry(context, page, billing_url)
group_select = page.locator("select#groupSelection")
if group_select.count() == 0:
return page, [{"value": "", "label": "default"}]
@@ -109,7 +109,7 @@ def get_group_options(context: BrowserContext, page: Page) -> tuple[Page, list[d
def get_year_options(page: Page) -> list[str]:
page.wait_for_load_state("domcontentloaded")
year_select = page.locator("select#yearSelection")
year_select = page.locator("select#yearSelection, select#receiptDateId").first
if year_select.count() == 0:
return []
options = year_select.locator("option").all()
@@ -252,10 +252,16 @@ def sanitize_filename(name: str) -> str:
return name.strip('_')
def export_invoice_pdf(context: BrowserContext, page: Page, invoice: dict, output_path: Path) -> tuple[Page, Path]:
def export_invoice_pdf(
context: BrowserContext,
page: Page,
invoice: dict,
output_path: Path,
billing_url: str = BILLING_URL,
) -> tuple[Page, Path]:
invoice_page = None
if invoice.get("open_via_popup") and invoice.get("reference_id"):
page = _goto_with_retry(context, page, BILLING_URL)
page = _goto_with_retry(context, page, billing_url)
selector = f"a[data-reference-object-id='{invoice['reference_id']}']"
anchor = page.locator(selector).first
if anchor.count() == 0:
@@ -310,79 +316,94 @@ def download_all_invoices() -> list[Path]:
input("Login failed. Inspect the browser, then press Enter to close it...")
raise
page = _goto_with_retry(context, page, BILLING_URL)
billing_pages = ["b2", "groups"]
page, groups = get_group_options(context, page)
logger.info("Found %d group(s): %s", len(groups), [g["label"] for g in groups])
for billing_page in billing_pages:
billing_url = f"{BILLING_URL}?billing_page={billing_page}"
page = _goto_with_retry(context, page, billing_url)
for group in groups:
group_label = sanitize_filename(group["label"])
group_dir = output_dir / group_label if len(groups) > 1 else output_dir
page, groups = get_group_options(context, page, billing_url=billing_url)
logger.info(
"Found %d group(s) on billing page '%s': %s",
len(groups),
billing_page,
[g["label"] for g in groups],
)
if group["value"]:
page = _goto_with_retry(context, page, BILLING_URL)
group_select = page.locator("select#groupSelection")
if group_select.count() > 0:
group_select.select_option(value=group["value"])
page.wait_for_load_state("domcontentloaded")
time.sleep(1)
for group in groups:
group_label = sanitize_filename(group["label"])
group_dir = output_dir / group_label if len(groups) > 1 else output_dir
years = get_year_options(page)
if not years:
years = ["all"]
logger.info("Group '%s' - years: %s", group["label"], years)
for year in years:
year_dir = group_dir / year if year != "all" else group_dir
if year != "all":
year_select = page.locator("select#yearSelection")
if year_select.count() > 0:
year_select.select_option(value=year)
if group["value"]:
page = _goto_with_retry(context, page, billing_url)
group_select = page.locator("select#groupSelection")
if group_select.count() > 0:
group_select.select_option(value=group["value"])
page.wait_for_load_state("domcontentloaded")
time.sleep(1)
invoices = get_invoice_links(page)
logger.info(
"Group '%s', Year '%s' - found %d invoice(s)",
group["label"], year, len(invoices),
)
years = get_year_options(page)
if not years:
years = ["all"]
logger.info("Group '%s' - years: %s", group["label"], years)
if not invoices:
logger.warning(
"No invoices found for group '%s', year '%s'. Browser will remain open for inspection.",
group["label"],
year,
)
input("No invoices found. Inspect the browser, then press Enter to continue...")
continue
for year in years:
year_dir = group_dir / year if year != "all" else group_dir
year_dir.mkdir(parents=True, exist_ok=True)
for idx, invoice in enumerate(invoices):
label = sanitize_filename(invoice["label"]) or f"invoice_{idx + 1}"
pdf_path = year_dir / f"{label}.pdf"
if pdf_path.exists():
logger.info("Skipping (exists): %s", pdf_path)
saved.append(pdf_path)
continue
try:
page, path = export_invoice_pdf(context, page, invoice, pdf_path)
saved.append(path)
except Exception:
logger.exception("Failed to export: %s", invoice["url"])
if year != "all":
page = _goto_with_retry(context, page, BILLING_URL)
if group["value"]:
group_select = page.locator("select#groupSelection")
if group_select.count() > 0:
group_select.select_option(value=group["value"])
if year != "all":
year_select = page.locator("select#yearSelection, select#receiptDateId").first
if year_select.count() > 0:
year_select.select_option(value=year)
page.wait_for_load_state("domcontentloaded")
time.sleep(1)
invoices = get_invoice_links(page)
logger.info(
"Group '%s', Year '%s' - found %d invoice(s)",
group["label"], year, len(invoices),
)
if not invoices:
logger.warning(
"No invoices found for group '%s', year '%s'. Browser will remain open for inspection.",
group["label"],
year,
)
input("No invoices found. Inspect the browser, then press Enter to continue...")
continue
year_dir.mkdir(parents=True, exist_ok=True)
for idx, invoice in enumerate(invoices):
label = sanitize_filename(invoice["label"]) or f"invoice_{idx + 1}"
pdf_path = year_dir / f"{label}.pdf"
if pdf_path.exists():
logger.info("Skipping (exists): %s", pdf_path)
saved.append(pdf_path)
continue
try:
page, path = export_invoice_pdf(
context,
page,
invoice,
pdf_path,
billing_url=billing_url,
)
saved.append(path)
except Exception:
logger.exception("Failed to export: %s", invoice["url"])
if year != "all":
page = _goto_with_retry(context, page, billing_url)
if group["value"]:
group_select = page.locator("select#groupSelection")
if group_select.count() > 0:
group_select.select_option(value=group["value"])
page.wait_for_load_state("domcontentloaded")
time.sleep(1)
context.close()
return saved