Retry
This commit is contained in:
@@ -16,7 +16,7 @@ class Config:
|
|||||||
|
|
||||||
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./invoices")
|
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./invoices")
|
||||||
BROWSER_HEADLESS = os.getenv("BROWSER_HEADLESS", "true").lower() == "true"
|
BROWSER_HEADLESS = os.getenv("BROWSER_HEADLESS", "true").lower() == "true"
|
||||||
BROWSER_TIMEOUT = int(os.getenv("BROWSER_TIMEOUT", "30000"))
|
BROWSER_TIMEOUT = int(os.getenv("BROWSER_TIMEOUT", "60000"))
|
||||||
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
|
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@@ -12,21 +12,57 @@ logger = logging.getLogger(__name__)
|
|||||||
BASE_URL = "https://secure.backblaze.com"
|
BASE_URL = "https://secure.backblaze.com"
|
||||||
BILLING_URL = f"{BASE_URL}/billing.htm"
|
BILLING_URL = f"{BASE_URL}/billing.htm"
|
||||||
|
|
||||||
|
CLOUDFLARE_WAIT_MAX = 60
|
||||||
|
CLOUDFLARE_POLL_INTERVAL = 2
|
||||||
|
|
||||||
|
|
||||||
|
def _wait_for_cloudflare(page: Page) -> None:
    """Block until the Cloudflare interstitial has disappeared from *page*.

    The challenge page is recognised by the text "Just a moment" in the
    document title.  The title is polled every ``CLOUDFLARE_POLL_INTERVAL``
    seconds, sleeping for at most ``CLOUDFLARE_WAIT_MAX`` seconds in total.

    Raises:
        RuntimeError: if the title still shows the challenge text after the
            final poll.
    """
    elapsed = 0
    while True:
        # Check the title first on every pass, including once more after the
        # last sleep, so a challenge that clears during the final interval is
        # not reported as a failure.
        if "Just a moment" not in page.title():
            return
        if elapsed >= CLOUDFLARE_WAIT_MAX:
            break
        logger.debug("Waiting for Cloudflare challenge... (%ds)", elapsed)
        time.sleep(CLOUDFLARE_POLL_INTERVAL)
        elapsed += CLOUDFLARE_POLL_INTERVAL

    # Adjacent literals are joined at compile time, so the single %d
    # placeholder is filled from the whole concatenated message.
    raise RuntimeError(
        "Cloudflare challenge did not resolve after %ds. "
        "Try running with --no-headless so you can solve it manually, "
        "or set BROWSER_HEADLESS=false in .env" % CLOUDFLARE_WAIT_MAX
    )
|
||||||
|
|
||||||
|
|
||||||
|
def _goto_with_retry(page: Page, url: str, retries: int = 3) -> None:
    """Navigate *page* to *url*, retrying when navigation raises.

    Each attempt loads the URL (waiting for ``domcontentloaded``) and then
    waits for any Cloudflare challenge via ``_wait_for_cloudflare``.

    Args:
        page: the page to navigate.
        url: destination address.
        retries: maximum number of attempts.

    Raises:
        RuntimeError: propagated immediately, without retrying (this is what
            ``_wait_for_cloudflare`` raises when the challenge never clears).
        Exception: the error from the last attempt once all attempts are used.
    """
    attempt = 1
    while attempt <= retries:
        try:
            page.goto(url, wait_until="domcontentloaded")
            _wait_for_cloudflare(page)
        except RuntimeError:
            # Not a transient navigation problem; let the caller see it now.
            raise
        except Exception as exc:
            logger.warning("Navigation attempt %d/%d failed for %s: %s", attempt, retries, url, exc)
            if attempt >= retries:
                raise
            # Linear back-off: 2s, 4s, ... between attempts.
            time.sleep(2 * attempt)
        else:
            return
        attempt += 1
