Files
backblaze-invoices-downloader/downloader.py
2026-04-05 22:22:49 +02:00

281 lines
9.8 KiB
Python

import logging
import re
import time
from pathlib import Path
from playwright.sync_api import sync_playwright, Page, Browser
from playwright_stealth import Stealth
from config import Config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Host that serves the sign-in and billing pages; other URLs are built from it.
BASE_URL = "https://secure.backblaze.com"
# Billing overview page where the group/year dropdowns and invoice links are read.
BILLING_URL = f"{BASE_URL}/billing.htm"
# Upper bound, in seconds, on how long to wait for a Cloudflare challenge page
# to clear before giving up.
CLOUDFLARE_WAIT_MAX = 60
# Seconds to pause between successive page-title checks while waiting.
CLOUDFLARE_POLL_INTERVAL = 2
def _wait_for_cloudflare(page: Page) -> None:
    """Wait until *page* no longer shows the Cloudflare challenge.

    The challenge is recognised by the text "Just a moment" in the page
    title. The title is polled every ``CLOUDFLARE_POLL_INTERVAL`` seconds
    for at most ``CLOUDFLARE_WAIT_MAX`` seconds.

    Args:
        page: The Playwright page that has just been navigated.

    Raises:
        RuntimeError: If the challenge title is still present once the
            maximum wait has elapsed.
    """
    elapsed = 0
    while elapsed < CLOUDFLARE_WAIT_MAX:
        if "Just a moment" not in page.title():
            return
        logger.debug("Waiting for Cloudflare challenge... (%ds)", elapsed)
        # Pause through Playwright instead of time.sleep(): the sync API only
        # processes browser events while a Playwright call is running, so a
        # plain sleep would stall event handling for the whole interval.
        page.wait_for_timeout(CLOUDFLARE_POLL_INTERVAL * 1000)
        elapsed += CLOUDFLARE_POLL_INTERVAL
    raise RuntimeError(
        "Cloudflare challenge did not resolve after %ds. "
        "Try running with --no-headless so you can solve it manually, "
        "or set BROWSER_HEADLESS=false in .env" % CLOUDFLARE_WAIT_MAX
    )
def _goto_with_retry(page: Page, url: str, retries: int = 3) -> None:
    """Navigate *page* to *url*, retrying on transient navigation errors.

    Each attempt loads the URL until ``domcontentloaded`` and then waits for
    any Cloudflare challenge to clear.

    Args:
        page: The Playwright page to navigate.
        url: Destination URL.
        retries: Maximum number of attempts. With a value below 1 no
            navigation is attempted and the function simply returns.

    Raises:
        RuntimeError: Propagated immediately, without retrying (this is what
            the Cloudflare wait raises on timeout).
        Exception: The error from the final attempt, when every attempt failed.
    """
    attempt = 0
    while attempt < retries:
        attempt += 1
        try:
            page.goto(url, wait_until="domcontentloaded")
            _wait_for_cloudflare(page)
        except RuntimeError:
            # An unresolved challenge will not fix itself on reload; give up.
            raise
        except Exception as exc:
            logger.warning("Navigation attempt %d/%d failed for %s: %s", attempt, retries, url, exc)
            if attempt == retries:
                raise
            # Linear back-off: 2s, 4s, ... before the next attempt.
            time.sleep(2 * attempt)
        else:
            return
def login(page: Page) -> None:
    """Sign in to Backblaze with the credentials from ``Config``.

    Opens the sign-in page, fills the ``#email`` and ``#password`` inputs,
    clicks ``#submitButton`` and then checks that the browser has left the
    sign-in URL.

    Args:
        page: The Playwright page to drive.

    Raises:
        RuntimeError: If the page URL still contains ``user_signin`` after
            the form was submitted.
    """
    logger.info("Logging in to Backblaze...")
    _goto_with_retry(page, f"{BASE_URL}/user_signin.htm")
    page.wait_for_selector("#email", timeout=30000)
    page.fill("#email", Config.BACKBLAZE_EMAIL)
    page.fill("#password", Config.BACKBLAZE_PASSWORD)
    page.click("#submitButton")
    page.wait_for_load_state("domcontentloaded")
    # Give the post-login redirect time to happen. This must be a Playwright
    # wait, not time.sleep(): page.url is updated from navigation events,
    # which the sync API only processes while a Playwright call is running.
    # A plain sleep could leave page.url stale and report a false failure.
    page.wait_for_timeout(2000)
    if "user_signin" in page.url:
        raise RuntimeError("Login failed - check credentials")
    logger.info("Login successful")
