import html
import logging
import re
import time
from pathlib import Path
from playwright.sync_api import sync_playwright, Page, BrowserContext
from playwright_stealth import Stealth
from config import Config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root of the Backblaze web console and the billing page derived from it.
BASE_URL = "https://secure.backblaze.com"
BILLING_URL = BASE_URL + "/billing_card.htm"

# Cloudflare challenge polling: total budget and delay between polls, in seconds.
CLOUDFLARE_WAIT_MAX = 60
CLOUDFLARE_POLL_INTERVAL = 2
def _wait_for_cloudflare(page: Page) -> None:
    """Poll the page title until the "Just a moment" interstitial is gone.

    Returns as soon as the title no longer contains "Just a moment", or as
    soon as the title cannot be read at all (any exception from
    ``page.title()`` ends the wait silently).

    Raises:
        RuntimeError: If the title still contains "Just a moment" once
            ``CLOUDFLARE_WAIT_MAX`` seconds of polling have been used up.
    """
    waited = 0
    while waited < CLOUDFLARE_WAIT_MAX:
        try:
            current_title = page.title()
        except Exception:
            # Best-effort: an unreadable title is treated as "nothing to wait for".
            return
        else:
            if "Just a moment" not in current_title:
                return
        logger.debug("Waiting for Cloudflare challenge... (%ds)", waited)
        time.sleep(CLOUDFLARE_POLL_INTERVAL)
        waited += CLOUDFLARE_POLL_INTERVAL

    message = (
        "Cloudflare challenge did not resolve after %ds. "
        "Try running with --no-headless so you can solve it manually, "
        "or set BROWSER_HEADLESS=false in .env" % CLOUDFLARE_WAIT_MAX
    )
    raise RuntimeError(message)
def _ensure_page(context: BrowserContext, page: Page) -> Page:
    """Return a usable page, replacing *page* if it no longer responds.

    The page is probed with ``page.title()``. If the probe raises, a new
    page is opened in *context*, given the configured default timeout, and
    returned instead.
    """
    try:
        page.title()
    except Exception:
        logger.info("Page was closed, creating a new one")
        replacement = context.new_page()
        replacement.set_default_timeout(Config.BROWSER_TIMEOUT)
        return replacement
    else:
        return page
def _goto_with_retry(context: BrowserContext, page: Page, url: str, retries: int = 3) -> Page:
    """Navigate to *url*, retrying failed navigations with a growing delay.

    Before every attempt the page is validated (and replaced if needed) via
    ``_ensure_page``. After a successful ``goto`` the Cloudflare wait runs.

    Returns:
        The page that ended up at *url*; it may differ from the *page*
        passed in when a replacement had to be created.

    Raises:
        RuntimeError: Propagated immediately, without retrying (this is
            what ``_wait_for_cloudflare`` raises on timeout).
        Exception: The last navigation error, once all attempts are spent.
    """
    for attempt_no in range(1, retries + 1):
        page = _ensure_page(context, page)
        try:
            page.goto(url, wait_until="domcontentloaded")
            _wait_for_cloudflare(page)
        except RuntimeError:
            # Not a transient navigation failure: do not retry.
            raise
        except Exception as exc:
            logger.warning("Navigation attempt %d/%d failed for %s: %s", attempt_no, retries, url, exc)
            if attempt_no == retries:
                raise
            # Back off a little longer after each failed attempt.
            time.sleep(2 * attempt_no)
        else:
            return page
    return page
