359 lines
13 KiB
Python
359 lines
13 KiB
Python
import hashlib
import logging
import re
import time
from collections import Counter
from pathlib import Path

from playwright.sync_api import sync_playwright, Page, BrowserContext
from playwright_stealth import Stealth

from config import Config
|
|
|
|
# Module-level logger; handlers and levels are configured by the application.
logger = logging.getLogger(__name__)

# Root of the Backblaze web console and the billing page the scraper starts from.
BASE_URL = "https://secure.backblaze.com"
BILLING_URL = f"{BASE_URL}/billing_card.htm"

# Cloudflare challenge polling: give up after CLOUDFLARE_WAIT_MAX seconds,
# re-checking the page every CLOUDFLARE_POLL_INTERVAL seconds.
CLOUDFLARE_WAIT_MAX = 60
CLOUDFLARE_POLL_INTERVAL = 2
|
|
|
|
def _wait_for_cloudflare(page: Page) -> None:
    """Block until the page title no longer shows the Cloudflare interstitial.

    The title is polled every ``CLOUDFLARE_POLL_INTERVAL`` seconds for at most
    ``CLOUDFLARE_WAIT_MAX`` seconds. The wait ends as soon as the title does
    not contain ``"Just a moment"``, or as soon as reading the title fails.

    Raises:
        RuntimeError: The challenge title was still present when the time
            budget ran out.
    """
    waited = 0
    while waited < CLOUDFLARE_WAIT_MAX:
        try:
            current_title = page.title()
        except Exception:
            # The title could not be read at all; stop waiting rather than fail.
            return
        else:
            if "Just a moment" not in current_title:
                return

        logger.debug("Waiting for Cloudflare challenge... (%ds)", waited)
        time.sleep(CLOUDFLARE_POLL_INTERVAL)
        waited += CLOUDFLARE_POLL_INTERVAL

    message = (
        "Cloudflare challenge did not resolve after %ds. "
        "Try running with --no-headless so you can solve it manually, "
        "or set BROWSER_HEADLESS=false in .env"
    ) % CLOUDFLARE_WAIT_MAX
    raise RuntimeError(message)
|
|
|
|
|
|
def _ensure_page(context: BrowserContext, page: Page) -> Page:
    """Return a usable page, replacing ``page`` if it no longer responds.

    ``page.title()` is used as a liveness probe. If it raises, a fresh page is
    opened on ``context`` with the configured default timeout and returned
    instead.
    """
    try:
        page.title()
    except Exception:
        logger.info("Page was closed, creating a new one")
        replacement = context.new_page()
        replacement.set_default_timeout(Config.BROWSER_TIMEOUT)
        return replacement
    return page
|
|
|
|
|
|
def _goto_with_retry(context: BrowserContext, page: Page, url: str, retries: int = 3) -> Page:
    """Navigate to ``url``, retrying on navigation errors.

    Each attempt first makes sure the page is still usable (a replacement page
    may be created), navigates, and then waits for any Cloudflare challenge to
    clear. Failed attempts are logged and followed by a pause of
    ``2 * attempt`` seconds before the next try.

    Args:
        context: Browser context used to open a replacement page if needed.
        page: Page to navigate; may be replaced.
        url: Destination URL.
        retries: Maximum number of navigation attempts; must be at least 1.

    Returns:
        The page that ended up on ``url`` (not necessarily the one passed in).

    Raises:
        ValueError: ``retries`` is smaller than 1.
        RuntimeError: Propagated immediately, without retrying (this is what
            ``_wait_for_cloudflare`` raises when the challenge does not clear).
        Exception: The error of the final attempt when every attempt failed.
    """
    if retries < 1:
        # Without this guard the loop body never runs and the caller would get
        # back a page that was never navigated.
        raise ValueError(f"retries must be >= 1, got {retries}")

    for attempt in range(1, retries + 1):
        page = _ensure_page(context, page)
        try:
            page.goto(url, wait_until="domcontentloaded")
            _wait_for_cloudflare(page)
        except RuntimeError:
            # Retrying does not help with an unresolved challenge.
            raise
        except Exception as e:
            logger.warning("Navigation attempt %d/%d failed for %s: %s", attempt, retries, url, e)
            if attempt == retries:
                raise
            time.sleep(2 * attempt)
        else:
            return page

    # Not reachable: the last attempt either returned or re-raised above.
    return page