def get_group_options(page: Page) -> list[dict]:
    """Return the groups offered by the billing page's group dropdown.

    Navigates to the billing page and reads every ``<option>`` of
    ``select#groupSelection``.

    Args:
        page: The Playwright page to drive.

    Returns:
        A list of ``{"value": ..., "label": ...}`` dicts, one per option that
        has a non-empty value or label. When the dropdown is missing or
        yields no usable option, a single placeholder entry with an empty
        value and the label ``"default"`` is returned.
    """
    _goto_with_retry(page, BILLING_URL)
    dropdown = page.locator("select#groupSelection")
    if dropdown.count() == 0:
        return [{"value": "", "label": "default"}]

    found = []
    for option in dropdown.locator("option").all():
        entry = {
            "value": option.get_attribute("value") or "",
            "label": option.inner_text().strip(),
        }
        # Drop options that carry neither a value nor visible text.
        if entry["value"] or entry["label"]:
            found.append(entry)

    if not found:
        return [{"value": "", "label": "default"}]
    return found
def get_year_options(page: Page) -> list[str]:
    """Return the years offered by ``select#yearSelection`` on the current page.

    Args:
        page: The Playwright page, expected to already show the billing page.

    Returns:
        One string per ``<option>``: its ``value`` attribute, or its stripped
        text when the attribute is missing or empty. An empty list when the
        dropdown is not present.
    """
    page.wait_for_load_state("domcontentloaded")
    dropdown = page.locator("select#yearSelection")
    if dropdown.count() == 0:
        return []

    years = []
    for option in dropdown.locator("option").all():
        value = option.get_attribute("value")
        if not value:
            # No usable value attribute: fall back to the visible text.
            value = option.inner_text().strip()
        years.append(value)
    return years
def _absolute_invoice_url(href: str) -> str:
    """Return *href* unchanged if it starts with "http", else root it at BASE_URL."""
    if href.startswith("http"):
        return href
    return f"{BASE_URL}/{href.lstrip('/')}"


def _collect_invoice_anchors(anchors, found: dict[str, str]) -> None:
    """Record each anchor's absolute URL and label in *found*, once per URL."""
    for anchor in anchors:
        href = anchor.get_attribute("href") or ""
        if not href:
            continue
        url = _absolute_invoice_url(href)
        text = anchor.inner_text().strip()
        # Keep the first non-empty label seen for a URL. Re-assigning an
        # existing key does not change its position, so page order is kept.
        if not found.get(url):
            found[url] = text


def get_invoice_links(page: Page) -> list[dict]:
    """Collect the invoice links shown on the current billing page.

    Anchors inside table rows are examined first; if none are found, every
    anchor on the page whose ``href`` contains "invoice" is used instead.
    Relative hrefs are made absolute against ``BASE_URL``.

    The row selector also matches generic ``table tbody tr`` rows, so an
    anchor inside a nested table (or repeated within a row) can be reached
    more than once. Each URL is therefore returned only once.

    Args:
        page: The Playwright page currently showing the billing listing.

    Returns:
        A list of ``{"url": ..., "label": ...}`` dicts in page order, with
        unique URLs. ``label`` is the anchor's stripped text and may be empty.
    """
    found: dict[str, str] = {}

    rows = page.locator(
        "table.billing-table tbody tr, table#billingTable tbody tr, table tbody tr"
    ).all()
    for row in rows:
        anchors = row.locator("a[href*='billing_invoice'], a[href*='invoice']").all()
        _collect_invoice_anchors(anchors, found)

    if not found:
        # No table-based hits: fall back to any invoice-looking link on the page.
        _collect_invoice_anchors(page.locator("a[href*='invoice']").all(), found)

    return [{"url": url, "label": label} for url, label in found.items()]
def _fill_form_control(target, value: str) -> None:
    """Set *value* on a control: choose the option by label for a <select>, type otherwise."""
    tag = target.evaluate("el => el.tagName.toLowerCase()")
    if tag == "select":
        target.select_option(label=value)
    else:
        target.fill(value)