def login(context: BrowserContext, page: Page) -> Page:
    """Sign in to Backblaze, prompting on stdin for a TOTP code if asked.

    The sign-in URL is opened first. If the browser ends up somewhere else,
    or the e-mail field is absent, the session is treated as already
    authenticated and the page is returned untouched.

    Returns:
        The page after login (possibly a replacement page created during
        navigation).

    Raises:
        RuntimeError: If the URL still contains "user_signin" after the
            form(s) were submitted.
    """
    submit_button = "#submit-button"

    def _submit_step(field_selector: str, value: str) -> None:
        # One wizard step: wait for the field, fill it, press submit.
        page.wait_for_selector(field_selector, timeout=30000)
        page.fill(field_selector, value)
        page.click(submit_button)

    def _settle() -> None:
        # Give the post-submit page time to load before inspecting it.
        page.wait_for_load_state("domcontentloaded")
        time.sleep(5)

    logger.info("Logging in to Backblaze...")
    page = _goto_with_retry(context, page, f"{BASE_URL}/user_signin.htm")

    if "user_signin" not in page.url:
        logger.info("Already authenticated; skipping login form.")
        return page
    if not page.locator("#email-field").count():
        logger.info("Login form not present; assuming authenticated session.")
        return page

    _submit_step("#email-field", Config.BACKBLAZE_EMAIL)
    _submit_step("#password-field", Config.BACKBLAZE_PASSWORD)
    _settle()

    totp_field = page.locator("#code-field")
    if totp_field.count() > 0:
        totp_code = input("Enter TOTP code: ").strip()
        totp_field.fill(totp_code)
        remember_switch = page.locator("#bz-redesign-switch-checkbox")
        if remember_switch.count() > 0 and not remember_switch.is_checked():
            remember_switch.check()
        page.click(submit_button)
        _settle()

    if "user_signin" in page.url:
        raise RuntimeError("Login failed - check credentials")

    logger.info("Login successful")
    return page
def get_group_options(context: BrowserContext, page: Page, billing_url: str = BILLING_URL) -> tuple[Page, list[dict]]:
    """Open *billing_url* and list the entries of its group dropdown.

    Returns:
        ``(page, groups)`` where each group is ``{"value": ..., "label": ...}``.
        Options with neither a value nor a label are dropped. When the
        dropdown is missing or yields nothing, a single placeholder group
        ``{"value": "", "label": "default"}`` is returned.
    """
    placeholder = [{"value": "", "label": "default"}]

    page = _goto_with_retry(context, page, billing_url)
    dropdown = page.locator("select#groupSelection")
    if not dropdown.count():
        return page, placeholder

    groups = []
    for option in dropdown.locator("option").all():
        entry = {
            "value": option.get_attribute("value") or "",
            "label": option.inner_text().strip(),
        }
        if entry["value"] or entry["label"]:
            groups.append(entry)
    return page, groups or placeholder
def get_year_options(page: Page) -> list[str]:
    """Return the entries of the year dropdown on the current page.

    Each entry is the option's ``value`` attribute, or its stripped text
    when the attribute is missing or empty. An empty list is returned when
    neither ``select#yearSelection`` nor ``select#receiptDateId`` exists.
    """
    page.wait_for_load_state("domcontentloaded")
    dropdown = page.locator("select#yearSelection, select#receiptDateId").first
    if not dropdown.count():
        return []

    years = []
    for option in dropdown.locator("option").all():
        value = option.get_attribute("value")
        years.append(value if value else option.inner_text().strip())
    return years
def get_invoice_links(page: Page) -> list[dict]:
    """Collect one entry per distinct invoice reference id on the page.

    Every anchor carrying a ``data-reference-object-id`` attribute is
    inspected. Anchors whose attribute is empty are logged and skipped;
    repeated reference ids are kept only once (first occurrence wins).

    Returns:
        Dicts with the keys ``url``, ``label``, ``reference_id`` and
        ``open_via_popup``, in page order.
    """
    anchors = page.locator("a.no-print[data-reference-object-id], a[data-reference-object-id]").all()

    # Keyed by reference id; dict insertion order preserves page order.
    by_reference = {}
    for anchor in anchors:
        reference = anchor.get_attribute("data-reference-object-id") or ""
        text = anchor.inner_text().strip() or reference
        if not reference:
            logger.warning("Invoice link missing reference id")
            continue
        if reference in by_reference:
            continue
        by_reference[reference] = {
            "url": f"{BASE_URL}/billing_print_invoice.htm",
            "label": text,
            "reference_id": reference,
            "open_via_popup": True,
        }
    return list(by_reference.values())