|
|
|
|
|
|
def login(context: BrowserContext, page: Page) -> Page:
    """Sign in to Backblaze through the two-step (email, then password) form.

    If a ``#code-field`` input is present after the password step, the user is
    prompted on stdin for a TOTP code, the "remember" switch is checked when
    it exists and is unchecked, and the code is submitted.

    Returns:
        The page used for the sign-in (it may differ from the ``page`` passed
        in if navigation had to open a replacement).

    Raises:
        RuntimeError: The browser is still on a ``user_signin`` URL afterwards.
    """

    def _submit_and_settle() -> None:
        # Fixed 5 s pause after the load event, as the flow has no explicit
        # element to wait for at this point.
        page.click("#submit-button")
        page.wait_for_load_state("domcontentloaded")
        time.sleep(5)

    logger.info("Logging in to Backblaze...")
    page = _goto_with_retry(context, page, f"{BASE_URL}/user_signin.htm")

    # Step 1: email address.
    page.wait_for_selector("#email-field", timeout=30000)
    page.fill("#email-field", Config.BACKBLAZE_EMAIL)
    page.click("#submit-button")

    # Step 2: password.
    page.wait_for_selector("#password-field", timeout=30000)
    page.fill("#password-field", Config.BACKBLAZE_PASSWORD)
    _submit_and_settle()

    # Optional step 3: one-time code, only when the form asks for it.
    totp_input = page.locator("#code-field")
    if totp_input.count() > 0:
        totp_input.fill(input("Enter TOTP code: ").strip())
        remember_toggle = page.locator("#bz-redesign-switch-checkbox")
        should_tick = remember_toggle.count() > 0 and not remember_toggle.is_checked()
        if should_tick:
            remember_toggle.check()
        _submit_and_settle()

    if "user_signin" not in page.url:
        logger.info("Login successful")
        return page
    raise RuntimeError("Login failed - check credentials")
|
|
|
|
|
|
def get_group_options(context: BrowserContext, page: Page) -> tuple[Page, list[dict]]:
    """Open the billing page and list the entries of the group dropdown.

    Returns:
        A ``(page, groups)`` pair. Each group is a dict with ``"value"`` (the
        option's ``value`` attribute, ``""`` when absent) and ``"label"`` (the
        option's stripped text). Options with neither are dropped. When the
        dropdown is missing or yields nothing, a single placeholder group
        ``{"value": "", "label": "default"}`` is returned.
    """
    placeholder = [{"value": "", "label": "default"}]

    page = _goto_with_retry(context, page, BILLING_URL)
    dropdown = page.locator("select#groupSelection")
    if dropdown.count() == 0:
        return page, placeholder

    candidates = (
        {
            "value": option.get_attribute("value") or "",
            "label": option.inner_text().strip(),
        }
        for option in dropdown.locator("option").all()
    )
    groups = [entry for entry in candidates if entry["value"] or entry["label"]]
    return page, groups or placeholder
|
|
|
|
|
|
def get_year_options(page: Page) -> list[str]:
    """List the entries of the year dropdown on the current page.

    Each entry is the option's ``value`` attribute, or its stripped text when
    the attribute is missing or empty. Returns an empty list when the page has
    no ``select#yearSelection`` element.
    """
    page.wait_for_load_state("domcontentloaded")
    dropdown = page.locator("select#yearSelection")
    if dropdown.count() == 0:
        return []

    years = []
    for option in dropdown.locator("option").all():
        value = option.get_attribute("value")
        years.append(value if value else option.inner_text().strip())
    return years
|
|
|
|
|
|
# href prefixes that can never be fetched as an invoice document.
_NON_NAVIGABLE_HREF_PREFIXES = ("#", "javascript:", "mailto:", "tel:")


def _absolute_invoice_url(href) -> str:
    """Return ``href`` as an absolute URL, or ``""`` when it is not navigable.

    Empty hrefs, fragment-only hrefs and ``javascript:``/``mailto:``/``tel:``
    pseudo-links yield ``""`` so callers can fall back to clicking the anchor.
    Relative hrefs are resolved against ``BASE_URL``.
    """
    href = (href or "").strip()
    if not href or href.lower().startswith(_NON_NAVIGABLE_HREF_PREFIXES):
        return ""
    if href.startswith("http"):
        return href
    if href.startswith("//"):
        # Protocol-relative URL: keep its host instead of gluing it to BASE_URL.
        return f"https:{href}"
    return f"{BASE_URL}/{href.lstrip('/')}"