def fill_invoice_fields(page: Page) -> None:
    """Fill the invoice form on the current page with values from ``Config``.

    Two heuristics are applied, in this order:

    1. For each configured field, try a list of id/name selectors (exact
       first, then case-insensitive substring) and fill the first match.
    2. For a set of label texts, find a ``<label>`` containing that text and
       fill the control named by its ``for`` attribute.

    A control is filled at most once. Step 2 skips a control that was already
    filled, so a looser label match (for example "Type" or "Note") neither
    re-types the same value nor overwrites a more specific earlier match.
    Fields whose configured value is empty are skipped entirely.

    Args:
        page: The Playwright page showing an invoice.
    """
    # ids of controls that have already received a value.
    filled_ids: set[str] = set()

    fields = {
        "vatId": Config.INVOICE_VAT_ID,
        "documentType": Config.INVOICE_DOCUMENT_TYPE,
        "company": Config.INVOICE_COMPANY,
        "notes": Config.INVOICE_NOTES,
    }
    for field_id, value in fields.items():
        if not value:
            continue
        selectors = [
            f"#{field_id}",
            f"input[name='{field_id}']",
            f"textarea[name='{field_id}']",
            f"select[name='{field_id}']",
            f"input[id*='{field_id}' i]",
            f"textarea[id*='{field_id}' i]",
            f"select[id*='{field_id}' i]",
            f"input[name*='{field_id}' i]",
            f"textarea[name*='{field_id}' i]",
            f"select[name*='{field_id}' i]",
        ]
        for selector in selectors:
            el = page.locator(selector).first
            if el.count() > 0:
                _fill_form_control(el, value)
                el_id = el.get_attribute("id")
                if el_id:
                    filled_ids.add(el_id)
                logger.info("Filled field %s", field_id)
                break

    labelled_values = [
        ("VAT", Config.INVOICE_VAT_ID),
        ("Tax", Config.INVOICE_VAT_ID),
        ("Document Type", Config.INVOICE_DOCUMENT_TYPE),
        ("Type", Config.INVOICE_DOCUMENT_TYPE),
        ("Company", Config.INVOICE_COMPANY),
        ("Notes", Config.INVOICE_NOTES),
        ("Note", Config.INVOICE_NOTES),
    ]
    for label_text, value in labelled_values:
        if not value:
            continue
        for label in page.locator(f"label:has-text('{label_text}')").all():
            for_attr = label.get_attribute("for")
            if not for_attr:
                continue
            # Attribute selector instead of "#id": ids containing characters
            # such as "." or ":" are not valid in an unescaped id selector.
            # .first avoids a strict-mode error on duplicated ids.
            target = page.locator(f'[id="{for_attr}"]').first
            if target.count() > 0:
                if for_attr not in filled_ids:
                    _fill_form_control(target, value)
                    filled_ids.add(for_attr)
                    logger.info("Filled labeled field '%s' -> #%s", label_text, for_attr)
                # The first label with an existing target settles this label
                # text, whether it was filled now or earlier.
                break
def sanitize_filename(name: str) -> str:
    """Turn *name* into a string that is safe to use as one path component.

    Runs of whitespace become a single underscore. Each of the characters
    ``< > : " / \\ | ? *`` and each remaining ASCII control character
    (including DEL) becomes an underscore. Leading and trailing underscores
    are then removed.

    A result made only of dots (such as "." or "..") would point at the
    current or parent directory when joined to a path, so it is treated as
    empty.

    Args:
        name: Arbitrary text, for example a link label taken from a web page.

    Returns:
        The sanitized name, or an empty string if nothing usable remains.
    """
    # Whitespace first, so "\n\n" collapses to one "_" rather than one per char.
    name = re.sub(r'\s+', '_', name)
    name = re.sub(r'[<>:"/\\|?*\x00-\x1f\x7f]', '_', name)
    name = name.strip('_')
    if not name.strip('.'):
        return ''
    return name
def export_invoice_pdf(page: Page, invoice_url: str, output_path: Path) -> Path:
    """Open one invoice, fill its form fields and save the page as a PDF.

    Args:
        page: The Playwright page to drive.
        invoice_url: Absolute URL of the invoice page.
        output_path: Destination file for the PDF.

    Returns:
        *output_path*, once the PDF has been written.
    """
    _goto_with_retry(page, invoice_url)
    # Short settle pauses before and after filling the form. They go through
    # Playwright rather than time.sleep(), because the sync API only processes
    # browser events while a Playwright call is running.
    page.wait_for_timeout(1000)
    fill_invoice_fields(page)
    page.wait_for_timeout(500)
    # NOTE(review): some Playwright versions document page.pdf() as supported
    # only in headless Chromium - confirm this works with BROWSER_HEADLESS=false.
    page.pdf(path=str(output_path), format="A4", print_background=True)
    logger.info("Saved: %s", output_path)
    return output_path