|
||||||
|
|
||||||
|
|
||||||
def login(page: Page) -> None:
|
def login(page: Page) -> None:
|
||||||
logger.info("Logging in to Backblaze...")
|
logger.info("Logging in to Backblaze...")
|
||||||
page.goto(f"{BASE_URL}/user_signin.htm", wait_until="networkidle")
|
_goto_with_retry(page, f"{BASE_URL}/user_signin.htm")
|
||||||
|
page.wait_for_selector("#email", timeout=30000)
|
||||||
page.fill("#email", Config.BACKBLAZE_EMAIL)
|
page.fill("#email", Config.BACKBLAZE_EMAIL)
|
||||||
page.fill("#password", Config.BACKBLAZE_PASSWORD)
|
page.fill("#password", Config.BACKBLAZE_PASSWORD)
|
||||||
page.click("#submitButton")
|
page.click("#submitButton")
|
||||||
page.wait_for_load_state("networkidle")
|
page.wait_for_load_state("domcontentloaded")
|
||||||
|
time.sleep(2)
|
||||||
if "user_signin" in page.url:
|
if "user_signin" in page.url:
|
||||||
raise RuntimeError("Login failed - check credentials")
|
raise RuntimeError("Login failed - check credentials")
|
||||||
logger.info("Login successful")
|
logger.info("Login successful")
|
||||||
|
|
||||||
|
|
||||||
def get_group_options(page: Page) -> list[dict]:
|
def get_group_options(page: Page) -> list[dict]:
|
||||||
page.goto(BILLING_URL, wait_until="networkidle")
|
_goto_with_retry(page, BILLING_URL)
|
||||||
group_select = page.locator("select#groupSelection")
|
group_select = page.locator("select#groupSelection")
|
||||||
if group_select.count() == 0:
|
if group_select.count() == 0:
|
||||||
return [{"value": "", "label": "default"}]
|
return [{"value": "", "label": "default"}]
|
||||||
@@ -41,6 +77,7 @@ def get_group_options(page: Page) -> list[dict]:
|
|||||||
|
|
||||||
|
|
||||||
def get_year_options(page: Page) -> list[str]:
|
def get_year_options(page: Page) -> list[str]:
|
||||||
|
page.wait_for_load_state("domcontentloaded")
|
||||||
year_select = page.locator("select#yearSelection")
|
year_select = page.locator("select#yearSelection")
|
||||||
if year_select.count() == 0:
|
if year_select.count() == 0:
|
||||||
return []
|
return []
|
||||||
@@ -140,7 +177,7 @@ def sanitize_filename(name: str) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def export_invoice_pdf(page: Page, invoice_url: str, output_path: Path) -> Path:
|
def export_invoice_pdf(page: Page, invoice_url: str, output_path: Path) -> Path:
|
||||||
page.goto(invoice_url, wait_until="networkidle")
|
_goto_with_retry(page, invoice_url)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
fill_invoice_fields(page)
|
fill_invoice_fields(page)
|
||||||
@@ -158,14 +195,25 @@ def download_all_invoices() -> list[Path]:
|
|||||||
saved = []
|
saved = []
|
||||||
|
|
||||||
with sync_playwright() as p:
|
with sync_playwright() as p:
|
||||||
browser = p.chromium.launch(headless=Config.BROWSER_HEADLESS)
|
browser = p.chromium.launch(
|
||||||
context = browser.new_context()
|
headless=Config.BROWSER_HEADLESS,
|
||||||
|
args=["--disable-blink-features=AutomationControlled"],
|
||||||
|
)
|
||||||
|
context = browser.new_context(
|
||||||
|
user_agent=(
|
||||||
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
||||||
|
"(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
|
||||||
|
),
|
||||||
|
)
|
||||||
page = context.new_page()
|
page = context.new_page()
|
||||||
|
page.add_init_script(
|
||||||
|
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
|
||||||
|
)
|
||||||
page.set_default_timeout(Config.BROWSER_TIMEOUT)
|
page.set_default_timeout(Config.BROWSER_TIMEOUT)
|
||||||
|
|
||||||
login(page)
|
login(page)
|
||||||
|
|
||||||
page.goto(BILLING_URL, wait_until="networkidle")
|
_goto_with_retry(page, BILLING_URL)
|
||||||
|
|
||||||
groups = get_group_options(page)
|
groups = get_group_options(page)
|
||||||
logger.info("Found %d group(s): %s", len(groups), [g["label"] for g in groups])
|
logger.info("Found %d group(s): %s", len(groups), [g["label"] for g in groups])
|
||||||
@@ -175,11 +223,11 @@ def download_all_invoices() -> list[Path]:
|
|||||||
group_dir = output_dir / group_label if len(groups) > 1 else output_dir
|
group_dir = output_dir / group_label if len(groups) > 1 else output_dir
|
||||||
|
|
||||||
if group["value"]:
|
if group["value"]:
|
||||||
page.goto(BILLING_URL, wait_until="networkidle")
|
_goto_with_retry(page, BILLING_URL)
|
||||||
group_select = page.locator("select#groupSelection")
|
group_select = page.locator("select#groupSelection")
|
||||||
if group_select.count() > 0:
|
if group_select.count() > 0:
|
||||||
group_select.select_option(value=group["value"])
|
group_select.select_option(value=group["value"])
|
||||||
page.wait_for_load_state("networkidle")
|
page.wait_for_load_state("domcontentloaded")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
years = get_year_options(page)
|
years = get_year_options(page)
|
||||||
@@ -194,7 +242,7 @@ def download_all_invoices() -> list[Path]:
|
|||||||
year_select = page.locator("select#yearSelection")
|
year_select = page.locator("select#yearSelection")
|
||||||
if year_select.count() > 0:
|
if year_select.count() > 0:
|
||||||
year_select.select_option(value=year)
|
year_select.select_option(value=year)
|
||||||
page.wait_for_load_state("networkidle")
|
page.wait_for_load_state("domcontentloaded")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
invoices = get_invoice_links(page)
|
invoices = get_invoice_links(page)
|
||||||
@@ -223,12 +271,12 @@ def download_all_invoices() -> list[Path]:
|
|||||||
logger.exception("Failed to export: %s", invoice["url"])
|
logger.exception("Failed to export: %s", invoice["url"])
|
||||||
|
|
||||||
if year != "all":
|
if year != "all":
|
||||||
page.goto(BILLING_URL, wait_until="networkidle")
|
_goto_with_retry(page, BILLING_URL)
|
||||||
if group["value"]:
|
if group["value"]:
|
||||||
group_select = page.locator("select#groupSelection")
|
group_select = page.locator("select#groupSelection")
|
||||||
if group_select.count() > 0:
|
if group_select.count() > 0:
|
||||||
group_select.select_option(value=group["value"])
|
group_select.select_option(value=group["value"])
|
||||||
page.wait_for_load_state("networkidle")
|
page.wait_for_load_state("domcontentloaded")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
browser.close()
|
browser.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user