def get_invoice_links(page: Page) -> list[dict]:
    """Collect invoice links from the billing page currently shown in ``page``.

    Three strategies are tried in order; a later one only runs when the
    earlier ones found nothing:

    1. invoice anchors inside table body rows,
    2. any anchor whose href contains ``invoice``,
    3. anchors carrying ``data-reference-object-id``; those without a usable
       href are clicked and the URL of the popup they open is recorded.

    Returns:
        A list of ``{"url": ..., "label": ...}`` dicts, de-duplicated by URL
        and kept in discovery order. ``label`` may be an empty string.
    """
    links: list[dict] = []
    seen: set[str] = set()

    def _add_link(url: str, label: str) -> None:
        if not url or url in seen:
            return
        seen.add(url)
        links.append({"url": url, "label": label})

    def _add_href_anchors(anchors) -> None:
        # Record every anchor that carries a real, navigable href.
        for anchor in anchors:
            url = _absolute_invoice_url(anchor.get_attribute("href"))
            if url:
                _add_link(url, anchor.inner_text().strip())

    # Strategy 1: anchors inside table rows.
    rows = page.locator("table.billing-table tbody tr, table#billingTable tbody tr, table tbody tr").all()
    for row in rows:
        _add_href_anchors(row.locator("a[href*='billing_invoice'], a[href*='invoice']").all())

    # Strategy 2: any invoice-looking anchor anywhere on the page.
    if not links:
        _add_href_anchors(page.locator("a[href*='invoice']").all())

    # Strategy 3: anchors identified by a data attribute.
    if not links:
        print_links = page.locator("a.no-print[data-reference-object-id], a[data-reference-object-id]").all()
        for anchor in print_links:
            url = _absolute_invoice_url(anchor.get_attribute("href"))
            if url:
                _add_link(url, anchor.inner_text().strip())
                continue

            # No usable href (missing, "#", "javascript:..."): click the anchor
            # and take the URL of the popup it opens.
            popup = None
            try:
                anchor.scroll_into_view_if_needed()
                with page.expect_popup() as popup_info:
                    anchor.click()
                popup = popup_info.value
                popup.wait_for_load_state("domcontentloaded")
                if popup.url in ("", "about:blank"):
                    # Nothing that could be re-opened later for export.
                    logger.warning("Invoice popup has no navigable URL: %r", popup.url)
                else:
                    label = (
                        anchor.inner_text().strip()
                        or anchor.get_attribute("data-reference-object-id")
                        or popup.title()
                    )
                    _add_link(popup.url, label)
            except Exception as e:
                logger.warning("Failed to open invoice popup: %s", e)
            finally:
                # Close the popup even when loading it failed, so windows do
                # not pile up across invoices.
                if popup is not None:
                    try:
                        popup.close()
                    except Exception as e:
                        logger.warning("Failed to close invoice popup: %s", e)

    return links