def fill_invoice_fields(page: Page) -> None:
    """Fill the editable invoice form fields on *page* from ``Config``.

    Two passes are made:

    1. For the ``company`` and ``notes`` fields, a prioritized list of CSS
       selectors is tried and the first visible, enabled, non-hidden match
       is filled.
    2. For a list of label texts (VAT, Tax, Document Type, ...), the control
       referenced by a matching ``<label for=...>`` is filled.

    Entries whose configured value is empty are skipped in both passes.
    """
    fields = {
        "company": Config.INVOICE_COMPANY,
        "notes": Config.INVOICE_NOTES,
    }
    for field_id, value in fields.items():
        if not value:
            continue
        # Alternative `name` attribute to try for this field.
        name_override = {"company": "Company", "notes": "Other"}.get(field_id)
        # Ordered from most to least specific: exact id, exact name, then
        # case-insensitive substring matches on id and name.
        selectors = [
            f"#{field_id}",
            f"input[name='{field_id}']",
            f"textarea[name='{field_id}']",
            f"select[name='{field_id}']",
            f"input[id*='{field_id}' i]",
            f"textarea[id*='{field_id}' i]",
            f"select[id*='{field_id}' i]",
            f"input[name*='{field_id}' i]",
            f"textarea[name*='{field_id}' i]",
            f"select[name*='{field_id}' i]",
        ]
        if name_override:
            # Slot the override selectors in right after the exact-id selector.
            selectors.insert(1, f"input[name='{name_override}']")
            selectors.insert(2, f"textarea[name='{name_override}']")
            selectors.insert(3, f"select[name='{name_override}']")
        for selector in selectors:
            el = page.locator(selector).first
            if el.count() > 0:
                input_type = (el.get_attribute("type") or "").lower()
                if input_type == "hidden":
                    # Hidden inputs cannot be filled; try the next selector.
                    continue
                if not el.is_visible() or not el.is_enabled():
                    continue
                tag = el.evaluate("el => el.tagName.toLowerCase()")
                if tag == "select":
                    el.select_option(label=value)
                else:
                    el.fill(value)
                logger.info("Filled field %s", field_id)
                # First successful fill wins for this field.
                break
    # Second pass: locate controls through their <label> text.
    # NOTE(review): "Type" presumably also matches labels containing
    # "Document Type", and "Note" those containing "Notes", so the same
    # control may be filled more than once - confirm against Playwright's
    # :has-text matching rules.
    for label_text, value in [
        ("VAT", Config.INVOICE_VAT_ID),
        ("Tax", Config.INVOICE_VAT_ID),
        ("Document Type", Config.INVOICE_DOCUMENT_TYPE),
        ("Type", Config.INVOICE_DOCUMENT_TYPE),
        ("Company", Config.INVOICE_COMPANY),
        ("Notes", Config.INVOICE_NOTES),
        ("Note", Config.INVOICE_NOTES),
    ]:
        if not value:
            continue
        labels = page.locator(f"label:has-text('{label_text}')").all()
        for label in labels:
            for_attr = label.get_attribute("for")
            if for_attr:
                # NOTE(review): the id is interpolated into a CSS selector
                # unescaped; ids containing characters such as "." or ":"
                # would presumably not match - verify against the real markup.
                target = page.locator(f"#{for_attr}")
                if target.count() > 0:
                    tag = target.evaluate("el => el.tagName.toLowerCase()")
                    if tag == "select":
                        target.select_option(label=value)
                    else:
                        target.fill(value)
                    logger.info("Filled labeled field '%s' -> #%s", label_text, for_attr)
                    # Stop after the first label whose target was filled.
                    break
def sanitize_filename(name: str) -> str:
    """Return *name* with characters unsafe for file names replaced by ``_``.

    Each of ``< > : " / \\ | ? *`` becomes one underscore. Every run of tabs
    and every run of line breaks collapses to a single underscore; carriage
    returns count as line breaks, so ``"\\r\\n"`` sequences no longer leave a
    stray ``\\r`` in the result. Leading and trailing underscores are
    stripped, which means the result can be empty.
    """
    name = re.sub(r'[<>:"/\\|?*]', "_", name)
    name = re.sub(r"\t+", "_", name)
    # Treat CR and LF alike so Windows-style line endings are fully removed.
    name = re.sub(r"[\r\n]+", "_", name)
    return name.strip("_")