def _select_group(page: Page, group_value: str) -> None:
    """Choose *group_value* in ``select#groupSelection`` if present, then let the page settle."""
    group_select = page.locator("select#groupSelection")
    if group_select.count() > 0:
        group_select.select_option(value=group_value)
        page.wait_for_load_state("domcontentloaded")
        page.wait_for_timeout(1000)


def _select_year(page: Page, year: str) -> None:
    """Choose *year* in ``select#yearSelection`` if present, then let the page settle."""
    year_select = page.locator("select#yearSelection")
    if year_select.count() > 0:
        year_select.select_option(value=year)
        page.wait_for_load_state("domcontentloaded")
        page.wait_for_timeout(1000)


def _export_listed_invoices(page: Page, invoices: list[dict], target_dir: Path) -> list[Path]:
    """Export every invoice in *invoices* into *target_dir*; return the resulting PDF paths."""
    target_dir.mkdir(parents=True, exist_ok=True)
    paths: list[Path] = []
    for idx, invoice in enumerate(invoices):
        # NOTE(review): the file name comes from the link text alone. If two
        # invoices in one listing share a label, the second is treated as
        # already downloaded - confirm labels are unique on the real page.
        label = sanitize_filename(invoice["label"]) or f"invoice_{idx + 1}"
        pdf_path = target_dir / f"{label}.pdf"
        if pdf_path.exists():
            logger.info("Skipping (exists): %s", pdf_path)
            paths.append(pdf_path)
            continue
        try:
            paths.append(export_invoice_pdf(page, invoice["url"], pdf_path))
        except Exception:
            # Best effort: one failing invoice must not abort the whole run.
            logger.exception("Failed to export: %s", invoice["url"])
    return paths


def download_all_invoices() -> list[Path]:
    """Log in and save every invoice found on the billing page as a PDF.

    Walks every group and, within it, every year offered by the billing
    page. PDFs go to ``Config.OUTPUT_DIR``, inside a per-group folder when
    there is more than one group and a per-year folder when a year dropdown
    exists. Files that already exist are not downloaded again but are still
    included in the result.

    Returns:
        Paths of all invoice PDFs that exist after the run: newly exported
        ones plus those skipped because they were already on disk.

    Raises:
        RuntimeError: Propagated from login or navigation (failed login,
            unresolved Cloudflare challenge).
    """
    Config.validate()
    output_dir = Path(Config.OUTPUT_DIR)
    output_dir.mkdir(parents=True, exist_ok=True)
    saved: list[Path] = []
    stealth = Stealth()
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=Config.BROWSER_HEADLESS,
            args=["--disable-blink-features=AutomationControlled"],
        )
        # try/finally so the browser is closed even when login or a
        # navigation raises part-way through.
        try:
            context = browser.new_context()
            stealth.apply_stealth_sync(context)
            page = context.new_page()
            page.set_default_timeout(Config.BROWSER_TIMEOUT)
            login(page)
            # get_group_options() navigates to the billing page itself, so no
            # separate navigation is needed beforehand.
            groups = get_group_options(page)
            logger.info("Found %d group(s): %s", len(groups), [g["label"] for g in groups])
            for group in groups:
                group_label = sanitize_filename(group["label"])
                group_dir = output_dir / group_label if len(groups) > 1 else output_dir
                if group["value"]:
                    _goto_with_retry(page, BILLING_URL)
                    _select_group(page, group["value"])
                # "all" is a stand-in meaning "no year dropdown on this page".
                years = get_year_options(page) or ["all"]
                logger.info("Group '%s' - years: %s", group["label"], years)
                for year in years:
                    year_dir = group_dir / year if year != "all" else group_dir
                    if year != "all":
                        _select_year(page, year)
                    invoices = get_invoice_links(page)
                    logger.info(
                        "Group '%s', Year '%s' - found %d invoice(s)",
                        group["label"], year, len(invoices),
                    )
                    if not invoices:
                        continue
                    saved.extend(_export_listed_invoices(page, invoices, year_dir))
                    if year != "all":
                        # Exporting navigated away to the invoice pages; go
                        # back to the listing and restore the group before
                        # the next year is selected.
                        _goto_with_retry(page, BILLING_URL)
                        if group["value"]:
                            _select_group(page, group["value"])
        finally:
            browser.close()
    return saved