|
|
|
|
|
|
def fill_invoice_fields(page: Page) -> None:
    """Fill the editable invoice fields on ``page`` from ``Config``.

    Two passes are made. The first looks up the company and notes fields by
    id/name selectors and fills the first match for each. The second finds
    ``<label>`` elements by text and fills the element referenced by the
    label's ``for`` attribute; for each label text it stops at the first label
    that resolves to an existing element. Empty config values are skipped.
    ``<select>`` elements are set by option label; everything else is filled
    as text.
    """

    def _apply(target, value) -> None:
        # Dropdowns need select_option(); other controls take fill().
        tag = target.evaluate("el => el.tagName.toLowerCase()")
        if tag == "select":
            target.select_option(label=value)
        else:
            target.fill(value)

    control_tags = ("input", "textarea", "select")
    # Alternative ``name`` attributes tried right after the exact id.
    name_overrides = {"company": "Company", "notes": "Other"}

    # Pass 1: locate fields by id / name.
    direct_fields = {
        "company": Config.INVOICE_COMPANY,
        "notes": Config.INVOICE_NOTES,
    }
    for field_id, value in direct_fields.items():
        if not value:
            continue

        # Most specific selectors first; substring matches are the last resort.
        selectors = [f"#{field_id}"]
        override = name_overrides.get(field_id)
        if override:
            selectors.extend(f"{tag}[name='{override}']" for tag in control_tags)
        selectors.extend(f"{tag}[name='{field_id}']" for tag in control_tags)
        selectors.extend(f"{tag}[id*='{field_id}' i]" for tag in control_tags)
        selectors.extend(f"{tag}[name*='{field_id}' i]" for tag in control_tags)

        for selector in selectors:
            candidate = page.locator(selector).first
            if candidate.count() == 0:
                continue
            _apply(candidate, value)
            logger.info("Filled field %s", field_id)
            break

    # Pass 2: locate fields through their <label for="..."> text.
    labelled_fields = [
        ("VAT", Config.INVOICE_VAT_ID),
        ("Tax", Config.INVOICE_VAT_ID),
        ("Document Type", Config.INVOICE_DOCUMENT_TYPE),
        ("Type", Config.INVOICE_DOCUMENT_TYPE),
        ("Company", Config.INVOICE_COMPANY),
        ("Notes", Config.INVOICE_NOTES),
        ("Note", Config.INVOICE_NOTES),
    ]
    for label_text, value in labelled_fields:
        if not value:
            continue
        for label in page.locator(f"label:has-text('{label_text}')").all():
            for_attr = label.get_attribute("for")
            if not for_attr:
                continue
            # NOTE(review): "#<id>" assumes the id is a valid CSS identifier
            # and unique on the page - confirm against the real invoice markup.
            target = page.locator(f"#{for_attr}")
            if target.count() == 0:
                continue
            _apply(target, value)
            logger.info("Filled labeled field '%s' -> #%s", label_text, for_attr)
            break
|
|
|
|
|
|
def sanitize_filename(name: str) -> str:
    """Make ``name`` safe to use as a single file or directory name.

    Each of the characters ``< > : " / \\ | ? *`` and each run of whitespace
    is replaced by one underscore; leading and trailing underscores are then
    removed. The result may be an empty string.
    """
    cleaned = name
    # Order matters: reserved characters first, then whitespace runs.
    for pattern in (r'[<>:"/\\|?*]', r'\s+'):
        cleaned = re.sub(pattern, '_', cleaned)
    return cleaned.strip('_')
|
|
|
|
|
|
def export_invoice_pdf(context: BrowserContext, page: Page, invoice_url: str, output_path: Path) -> tuple[Page, Path]:
    """Open an invoice, fill its editable fields and print it to a PDF file.

    Args:
        context: Browser context, used if navigation needs a replacement page.
        page: Page to navigate to the invoice; may be replaced.
        invoice_url: Absolute URL of the invoice.
        output_path: Destination of the PDF (A4, backgrounds included).

    Returns:
        ``(page, output_path)`` where ``page`` is the page now showing the
        invoice.

    NOTE(review): Playwright documents ``page.pdf()`` as supported in headless
    Chromium only - confirm exports work when BROWSER_HEADLESS is false.
    """
    invoice_page = _goto_with_retry(context, page, invoice_url)
    time.sleep(1)  # short pause before touching the form

    fill_invoice_fields(invoice_page)
    time.sleep(0.5)  # short pause between filling and printing

    pdf_options = {"path": str(output_path), "format": "A4", "print_background": True}
    invoice_page.pdf(**pdf_options)
    logger.info("Saved: %s", output_path)
    return invoice_page, output_path
|
|
|
|
|
|
def _pause_for_inspection(prompt: str) -> None:
    """Wait for the operator to press Enter; continue at once if stdin is closed."""
    try:
        input(prompt)
    except EOFError:
        # Non-interactive run (no stdin): there is nobody to wait for.
        logger.debug("stdin is not interactive; continuing without waiting")


def _open_billing_for_group(context: BrowserContext, page: Page, group_value: str) -> Page:
    """Reload the billing page and, if ``group_value`` is set, select that group."""
    page = _goto_with_retry(context, page, BILLING_URL)
    if group_value:
        group_select = page.locator("select#groupSelection")
        if group_select.count() > 0:
            group_select.select_option(value=group_value)
            page.wait_for_load_state("domcontentloaded")
            time.sleep(1)
    return page