def export_invoice_pdf(
    context: BrowserContext,
    page: Page,
    invoice: dict,
    output_dir: Path,
    billing_url: str = BILLING_URL,
) -> tuple[Page, Path]:
    """Open an invoice in its popup window and save it as a PDF.

    The popup is opened by clicking the invoice anchor on *page* or, when
    no such anchor exists, by invoking the page's ``printPayment`` function
    with the reference id. The invoice fields are filled, HTML entities in
    the customer block are decoded, and the first non-empty line of that
    block (minus " UTC") becomes the date part of the file name.

    The popup is always closed before returning, including when any step
    fails, so failed exports do not leave stray windows behind.

    Args:
        context: Not used by this function; kept for interface compatibility.
        page: Billing page that lists the invoice.
        invoice: Invoice entry; must contain a non-empty ``reference_id``.
        output_dir: Directory the PDF is written into.
        billing_url: Not used by this function; kept for interface
            compatibility.

    Returns:
        ``(page, output_path)``. If ``output_path`` already exists, nothing
        is rendered and the existing path is returned.

    Raises:
        RuntimeError: If *invoice* has no ``reference_id``.
    """
    reference_id = invoice.get("reference_id")
    if not reference_id:
        raise RuntimeError("Invoice reference id is required to open the invoice popup")

    selector = f"a[data-reference-object-id='{reference_id}']"
    anchor = page.locator(selector).first
    if anchor.count() > 0:
        anchor.scroll_into_view_if_needed()
        with page.expect_popup() as popup_info:
            anchor.click()
    else:
        # No anchor on the page: call the page's own print function directly.
        with page.expect_popup() as popup_info:
            page.evaluate("printPayment", reference_id)
    invoice_page = popup_info.value

    try:
        invoice_page.wait_for_load_state("domcontentloaded")
        time.sleep(1)
        fill_invoice_fields(invoice_page)
        # Decode (possibly multiply-escaped) HTML entities in the customer
        # block so the rendered PDF and the extracted date show plain text.
        invoice_page.evaluate("""() => {
const decode = (value) => {
let prev = value;
for (let i = 0; i < 5; i += 1) {
const textarea = document.createElement('textarea');
textarea.innerHTML = prev;
const next = textarea.value;
if (next === prev) return next;
prev = next;
}
return prev;
};
const container = document.querySelector('.b2-invoice-customer-right');
if (!container) return;
const walker = document.createTreeWalker(container, NodeFilter.SHOW_TEXT);
const nodes = [];
while (walker.nextNode()) nodes.push(walker.currentNode);
nodes.forEach((node) => {
node.nodeValue = decode(node.nodeValue);
});
}""")
        time.sleep(0.5)

        date_text = ""
        date_container = invoice_page.locator(".b2-invoice-customer-right").first
        if date_container.count() > 0:
            lines = [line.strip() for line in date_container.inner_text().splitlines() if line.strip()]
            if lines:
                date_text = lines[0].replace(" UTC", "")
        if not date_text:
            date_text = "unknown-date"

        filename = sanitize_filename(f"{date_text} Invoice {reference_id}")
        output_path = output_dir / f"{filename}.pdf"

        if output_path.exists():
            logger.info("Skipping (exists): %s", output_path)
        else:
            invoice_page.pdf(path=str(output_path), format="A4", print_background=True)
            logger.info("Saved: %s", output_path)
        return page, output_path
    finally:
        # Close the popup on every exit path; never close the billing page.
        if invoice_page is not page:
            invoice_page.close()
def _select_billing_group(page: Page, group_value: str) -> None:
    """Choose *group_value* in the group dropdown, if the page has one."""
    group_select = page.locator("select#groupSelection")
    if group_select.count() > 0:
        group_select.select_option(value=group_value)
        page.wait_for_load_state("domcontentloaded")
        time.sleep(1)


def download_all_invoices() -> list[Path]:
    """Log in and export every invoice found on the billing pages as PDF.

    Both the default billing page and the ``billing_page=groups`` variant
    are visited. For each group and each year offered by the page, the
    listed invoices are exported into ``OUTPUT_DIR[/group][/year]``; the
    group level is only used when the page offers more than one group.

    A failure on a single invoice is logged and does not stop the run. The
    function prompts on stdin when login fails and when a group/year shows
    no invoices. The browser context is closed on every exit path.

    Returns:
        Paths of all invoices handled successfully, including files that
        already existed and were skipped.

    Raises:
        Exception: Whatever ``login`` raised, after the operator has
            acknowledged the prompt.
    """
    Config.validate()
    output_dir = Path(Config.OUTPUT_DIR)
    output_dir.mkdir(parents=True, exist_ok=True)
    saved: list[Path] = []
    stealth = Stealth()
    data_dir = Path(Config.BROWSER_DATA_DIR).resolve()
    data_dir.mkdir(parents=True, exist_ok=True)

    with stealth.use_sync(sync_playwright()) as p:
        context = p.chromium.launch_persistent_context(
            user_data_dir=str(data_dir),
            headless=Config.BROWSER_HEADLESS,
            args=["--disable-blink-features=AutomationControlled"],
        )
        try:
            page = context.pages[0] if context.pages else context.new_page()
            page.set_default_timeout(Config.BROWSER_TIMEOUT)

            try:
                page = login(context, page)
            except Exception:
                logger.exception("Login failed. Browser will remain open for inspection.")
                input("Login failed. Inspect the browser, then press Enter to close it...")
                raise

            billing_pages = ["b2", "groups"]
            for billing_page in billing_pages:
                if billing_page == "b2":
                    billing_url = f"{BILLING_URL}"
                else:
                    billing_url = f"{BILLING_URL}?billing_page={billing_page}"

                page = _goto_with_retry(context, page, billing_url)
                page, groups = get_group_options(context, page, billing_url=billing_url)
                logger.info(
                    "Found %d group(s) on billing page '%s': %s",
                    len(groups),
                    billing_page,
                    [g["label"] for g in groups],
                )

                for group in groups:
                    group_label = sanitize_filename(group["label"])
                    group_dir = output_dir / group_label if len(groups) > 1 else output_dir

                    if group["value"]:
                        page = _goto_with_retry(context, page, billing_url)
                        _select_billing_group(page, group["value"])

                    years = get_year_options(page)
                    if not years:
                        # No year dropdown: treat the page as one unfiltered listing.
                        years = ["all"]
                    logger.info("Group '%s' - years: %s", group["label"], years)

                    for year in years:
                        year_dir = group_dir / year if year != "all" else group_dir

                        if year != "all":
                            year_select = page.locator("select#yearSelection, select#receiptDateId").first
                            if year_select.count() > 0:
                                with page.expect_navigation(wait_until="domcontentloaded", timeout=30000):
                                    year_select.select_option(value=year)
                                time.sleep(1)

                        invoices = get_invoice_links(page)
                        logger.info(
                            "Group '%s', Year '%s' - found %d invoice(s)",
                            group["label"], year, len(invoices),
                        )
                        if not invoices:
                            logger.warning(
                                "No invoices found for group '%s', year '%s'. Browser will remain open for inspection.",
                                group["label"],
                                year,
                            )
                            input("No invoices found. Inspect the browser, then press Enter to continue...")
                            continue

                        year_dir.mkdir(parents=True, exist_ok=True)
                        for invoice in invoices:
                            try:
                                page, path = export_invoice_pdf(
                                    context,
                                    page,
                                    invoice,
                                    year_dir,
                                    billing_url=billing_url,
                                )
                                saved.append(path)
                            except Exception:
                                # One bad invoice must not abort the whole run.
                                logger.exception("Failed to export: %s", invoice["url"])

                        if year != "all":
                            # Return to the billing page and re-select the
                            # group before moving on to the next year.
                            page = _goto_with_retry(context, page, billing_url)
                            if group["value"]:
                                _select_billing_group(page, group["value"])
        finally:
            # Close the persistent context even when login or navigation raised.
            context.close()
    return saved