def _invoice_file_stems(invoices: list[dict]) -> list[str]:
    """Return one distinct file stem per invoice, in listing order.

    The stem is the sanitized label, or ``invoice_<position>`` when the label
    sanitizes to nothing. Stems shared by several invoices of the same listing
    get a short hash of the invoice URL appended, so that no invoice is
    mistaken for an already-downloaded one.
    """
    stems = [
        sanitize_filename(invoice["label"]) or f"invoice_{position}"
        for position, invoice in enumerate(invoices, start=1)
    ]
    occurrences = Counter(stems)
    distinct = []
    for invoice, stem in zip(invoices, stems):
        if occurrences[stem] > 1:
            digest = hashlib.sha1(invoice["url"].encode("utf-8")).hexdigest()[:8]
            stem = f"{stem}_{digest}"
        distinct.append(stem)
    return distinct


def download_all_invoices() -> list[Path]:
    """Log in and save every invoice of every group and year as a PDF.

    Files are written below ``Config.OUTPUT_DIR``: one sub-directory per group
    when there is more than one group, and one per year when the billing page
    offers a year dropdown. PDFs that already exist are not downloaded again.
    A failed export is logged and does not stop the run.

    Returns:
        Paths of all invoices that exist on disk after the run, both newly
        exported and previously present.

    Raises:
        Exception: Whatever ``login`` raised, after giving the operator a
            chance to inspect the browser.
    """
    Config.validate()
    output_dir = Path(Config.OUTPUT_DIR)
    output_dir.mkdir(parents=True, exist_ok=True)
    saved: list[Path] = []

    stealth = Stealth()

    data_dir = Path(Config.BROWSER_DATA_DIR).resolve()
    data_dir.mkdir(parents=True, exist_ok=True)

    with stealth.use_sync(sync_playwright()) as p:
        context = p.chromium.launch_persistent_context(
            user_data_dir=str(data_dir),
            headless=Config.BROWSER_HEADLESS,
            args=["--disable-blink-features=AutomationControlled"],
        )
        try:
            page = context.pages[0] if context.pages else context.new_page()
            page.set_default_timeout(Config.BROWSER_TIMEOUT)

            try:
                page = login(context, page)
            except Exception:
                logger.exception("Login failed. Browser will remain open for inspection.")
                _pause_for_inspection("Login failed. Inspect the browser, then press Enter to close it...")
                raise

            # get_group_options() navigates to the billing page itself, so no
            # separate navigation is needed first.
            page, groups = get_group_options(context, page)
            logger.info("Found %d group(s): %s", len(groups), [g["label"] for g in groups])

            for group in groups:
                group_label = sanitize_filename(group["label"])
                group_dir = output_dir / group_label if len(groups) > 1 else output_dir

                if group["value"]:
                    page = _open_billing_for_group(context, page, group["value"])

                years = get_year_options(page)
                if not years:
                    # No year dropdown: treat the listing as a single bucket.
                    years = ["all"]
                logger.info("Group '%s' - years: %s", group["label"], years)

                for year in years:
                    year_dir = group_dir / year if year != "all" else group_dir

                    if year != "all":
                        year_select = page.locator("select#yearSelection")
                        if year_select.count() > 0:
                            year_select.select_option(value=year)
                            page.wait_for_load_state("domcontentloaded")
                            time.sleep(1)

                    invoices = get_invoice_links(page)
                    logger.info(
                        "Group '%s', Year '%s' - found %d invoice(s)",
                        group["label"], year, len(invoices),
                    )

                    if not invoices:
                        logger.warning(
                            "No invoices found for group '%s', year '%s'. Browser will remain open for inspection.",
                            group["label"],
                            year,
                        )
                        _pause_for_inspection("No invoices found. Inspect the browser, then press Enter to continue...")
                        continue

                    year_dir.mkdir(parents=True, exist_ok=True)

                    for invoice, stem in zip(invoices, _invoice_file_stems(invoices)):
                        pdf_path = year_dir / f"{stem}.pdf"

                        if pdf_path.exists():
                            logger.info("Skipping (exists): %s", pdf_path)
                            saved.append(pdf_path)
                            continue

                        try:
                            page, path = export_invoice_pdf(context, page, invoice["url"], pdf_path)
                            saved.append(path)
                        except Exception:
                            logger.exception("Failed to export: %s", invoice["url"])

                    if year != "all":
                        # Exporting left the page on an invoice; go back to the
                        # listing of this group before selecting the next year.
                        page = _open_billing_for_group(context, page, group["value"])
        finally:
            # Always close the persistent context, including on errors.
            context.close()

    return saved
|