From a9bb2460c6d7beba690bf4b0afb0794a89373fb9 Mon Sep 17 00:00:00 2001 From: Jan Bader Date: Sun, 5 Apr 2026 22:01:46 +0200 Subject: [PATCH] convert to backblaze fetcher --- .env.example | 26 +- .gitignore | 3 + README.md | 202 +++------------- config.py | 50 +--- downloader.py | 236 ++++++++++++++++++ extract.sh | 25 -- extractors/__init__.py | 13 - extractors/blog_extractor.py | 224 ----------------- extractors/instagram_extractor.py | 390 ------------------------------ extractors/youtube_extractor.py | 203 ---------------- flake.nix | 10 +- main.py | 250 +++---------------- obsidian_writer.py | 128 ---------- requirements.txt | 21 -- summarizer.py | 172 ------------- 15 files changed, 333 insertions(+), 1620 deletions(-) create mode 100644 downloader.py delete mode 100755 extract.sh delete mode 100644 extractors/__init__.py delete mode 100644 extractors/blog_extractor.py delete mode 100644 extractors/instagram_extractor.py delete mode 100644 extractors/youtube_extractor.py delete mode 100644 obsidian_writer.py delete mode 100644 summarizer.py diff --git a/.env.example b/.env.example index c5e86c7..db3aafe 100644 --- a/.env.example +++ b/.env.example @@ -1,21 +1,19 @@ -# Content Extractor Configuration +# Backblaze credentials +BACKBLAZE_EMAIL=you@example.com +BACKBLAZE_PASSWORD=your_password -# Obsidian vault path (default: ~/Obsidian Vault) -OBSIDIAN_VAULT_PATH=~/Obsidian Vault +# Invoice fields (all optional - leave empty to skip) +INVOICE_VAT_ID= +INVOICE_DOCUMENT_TYPE= +INVOICE_COMPANY= +INVOICE_NOTES= -# Browser settings (for Instagram extraction) +# Output +OUTPUT_DIR=./invoices + +# Browser BROWSER_HEADLESS=true BROWSER_TIMEOUT=30000 -# Content extraction settings -MAX_CONTENT_LENGTH=10000 -GENERATE_SUMMARY=true - -# YouTube settings -YOUTUBE_LANGUAGE=en - -# Instagram settings -INSTAGRAM_WAIT_TIME=5 - # Logging LOG_LEVEL=INFO diff --git a/.gitignore b/.gitignore index 405b2d0..ffd3944 100644 --- a/.gitignore +++ b/.gitignore @@ -51,3 +51,6 @@ htmlcov/ # 
Playwright .playwright/ .direnv/ + +# Output +invoices/ diff --git a/README.md b/README.md index 46d82ee..742278c 100644 --- a/README.md +++ b/README.md @@ -1,198 +1,64 @@ -# Content Extractor 🔥 +# Backblaze Invoice Downloader -Extract key information from URLs (YouTube, Instagram, blogs) and save to Obsidian notes automatically. +Download Backblaze invoices as PDF using browser automation. -## Features +Backblaze only provides invoices via a web page that must be printed — this tool automates that process using Playwright, filling in configurable fields (VAT ID, document type, company, notes) and exporting each invoice to PDF. -- **YouTube Videos**: Extract title, description, transcript, author, duration, views -- **Instagram Reels**: Extract caption, author, engagement metrics (via browser automation) -- **Blog Posts/Articles**: Extract title, author, content, tags, publish date -- **Auto-save to Obsidian**: Notes are automatically formatted and saved to your Obsidian vault -- **Smart Summaries**: Generates key points from extracted content - -## Installation +## Setup ```bash -# Navigate to the content-extractor directory -cd ~/Desktop/itsthatnewshit/content-extractor - -# Install dependencies pip install -r requirements.txt - -# Install Playwright browsers (for Instagram extraction) -playwright install +playwright install chromium ``` -## Usage - -### Basic Usage +Or with Nix: ```bash -# Extract from YouTube video -python main.py "https://www.youtube.com/watch?v=VIDEO_ID" - -# Extract from Instagram reel -python main.py "https://www.instagram.com/reel/REEL_ID" - -# Extract from blog post -python main.py "https://example.com/article" -``` - -### Advanced Options - -```bash -# Specify Obsidian vault path -python main.py --obsidian-path "/path/to/Obsidian Vault" - -# Custom output filename -python main.py --output "my-note-title" - -# Save to specific folder in Obsidian -python main.py --folder "Learning/YouTube" - -# Only print content, don't save to Obsidian 
-python main.py --no-save - -# Generate summary -python main.py --summarize -``` - -### Examples - -```bash -# Save YouTube tutorial to Learning folder -python main.py "https://youtu.be/abc123" --folder "Learning" --output "Python Tutorial" - -# Extract Instagram reel without saving -python main.py "https://instagram.com/reel/xyz789" --no-save - -# Extract blog post to default vault -python main.py "https://medium.com/article" --folder "Articles" +nix develop ``` ## Configuration -Create a `.env` file in the project directory to customize settings: - -```bash -cp .env.example .env -``` - -Edit `.env` with your preferences: +Create a `.env` file (see `.env.example`): ```env -# Obsidian vault path -OBSIDIAN_VAULT_PATH=~/Obsidian Vault +BACKBLAZE_EMAIL=you@example.com +BACKBLAZE_PASSWORD=your_password -# Browser settings (for Instagram) +INVOICE_VAT_ID=DE123456789 +INVOICE_DOCUMENT_TYPE=Invoice +INVOICE_COMPANY=My Company GmbH +INVOICE_NOTES=Internal ref: 12345 + +OUTPUT_DIR=./invoices BROWSER_HEADLESS=true -BROWSER_TIMEOUT=30000 - -# Content extraction -MAX_CONTENT_LENGTH=10000 -GENERATE_SUMMARY=true - -# OpenAI/OpenRouter -OPENAI_API_KEY=your_key_here -OPENAI_URL=https://openrouter.ai/api/v1/chat/completions -OPENAI_MODEL=gpt-4o-mini -OPENAI_TIMEOUT=30 - -# YouTube -YOUTUBE_LANGUAGE=en - -# Instagram -INSTAGRAM_WAIT_TIME=5 ``` -## Output Format +## Usage -Notes are saved in markdown format with: - -- Title and metadata (source, URL, extraction date) -- Author, duration, views (when available) -- Description/summary -- Full content (transcript or article text) -- Key points -- Tags for easy organization - -Example output: - -```markdown -# How to Build AI Agents - -## Metadata -- **Source**: Youtube -- **URL**: https://youtube.com/watch?v=abc123 -- **Extracted**: 2026-02-21 15:30:00 -- **Author**: Tech Channel -- **Duration**: 12:34 -- **Views**: 1.2M - -## Description -Learn how to build AI agents from scratch... - -## Content -[Full transcript or article text...] 
- -## Key Points -- Point 1 from the content -- Point 2 from the content -- Point 3 from the content - ---- - -## Tags -#youtube #video #ai #agents #notes -``` - -## Troubleshooting - -### Instagram extraction fails -Instagram requires browser automation. Make sure you've run: ```bash -playwright install +python main.py ``` -If it still fails, Instagram may have changed their UI. The extractor has a fallback mode that will still extract basic info. - -### YouTube transcript not available -Some videos don't have captions/transcripts. The extractor will fall back to extracting the description only. - -### Obsidian vault not found -By default, the tool looks for `~/Obsidian Vault`. If your vault is elsewhere, use the `--obsidian-path` flag or set `OBSIDIAN_VAULT_PATH` in your `.env` file. - -## Project Structure +### Options ``` -content-extractor/ -├── main.py # Main entry point -├── config.py # Configuration settings -├── obsidian_writer.py # Obsidian note writer -├── requirements.txt # Python dependencies -├── .env.example # Example environment file -├── README.md # This file -└── extractors/ - ├── __init__.py - ├── youtube_extractor.py # YouTube extraction - ├── instagram_extractor.py # Instagram extraction - └── blog_extractor.py # Blog/article extraction +-o, --output DIR Output directory (default: ./invoices) +--headless Run browser headless +--no-headless Show browser window (useful for debugging) +--vat-id ID VAT ID to fill on invoices +--document-type TYPE Document type to select +--company NAME Company name to fill +--notes TEXT Notes to fill on invoices +-v, --verbose Verbose logging ``` -## Future Enhancements +CLI arguments override `.env` values. 
-- [ ] AI-powered summarization (using LLMs) -- [ ] Podcast/audio extraction (whisper transcription) -- [ ] Twitter/X thread extraction -- [ ] LinkedIn post extraction -- [ ] Batch processing (extract from multiple URLs) -- [ ] Web interface -- [ ] Automatic tagging based on content +## How it works -## License - -MIT License - Feel free to use and modify! - ---- - -Built with 🔥 by RUBIUS for naki +1. Logs in to `secure.backblaze.com` +2. Navigates to the billing page +3. Iterates over all billing groups and years +4. For each invoice, opens the invoice page, fills the configured fields, and exports to PDF +5. Skips already-downloaded invoices diff --git a/config.py b/config.py index 8ddd31e..260a44a 100644 --- a/config.py +++ b/config.py @@ -1,54 +1,26 @@ -""" -Configuration for Content Extractor -""" - import os from pathlib import Path from dotenv import load_dotenv -# Load environment variables load_dotenv() class Config: - """Configuration settings for content extractor.""" + BACKBLAZE_EMAIL = os.getenv("BACKBLAZE_EMAIL", "") + BACKBLAZE_PASSWORD = os.getenv("BACKBLAZE_PASSWORD", "") - # Obsidian vault path (default to common locations) - OBSIDIAN_VAULT_PATH = os.getenv( - "OBSIDIAN_VAULT_PATH", - os.path.expanduser("~/Obsidian Vault") # Default location - ) + INVOICE_VAT_ID = os.getenv("INVOICE_VAT_ID", "") + INVOICE_DOCUMENT_TYPE = os.getenv("INVOICE_DOCUMENT_TYPE", "") + INVOICE_COMPANY = os.getenv("INVOICE_COMPANY", "") + INVOICE_NOTES = os.getenv("INVOICE_NOTES", "") - # Browser settings (for Instagram and dynamic content) + OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./invoices") BROWSER_HEADLESS = os.getenv("BROWSER_HEADLESS", "true").lower() == "true" - BROWSER_TIMEOUT = int(os.getenv("BROWSER_TIMEOUT", "30000")) # 30 seconds - - # Content extraction settings - MAX_CONTENT_LENGTH = int(os.getenv("MAX_CONTENT_LENGTH", "10000")) # Max chars - GENERATE_SUMMARY = os.getenv("GENERATE_SUMMARY", "true").lower() == "true" - - # OpenAI/OpenRouter settings - 
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") - OPENAI_URL = os.getenv("OPENAI_URL", "https://api.openai.com/v1/chat/completions") - OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini") - OPENAI_TIMEOUT = int(os.getenv("OPENAI_TIMEOUT", "30")) - OPENAI_LOG_PAYLOAD = os.getenv("OPENAI_LOG_PAYLOAD", "false").lower() == "true" - - # YouTube settings - YOUTUBE_LANGUAGE = os.getenv("YOUTUBE_LANGUAGE", "en") - - # Instagram settings (requires browser automation) - INSTAGRAM_WAIT_TIME = int(os.getenv("INSTAGRAM_WAIT_TIME", "5")) # seconds - - # Logging + BROWSER_TIMEOUT = int(os.getenv("BROWSER_TIMEOUT", "30000")) LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") - LOG_FILE = os.getenv("LOG_FILE", "content_extractor.log") @classmethod def validate(cls): - """Validate configuration.""" - # Check if Obsidian vault path exists - if not Path(cls.OBSIDIAN_VAULT_PATH).exists(): - print(f"⚠️ Warning: Obsidian vault path does not exist: {cls.OBSIDIAN_VAULT_PATH}") - print(" You can set OBSIDIAN_VAULT_PATH environment variable or use --obsidian-path flag") - return True + if not cls.BACKBLAZE_EMAIL or not cls.BACKBLAZE_PASSWORD: + raise ValueError("BACKBLAZE_EMAIL and BACKBLAZE_PASSWORD must be set") + Path(cls.OUTPUT_DIR).mkdir(parents=True, exist_ok=True) diff --git a/downloader.py b/downloader.py new file mode 100644 index 0000000..a7144a3 --- /dev/null +++ b/downloader.py @@ -0,0 +1,236 @@ +import logging +import re +import time +from pathlib import Path + +from playwright.sync_api import sync_playwright, Page, Browser + +from config import Config + +logger = logging.getLogger(__name__) + +BASE_URL = "https://secure.backblaze.com" +BILLING_URL = f"{BASE_URL}/billing.htm" + + +def login(page: Page) -> None: + logger.info("Logging in to Backblaze...") + page.goto(f"{BASE_URL}/user_signin.htm", wait_until="networkidle") + page.fill("#email", Config.BACKBLAZE_EMAIL) + page.fill("#password", Config.BACKBLAZE_PASSWORD) + page.click("#submitButton") + 
page.wait_for_load_state("networkidle") + if "user_signin" in page.url: + raise RuntimeError("Login failed - check credentials") + logger.info("Login successful") + + +def get_group_options(page: Page) -> list[dict]: + page.goto(BILLING_URL, wait_until="networkidle") + group_select = page.locator("select#groupSelection") + if group_select.count() == 0: + return [{"value": "", "label": "default"}] + options = group_select.locator("option").all() + groups = [] + for opt in options: + val = opt.get_attribute("value") or "" + label = opt.inner_text().strip() + if val or label: + groups.append({"value": val, "label": label}) + return groups if groups else [{"value": "", "label": "default"}] + + +def get_year_options(page: Page) -> list[str]: + year_select = page.locator("select#yearSelection") + if year_select.count() == 0: + return [] + options = year_select.locator("option").all() + return [opt.get_attribute("value") or opt.inner_text().strip() for opt in options] + + +def get_invoice_links(page: Page) -> list[dict]: + links = [] + rows = page.locator("table.billing-table tbody tr, table#billingTable tbody tr, table tbody tr").all() + for row in rows: + anchors = row.locator("a[href*='billing_invoice'], a[href*='invoice']").all() + for anchor in anchors: + href = anchor.get_attribute("href") or "" + text = anchor.inner_text().strip() + if href: + if not href.startswith("http"): + href = f"{BASE_URL}/{href.lstrip('/')}" + links.append({"url": href, "label": text}) + + if not links: + all_anchors = page.locator("a[href*='invoice']").all() + for anchor in all_anchors: + href = anchor.get_attribute("href") or "" + text = anchor.inner_text().strip() + if href and "invoice" in href.lower(): + if not href.startswith("http"): + href = f"{BASE_URL}/{href.lstrip('/')}" + links.append({"url": href, "label": text}) + + return links + + +def fill_invoice_fields(page: Page) -> None: + fields = { + "vatId": Config.INVOICE_VAT_ID, + "documentType": Config.INVOICE_DOCUMENT_TYPE, + 
"company": Config.INVOICE_COMPANY, + "notes": Config.INVOICE_NOTES, + } + + for field_id, value in fields.items(): + if not value: + continue + for selector in [ + f"#{field_id}", + f"input[name='{field_id}']", + f"textarea[name='{field_id}']", + f"select[name='{field_id}']", + f"input[id*='{field_id}' i]", + f"textarea[id*='{field_id}' i]", + f"select[id*='{field_id}' i]", + f"input[name*='{field_id}' i]", + f"textarea[name*='{field_id}' i]", + f"select[name*='{field_id}' i]", + ]: + el = page.locator(selector).first + if el.count() > 0: + tag = el.evaluate("el => el.tagName.toLowerCase()") + if tag == "select": + el.select_option(label=value) + else: + el.fill(value) + logger.info("Filled field %s", field_id) + break + + for label_text, value in [ + ("VAT", Config.INVOICE_VAT_ID), + ("Tax", Config.INVOICE_VAT_ID), + ("Document Type", Config.INVOICE_DOCUMENT_TYPE), + ("Type", Config.INVOICE_DOCUMENT_TYPE), + ("Company", Config.INVOICE_COMPANY), + ("Notes", Config.INVOICE_NOTES), + ("Note", Config.INVOICE_NOTES), + ]: + if not value: + continue + labels = page.locator(f"label:has-text('{label_text}')").all() + for label in labels: + for_attr = label.get_attribute("for") + if for_attr: + target = page.locator(f"#{for_attr}") + if target.count() > 0: + tag = target.evaluate("el => el.tagName.toLowerCase()") + if tag == "select": + target.select_option(label=value) + else: + target.fill(value) + logger.info("Filled labeled field '%s' -> #%s", label_text, for_attr) + break + + +def sanitize_filename(name: str) -> str: + name = re.sub(r'[<>:"/\\|?*]', '_', name) + name = re.sub(r'\s+', '_', name) + return name.strip('_') + + +def export_invoice_pdf(page: Page, invoice_url: str, output_path: Path) -> Path: + page.goto(invoice_url, wait_until="networkidle") + time.sleep(1) + + fill_invoice_fields(page) + time.sleep(0.5) + + page.pdf(path=str(output_path), format="A4", print_background=True) + logger.info("Saved: %s", output_path) + return output_path + + +def 
download_all_invoices() -> list[Path]: + Config.validate() + output_dir = Path(Config.OUTPUT_DIR) + output_dir.mkdir(parents=True, exist_ok=True) + saved = [] + + with sync_playwright() as p: + browser = p.chromium.launch(headless=Config.BROWSER_HEADLESS) + context = browser.new_context() + page = context.new_page() + page.set_default_timeout(Config.BROWSER_TIMEOUT) + + login(page) + + page.goto(BILLING_URL, wait_until="networkidle") + + groups = get_group_options(page) + logger.info("Found %d group(s): %s", len(groups), [g["label"] for g in groups]) + + for group in groups: + group_label = sanitize_filename(group["label"]) + group_dir = output_dir / group_label if len(groups) > 1 else output_dir + + if group["value"]: + page.goto(BILLING_URL, wait_until="networkidle") + group_select = page.locator("select#groupSelection") + if group_select.count() > 0: + group_select.select_option(value=group["value"]) + page.wait_for_load_state("networkidle") + time.sleep(1) + + years = get_year_options(page) + if not years: + years = ["all"] + logger.info("Group '%s' - years: %s", group["label"], years) + + for year in years: + year_dir = group_dir / year if year != "all" else group_dir + + if year != "all": + year_select = page.locator("select#yearSelection") + if year_select.count() > 0: + year_select.select_option(value=year) + page.wait_for_load_state("networkidle") + time.sleep(1) + + invoices = get_invoice_links(page) + logger.info( + "Group '%s', Year '%s' - found %d invoice(s)", + group["label"], year, len(invoices), + ) + + if not invoices: + continue + + year_dir.mkdir(parents=True, exist_ok=True) + + for idx, invoice in enumerate(invoices): + label = sanitize_filename(invoice["label"]) or f"invoice_{idx + 1}" + pdf_path = year_dir / f"{label}.pdf" + + if pdf_path.exists(): + logger.info("Skipping (exists): %s", pdf_path) + saved.append(pdf_path) + continue + + try: + saved.append(export_invoice_pdf(page, invoice["url"], pdf_path)) + except Exception: + 
logger.exception("Failed to export: %s", invoice["url"]) + + if year != "all": + page.goto(BILLING_URL, wait_until="networkidle") + if group["value"]: + group_select = page.locator("select#groupSelection") + if group_select.count() > 0: + group_select.select_option(value=group["value"]) + page.wait_for_load_state("networkidle") + time.sleep(1) + + browser.close() + + return saved diff --git a/extract.sh b/extract.sh deleted file mode 100755 index a4dd429..0000000 --- a/extract.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/bash -# Content Extractor - Quick extraction script -# Usage: ./extract.sh [folder] - -if [ -z "$1" ]; then - echo "Usage: $0 [folder]" - echo "" - echo "Examples:" - echo " $0 https://youtube.com/watch?v=abc123" - echo " $0 https://instagram.com/reel/xyz789 Learning" - echo " $0 https://medium.com/article Articles" - exit 1 -fi - -URL="$1" -FOLDER="${2:-Content Extractor}" - -echo "🔥 Content Extractor" -echo "====================" -echo "URL: $URL" -echo "Folder: $FOLDER" -echo "" - -cd "$(dirname "$0")" -python main.py "$URL" --folder "$FOLDER" diff --git a/extractors/__init__.py b/extractors/__init__.py deleted file mode 100644 index b0882f1..0000000 --- a/extractors/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -""" -Content Extractors Package -""" - -from .youtube_extractor import YouTubeExtractor -from .blog_extractor import BlogExtractor -from .instagram_extractor import InstagramExtractor - -__all__ = [ - "YouTubeExtractor", - "BlogExtractor", - "InstagramExtractor", -] diff --git a/extractors/blog_extractor.py b/extractors/blog_extractor.py deleted file mode 100644 index 4af4308..0000000 --- a/extractors/blog_extractor.py +++ /dev/null @@ -1,224 +0,0 @@ -""" -Blog/Article Extractor - -Extracts: -- Title, author, publish date -- Main article content -- Tags/categories -- Summary -""" - -import re -from typing import Dict, Any, Optional -from urllib.parse import urlparse - -try: - import requests - from bs4 import BeautifulSoup -except 
ImportError: - requests = None - BeautifulSoup = None - - -class BlogExtractor: - """Extract content from blog posts and articles.""" - - def __init__(self, url: str): - self.url = url - self.html = None - self.soup = None - self._fetch_page() - - def _fetch_page(self): - """Fetch the webpage.""" - if requests is None: - raise ImportError("requests not installed. Run: pip install requests") - - headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' - } - - try: - response = requests.get(self.url, headers=headers, timeout=30) - response.raise_for_status() - self.html = response.text - except Exception as e: - raise Exception(f"Failed to fetch page: {str(e)}") - - def _parse_html(self): - """Parse HTML with BeautifulSoup.""" - if BeautifulSoup is None: - raise ImportError("beautifulsoup4 not installed. Run: pip install beautifulsoup4") - - if self.soup is None: - self.soup = BeautifulSoup(self.html, 'lxml') - - def extract(self) -> Dict[str, Any]: - """Extract all content from the page.""" - self._parse_html() - - content = { - "title": self._get_title(), - "description": self._get_description(), - "author": self._get_author(), - "publish_date": self._get_publish_date(), - "content": self._get_content(), - "key_points": self._generate_key_points(), - "tags": self._get_tags(), - } - - return content - - def _get_title(self) -> str: - """Get page title.""" - # Try Open Graph title first - og_title = self.soup.find('meta', property='og:title') - if og_title and og_title.get('content'): - return og_title['content'].strip() - - # Try Twitter card title - twitter_title = self.soup.find('meta', attrs={'name': 'twitter:title'}) - if twitter_title and twitter_title.get('content'): - return twitter_title['content'].strip() - - # Try h1 tag - h1 = self.soup.find('h1') - if h1: - return h1.get_text().strip() - - # Fallback to tag - title_tag = self.soup.find('title') - if title_tag: - return 
title_tag.get_text().strip() - - return "Untitled Article" - - def _get_description(self) -> str: - """Get page description.""" - # Try Open Graph description - og_desc = self.soup.find('meta', property='og:description') - if og_desc and og_desc.get('content'): - return og_desc['content'].strip() - - # Try meta description - meta_desc = self.soup.find('meta', attrs={'name': 'description'}) - if meta_desc and meta_desc.get('content'): - return meta_desc['content'].strip() - - return "" - - def _get_author(self) -> str: - """Get article author.""" - # Try Open Graph author - og_author = self.soup.find('meta', property='article:author') - if og_author and og_author.get('content'): - return og_author['content'].strip() - - # Try meta author - meta_author = self.soup.find('meta', attrs={'name': 'author'}) - if meta_author and meta_author.get('content'): - return meta_author['content'].strip() - - # Try to find author in byline - byline = self.soup.find(class_=re.compile(r'byline|author', re.I)) - if byline: - return byline.get_text().strip() - - return "Unknown" - - def _get_publish_date(self) -> str: - """Get publish date.""" - # Try Open Graph publish time - og_time = self.soup.find('meta', property='article:published_time') - if og_time and og_time.get('content'): - return og_time['content'][:10] # YYYY-MM-DD - - # Try meta publish date - meta_time = self.soup.find('meta', attrs={'name': 'date'}) - if meta_time and meta_time.get('content'): - return meta_time['content'][:10] - - # Try time tag - time_tag = self.soup.find('time') - if time_tag and time_tag.get('datetime'): - return time_tag['datetime'][:10] - - return "Unknown" - - def _get_content(self) -> str: - """Extract main article content.""" - # Remove unwanted elements - for element in self.soup(['script', 'style', 'nav', 'header', 'footer', 'aside']): - element.decompose() - - # Try to find main content area - content_areas = [ - self.soup.find('article'), - 
self.soup.find(class_=re.compile(r'article|content|post|entry', re.I)), - self.soup.find(id=re.compile(r'article|content|post', re.I)), - self.soup.find('main'), - ] - - content_elem = next((elem for elem in content_areas if elem), None) - - if content_elem: - # Get paragraphs from content area - paragraphs = content_elem.find_all('p') - else: - # Fallback to all paragraphs - paragraphs = self.soup.find_all('p') - - # Extract text from paragraphs - text_parts = [] - for p in paragraphs: - text = p.get_text().strip() - if len(text) > 50: # Filter out short paragraphs - text_parts.append(text) - - # Join and clean - content = "\n\n".join(text_parts) - content = re.sub(r'\n{3,}', '\n\n', content) # Remove excessive newlines - - return content[:10000] # Limit length - - def _generate_key_points(self) -> list: - """Generate key points from content.""" - content = self._get_content() - - if not content: - return [] - - # Extract first few sentences as key points - sentences = re.split(r'[.!?]+', content) - key_points = [] - - for sentence in sentences[:5]: - sentence = sentence.strip() - if len(sentence) > 30 and len(sentence) < 200: - key_points.append(sentence + '.') - - return key_points - - def _get_tags(self) -> list: - """Get article tags/categories.""" - tags = [] - - # Try Open Graph article tags - og_tags = self.soup.find_all('meta', property='article:tag') - for tag in og_tags: - if tag.get('content'): - tags.append(tag['content'].lower().replace(' ', '-')) - - # Try to find tag elements - tag_elements = self.soup.find_all(class_=re.compile(r'tag|category|label', re.I)) - for elem in tag_elements[:5]: # Limit to 5 - text = elem.get_text().strip().lower() - if len(text) < 30: - tags.append(text.replace(' ', '-')) - - # Add domain-based tag - domain = urlparse(self.url).netloc - if domain: - tags.append(domain.replace('www.', '').split('.')[0]) - - return list(set(tags))[:10] # Remove duplicates and limit diff --git a/extractors/instagram_extractor.py 
b/extractors/instagram_extractor.py deleted file mode 100644 index 883559a..0000000 --- a/extractors/instagram_extractor.py +++ /dev/null @@ -1,390 +0,0 @@ -""" -Instagram Reel Extractor - -Extracts: -- Title/caption -- Author/creator -- Description -- Transcript (if available via captions) -- Metadata (views, likes, etc.) - -Note: Instagram requires browser automation. Uses Playwright. -""" - -import html -import json -import re -import time -from typing import Dict, Any -from urllib.parse import urlparse - -try: - from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout -except ImportError: - sync_playwright = None - - -class InstagramExtractor: - """Extract content from Instagram reels.""" - - def __init__(self, url: str, headless: bool = True): - self.url = url - self.headless = headless - self.data = {} - - if sync_playwright is None: - raise ImportError("playwright not installed. Run: pip install playwright && playwright install") - - def extract(self) -> Dict[str, Any]: - """Extract content from Instagram reel.""" - try: - with sync_playwright() as p: - browser = p.chromium.launch(headless=self.headless) - page = browser.new_page( - user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" - ) - - # Navigate to the reel - print(f"📱 Loading Instagram reel...") - page.goto(self.url, timeout=30000) - - # Wait for content to load - time.sleep(3) - - # Try to close any cookies/login prompts - try: - page.click('button:has-text("Not Now")', timeout=3000) - except: - pass - - try: - page.click('button:has-text("Allow")', timeout=3000) - except: - pass - - # Extract data - self.data = self._extract_data(page) - - browser.close() - except PlaywrightTimeout: - print("⚠️ Timeout loading Instagram page") - self.data = self._fallback_extract() - except Exception as e: - print(f"⚠️ Error: {str(e)}") - self.data = self._fallback_extract() - - return self.data - - def 
_extract_data(self, page) -> Dict[str, Any]: - """Extract data from loaded page.""" - data = { - "title": "Instagram Reel", - "description": "", - "author": "Unknown", - "content": "", - "key_points": [], - "tags": ["instagram", "reel"], - } - - def _looks_like_language_list(text: str) -> bool: - lines = [line.strip() for line in text.splitlines() if line.strip()] - if len(lines) < 8: - return False - short_lines = [line for line in lines if len(line) <= 20] - if len(short_lines) / len(lines) < 0.8: - return False - single_tokenish = [line for line in short_lines if len(line.split()) <= 2] - return len(single_tokenish) / len(lines) > 0.7 - - def _looks_like_ui_prompt(text: str) -> bool: - lowered = text.lower() - blockers = [ - "allow the use of cookies", - "use of cookies", - "cookies and similar technologies", - "cookies policy", - "cookie preferences", - "learn more about cookies", - "review or change your choices", - "essential cookies", - "optional cookies", - "cookies from other companies", - "meta products", - "safer experience", - "information we receive from cookies", - "accept all", - "only allow essential", - "log in", - "login", - "sign up", - "sign in", - "save your login info", - "turn on notifications", - "not now", - ] - return any(blocker in lowered for blocker in blockers) - - # Try to get caption/description from meta and embedded JSON first - try: - meta_desc = page.query_selector('meta[property="og:description"], meta[name="description"]') - if meta_desc: - text = (meta_desc.get_attribute("content") or "").strip() - if text and not _looks_like_ui_prompt(text) and not _looks_like_language_list(text): - data["description"] = text - - meta_title = page.query_selector('meta[property="og:title"], meta[name="twitter:title"]') - if meta_title and data["title"] == "Instagram Reel": - title_text = (meta_title.get_attribute("content") or "").strip() - if title_text: - data["title"] = title_text - - if not data["description"]: - html_source = 
page.content() - patterns = [ - r'<meta[^>]+property="og:description"[^>]+content="([^"]+)"', - r'<meta[^>]+name="description"[^>]+content="([^"]+)"', - r'<meta[^>]+name="twitter:description"[^>]+content="([^"]+)"', - ] - for pattern in patterns: - match = re.search(pattern, html_source, re.IGNORECASE) - if match: - text = html.unescape(match.group(1)).strip() - if text and not _looks_like_ui_prompt(text) and not _looks_like_language_list(text): - data["description"] = text - break - - scripts = page.query_selector_all('script[type="application/ld+json"]') - for script in scripts: - raw = script.inner_text().strip() - if not raw: - continue - try: - payload = json.loads(raw) - except Exception: - continue - - def extract_from_obj(obj: Dict[str, Any]): - if not isinstance(obj, dict): - return - desc = obj.get("description") - if desc and not data["description"]: - if not _looks_like_ui_prompt(desc) and not _looks_like_language_list(desc): - data["description"] = desc.strip() - author = obj.get("author") - if author and data["author"] == "Unknown": - if isinstance(author, dict): - name = author.get("name") - if name: - data["author"] = name.strip() - elif isinstance(author, list): - for item in author: - if isinstance(item, dict) and item.get("name"): - data["author"] = item["name"].strip() - break - elif isinstance(author, str): - data["author"] = author.strip() - - if isinstance(payload, list): - for obj in payload: - extract_from_obj(obj) - else: - extract_from_obj(payload) - - if data["description"] and data["author"] != "Unknown": - break - except Exception as e: - print(f"⚠️ Could not extract meta/ld+json: {e}") - - # Try to get caption/description from embedded shared data - try: - html = page.content() - payloads = [] - shared_match = re.search(r'window\._sharedData\s*=\s*({.*?});</script>', html, re.DOTALL) - if shared_match: - payloads.append(shared_match.group(1)) - for match in re.finditer(r'__additionalDataLoaded\([^,]+,\s*({.*?})\);', html, 
re.DOTALL): - payloads.append(match.group(1)) - - def extract_from_media(media: Dict[str, Any]): - if not isinstance(media, dict): - return - if data["author"] == "Unknown": - owner = media.get("owner") or {} - if isinstance(owner, dict): - name = owner.get("username") or owner.get("full_name") - if name: - data["author"] = name.strip() - - caption_text = None - edge = media.get("edge_media_to_caption") - if isinstance(edge, dict): - edges = edge.get("edges") or [] - if edges: - node = edges[0].get("node", {}) - if isinstance(node, dict): - caption_text = node.get("text") - - if not caption_text and isinstance(media.get("caption"), dict): - caption_text = media["caption"].get("text") - - if caption_text and not data["description"]: - if not _looks_like_ui_prompt(caption_text) and not _looks_like_language_list(caption_text): - data["description"] = caption_text.strip() - - def walk(obj: Any): - if isinstance(obj, dict): - graphql = obj.get("graphql") - if isinstance(graphql, dict): - extract_from_media(graphql.get("shortcode_media") or graphql.get("media")) - if isinstance(obj.get("shortcode_media"), dict): - extract_from_media(obj.get("shortcode_media")) - for v in obj.values(): - walk(v) - elif isinstance(obj, list): - for item in obj: - walk(item) - - for raw in payloads: - try: - parsed = json.loads(raw) - except Exception: - continue - walk(parsed) - if data["description"] and data["author"] != "Unknown": - break - except Exception as e: - print(f"⚠️ Could not extract shared data: {e}") - - # Try to get caption/description from visible text - try: - # Look for caption text - captions = page.query_selector_all('h1, h2, span') - for caption in captions: - text = caption.inner_text().strip() - if ( - len(text) > 20 - and len(text) < 500 - and not _looks_like_language_list(text) - and not _looks_like_ui_prompt(text) - ): - if not data["description"]: - data["description"] = text - break - except Exception as e: - print(f"⚠️ Could not extract caption: {e}") 
- - # Try to get author - try: - author_elem = page.query_selector('a[href*="/"] h1, a[href*="/"] h2, header span') - if author_elem: - data["author"] = author_elem.inner_text().strip() - except: - pass - - # Try to get engagement metrics - try: - likes_elem = page.query_selector('span:has-text("likes"), span:has-text("views")') - if likes_elem: - data["views"] = likes_elem.inner_text().strip() - except: - pass - - # Extract any visible text as content - try: - if data["description"] and not _looks_like_ui_prompt(data["description"]): - data["content"] = data["description"].strip() - else: - # Get all text content - body_text = page.inner_text('body') - - # Filter for meaningful content - lines = body_text.split('\n') - cleaned_lines = [] - buffer = [] - - def flush_buffer(): - if buffer: - block = "\n".join(buffer) - if not _looks_like_language_list(block): - cleaned_lines.extend( - [line for line in buffer if not _looks_like_ui_prompt(line)] - ) - buffer.clear() - - for line in lines: - stripped = line.strip() - if not stripped: - flush_buffer() - continue - if _looks_like_ui_prompt(stripped): - continue - if len(stripped) <= 24: - buffer.append(stripped) - else: - flush_buffer() - cleaned_lines.append(stripped) - - flush_buffer() - - meaningful_lines = [ - line for line in cleaned_lines - if len(line) > 30 and len(line) < 300 - ] - - data["content"] = "\n\n".join(meaningful_lines[:10])[:5000] - except Exception as e: - print(f"āš ļø Could not extract page text: {e}") - - # Generate key points from description or content - base_text = "" - if data["description"] and not _looks_like_ui_prompt(data["description"]): - base_text = data["description"] - elif data["content"]: - base_text = data["content"] - - if base_text: - sentences = re.split(r'(?<=[.!?])\s+', base_text.strip()) - data["key_points"] = [ - s.strip() for s in sentences - if 20 < len(s.strip()) < 200 - ][:3] - - # Add URL-based tags - parsed = urlparse(self.url) - if '/reel/' in parsed.path: - 
data["tags"].append("reel") - if '/video/' in parsed.path: - data["tags"].append("video") - - return data - - def _fallback_extract(self) -> Dict[str, Any]: - """Fallback extraction when browser automation fails.""" - print("āš ļø Using fallback extraction method...") - - # Try to extract what we can from the URL itself - data = { - "title": "Instagram Content", - "description": "[Could not extract - Instagram requires login]", - "author": "Unknown", - "content": "", - "key_points": [ - "Instagram content extraction requires browser automation", - "Consider using Instagram's official API or downloading the video manually", - ], - "tags": ["instagram", "fallback"], - } - - # Extract reel ID from URL - try: - parsed = urlparse(self.url) - path_parts = parsed.path.split('/') - for i, part in enumerate(path_parts): - if part in ['reel', 'p', 'tv'] and i + 1 < len(path_parts): - reel_id = path_parts[i + 1] - data["key_points"].append(f"Reel ID: {reel_id}") - break - except: - pass - - return data diff --git a/extractors/youtube_extractor.py b/extractors/youtube_extractor.py deleted file mode 100644 index be0a210..0000000 --- a/extractors/youtube_extractor.py +++ /dev/null @@ -1,203 +0,0 @@ -""" -YouTube Video Extractor - -Extracts: -- Title, description, author -- Transcript/captions -- Duration, views, publish date -- Tags/categories -""" - -import re -from typing import Optional, Dict, Any -from urllib.parse import urlparse, parse_qs - -try: - from youtube_transcript_api import YouTubeTranscriptApi - from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound -except ImportError: - YouTubeTranscriptApi = None - -try: - from pytubefix import YouTube # More reliable than pytube -except ImportError: - try: - from pytube import YouTube - except ImportError: - YouTube = None - - -class YouTubeExtractor: - """Extract content from YouTube videos.""" - - def __init__(self, url: str): - self.url = url - self.video_id = self._extract_video_id(url) - 
self.youtube = None - - def _extract_video_id(self, url: str) -> str: - """Extract video ID from YouTube URL.""" - patterns = [ - r'(?:youtube\.com\/watch\?v=|youtu\.be\/)([a-zA-Z0-9_-]{11})', - r'youtube\.com\/embed\/([a-zA-Z0-9_-]{11})', - r'youtube\.com\/v\/([a-zA-Z0-9_-]{11})', - ] - - for pattern in patterns: - match = re.search(pattern, url) - if match: - return match.group(1) - - raise ValueError(f"Could not extract YouTube video ID from: {url}") - - def _init_youtube(self): - """Initialize YouTube object.""" - if YouTube is None: - raise ImportError("pytube or pytubefix not installed. Run: pip install pytubefix") - - if self.youtube is None: - self.youtube = YouTube(self.url) - - def extract(self) -> Dict[str, Any]: - """Extract all content from YouTube video.""" - self._init_youtube() - - content = { - "title": self._get_title(), - "description": self._get_description(), - "author": self._get_author(), - "duration": self._get_duration(), - "publish_date": self._get_publish_date(), - "views": self._get_views(), - "content": self._get_transcript(), - "key_points": self._generate_key_points(), - "tags": self._get_tags(), - } - - return content - - def _get_title(self) -> str: - """Get video title.""" - try: - self._init_youtube() - return self.youtube.title - except Exception as e: - return f"Video {self.video_id}" - - def _get_description(self) -> str: - """Get video description.""" - try: - self._init_youtube() - return self.youtube.description or "" - except Exception: - return "" - - def _get_author(self) -> str: - """Get video author/channel name.""" - try: - self._init_youtube() - return self.youtube.author - except Exception: - return "Unknown" - - def _get_duration(self) -> str: - """Get video duration in readable format.""" - try: - self._init_youtube() - seconds = self.youtube.length - minutes, secs = divmod(seconds, 60) - hours, minutes = divmod(minutes, 60) - - if hours > 0: - return f"{hours}:{minutes:02d}:{secs:02d}" - else: - return 
f"{minutes}:{secs:02d}" - except Exception: - return "Unknown" - - def _get_publish_date(self) -> str: - """Get video publish date.""" - try: - self._init_youtube() - if hasattr(self.youtube, 'publish_date') and self.youtube.publish_date: - return self.youtube.publish_date.strftime("%Y-%m-%d") - except Exception: - pass - return "Unknown" - - def _get_views(self) -> str: - """Get view count.""" - try: - self._init_youtube() - views = self.youtube.views - if views > 1_000_000: - return f"{views / 1_000_000:.1f}M" - elif views > 1_000: - return f"{views / 1_000:.1f}K" - else: - return str(views) - except Exception: - return "Unknown" - - def _get_transcript(self) -> str: - """Get video transcript/captions.""" - if YouTubeTranscriptApi is None: - return "[Transcript not available - youtube-transcript-api not installed]" - - try: - # New API requires creating an instance - api = YouTubeTranscriptApi() - transcript_list = api.list(self.video_id) - - # Try to find English transcript - transcript = None - for t in transcript_list: - if t.language_code == 'en': - transcript = t - break - - # Fallback to first available - if transcript is None: - transcript = next(iter(transcript_list), None) - - if transcript is None: - return "[No transcript available]" - - transcript_data = transcript.fetch() - - # New API returns FetchedTranscript with snippets - if hasattr(transcript_data, 'snippets'): - full_text = " ".join([snippet.text for snippet in transcript_data.snippets]) - else: - # Fallback for older API format - full_text = " ".join([entry['text'] for entry in transcript_data]) - - # Clean up the text - full_text = full_text.replace("\n", " ").strip() - - return full_text[:10000] # Limit length - except Exception as e: - return f"[Transcript not available: {str(e)}]" - - def _generate_key_points(self) -> list: - """Generate key points from transcript (simple extraction).""" - transcript = self._get_transcript() - - if not transcript or transcript.startswith("["): - return [] 
- - # Simple sentence extraction (first few sentences as key points) - sentences = transcript.split('.')[:5] - key_points = [s.strip() + '.' for s in sentences if len(s.strip()) > 20] - - return key_points[:5] - - def _get_tags(self) -> list: - """Get video tags.""" - try: - self._init_youtube() - if hasattr(self.youtube, 'keywords'): - return self.youtube.keywords[:10] if self.youtube.keywords else [] - except Exception: - pass - return ["youtube", "video"] diff --git a/flake.nix b/flake.nix index a757a57..d1a7f06 100644 --- a/flake.nix +++ b/flake.nix @@ -1,5 +1,5 @@ { - description = "Development environment for jbackup"; + description = "Backblaze Invoice Downloader"; inputs = { nixpkgs.url = "nixpkgs/nixos-unstable"; @@ -18,16 +18,8 @@ devShell = pkgs.mkShell { packages = with pkgs; [ (python3.withPackages (ps: [ - ps.requests - ps.beautifulsoup4 - ps.lxml - ps."youtube-transcript-api" - ps.pytube ps.playwright - ps.markdown ps."python-dotenv" - ps.pydantic - ps."python-dateutil" ])) playwright-driver.browsers ]; diff --git a/main.py b/main.py index 3c60f59..51c6d01 100644 --- a/main.py +++ b/main.py @@ -1,231 +1,53 @@ #!/usr/bin/env python3 -""" -Content Extractor - Extract key information from URLs and save to Obsidian - -Supports: -- YouTube videos (transcripts, descriptions, metadata) -- Blog posts & articles (web scraping) -- Instagram reels (via browser automation) -- Generic URLs (text extraction) - -Usage: - python main.py <url> [--obsidian-path <path>] [--output <filename>] -""" - import argparse -import sys import logging -from pathlib import Path -from datetime import datetime -from typing import Optional +import sys -from extractors.youtube_extractor import YouTubeExtractor -from extractors.blog_extractor import BlogExtractor -from extractors.instagram_extractor import InstagramExtractor -from obsidian_writer import ObsidianWriter from config import Config -from summarizer import summarize_text, SummarizationError, format_markdown_content - - -def 
detect_source_type(url: str) -> str: - """Detect the type of content based on URL.""" - if "youtube.com" in url or "youtu.be" in url: - return "youtube" - elif "instagram.com" in url and "/reel" in url: - return "instagram" - elif "instagram.com" in url: - return "instagram" - else: - return "blog" - - -def extract_content(url: str, source_type: str) -> dict: - """Extract content from URL based on source type.""" - print(f"šŸ” Extracting content from {source_type}...") - - if source_type == "youtube": - extractor = YouTubeExtractor(url) - elif source_type == "instagram": - extractor = InstagramExtractor(url) - else: - extractor = BlogExtractor(url) - - return extractor.extract() +from downloader import download_all_invoices def main(): - logging.basicConfig( - level=getattr(logging, Config.LOG_LEVEL.upper(), logging.INFO), - format="%(asctime)s %(levelname)s [%(name)s] %(message)s", - handlers=[ - logging.StreamHandler(), - logging.FileHandler(Config.LOG_FILE), - ], - ) - - parser = argparse.ArgumentParser( - description="Extract content from URLs and save to Obsidian notes" - ) - parser.add_argument("url", help="URL to extract content from") - parser.add_argument( - "--obsidian-path", - type=str, - default=Config.OBSIDIAN_VAULT_PATH, - help="Path to Obsidian vault" - ) - parser.add_argument( - "--output", - type=str, - default=None, - help="Output filename (without .md extension)" - ) - parser.add_argument( - "--folder", - type=str, - default="Content Extractor", - help="Folder in Obsidian vault to save notes" - ) - parser.add_argument( - "--no-save", - action="store_true", - help="Only print extracted content, don't save to Obsidian" - ) - parser.add_argument( - "--summarize", - action="store_true", - help="Generate a summary of the content" - ) - + parser = argparse.ArgumentParser(description="Download Backblaze invoices as PDF") + parser.add_argument("--output", "-o", help="Output directory", default=None) + parser.add_argument("--headless", 
action="store_true", default=None, help="Run browser headless") + parser.add_argument("--no-headless", action="store_true", default=False, help="Show browser window") + parser.add_argument("--vat-id", help="VAT ID to fill on invoices") + parser.add_argument("--document-type", help="Document type to select") + parser.add_argument("--company", help="Company name to fill") + parser.add_argument("--notes", help="Notes to fill on invoices") + parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging") args = parser.parse_args() - # Detect source type - source_type = detect_source_type(args.url) - print(f"šŸ“Œ Detected source type: {source_type}") + logging.basicConfig( + level=logging.DEBUG if args.verbose else getattr(logging, Config.LOG_LEVEL), + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + datefmt="%H:%M:%S", + ) - # Extract content - try: - content = extract_content(args.url, source_type) - except Exception as e: - print(f"āŒ Extraction failed: {e}") - sys.exit(1) - - if not content: - print("āŒ No content could be extracted") - sys.exit(1) - - if content.get("content"): - try: - content["content"] = format_markdown_content(content["content"]) - except SummarizationError as e: - print(f"āš ļø Content formatting failed: {e}") - - # Generate AI summary + key points - if args.summarize or Config.GENERATE_SUMMARY: - source_text = "\n\n".join( - part for part in [content.get("description", ""), content.get("content", "")] - if part - ).strip() - if source_text: - try: - summary_result = summarize_text(source_text, max_points=3) - if summary_result.get("summary"): - content["description"] = summary_result["summary"] - if summary_result.get("key_points"): - content["key_points"] = summary_result["key_points"] - except SummarizationError as e: - print(f"āš ļø Summarization failed: {e}") - - # Generate output filename if args.output: - filename = args.output - else: - # Generate from title or URL - title = content.get("title", 
"Untitled") - filename = f"{title[:50]}_{datetime.now().strftime('%Y%m%d')}" - # Sanitize filename - filename = "".join(c for c in filename if c.isalnum() or c in " -_").strip() + Config.OUTPUT_DIR = args.output + if args.no_headless: + Config.BROWSER_HEADLESS = False + elif args.headless is True: + Config.BROWSER_HEADLESS = True + if args.vat_id: + Config.INVOICE_VAT_ID = args.vat_id + if args.document_type: + Config.INVOICE_DOCUMENT_TYPE = args.document_type + if args.company: + Config.INVOICE_COMPANY = args.company + if args.notes: + Config.INVOICE_NOTES = args.notes - # Create markdown content - markdown = generate_markdown(content, source_type, args.url) + try: + Config.validate() + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) - # Print preview - print("\n" + "="*80) - print("šŸ“ EXTRACTED CONTENT PREVIEW") - print("="*80) - print(markdown[:2000] + "..." if len(markdown) > 2000 else markdown) - print("="*80) - - # Save to Obsidian - if not args.no_save: - writer = ObsidianWriter(args.obsidian_path) - output_path = writer.save_note(markdown, filename, args.folder) - print(f"\nāœ… Note saved to: {output_path}") - else: - print("\nāš ļø Note not saved (--no-save flag)") - - return content - - -def generate_markdown(content: dict, source_type: str, url: str) -> str: - """Generate markdown content for Obsidian note.""" - lines = [] - - # Header - lines.append(f"# {content.get('title', 'Untitled')}") - lines.append("") - - # Metadata - lines.append("## Metadata") - lines.append("") - lines.append(f"- **Source**: {source_type.capitalize()}") - lines.append(f"- **URL**: {url}") - lines.append(f"- **Extracted**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - - if content.get("author"): - lines.append(f"- **Author**: {content.get('author')}") - if content.get("duration"): - lines.append(f"- **Duration**: {content.get('duration')}") - if content.get("publish_date"): - lines.append(f"- **Published**: {content.get('publish_date')}") - 
if content.get("views"): - lines.append(f"- **Views**: {content.get('views')}") - - lines.append("") - - # Description/Summary - if content.get("description"): - lines.append("## Description") - lines.append("") - lines.append(content.get("description", "")) - lines.append("") - - # Main Content (transcript, article text, etc.) - if content.get("content"): - lines.append("## Content") - lines.append("") - lines.append(content.get("content", "")) - lines.append("") - - # Key Points/Summary - if content.get("key_points"): - lines.append("## Key Points") - lines.append("") - for point in content.get("key_points", []): - lines.append(f"- {point}") - lines.append("") - - # Tags - lines.append("---") - lines.append("") - lines.append("## Tags") - lines.append("") - tags = content.get("tags", []) - if not tags: - tags = ["content-extractor", source_type, "notes"] - lines.append(" ".join(f"#{tag}" for tag in tags)) - lines.append("") - - return "\n".join(lines) + saved = download_all_invoices() + print(f"\nDone. {len(saved)} invoice(s) saved to {Config.OUTPUT_DIR}") if __name__ == "__main__": diff --git a/obsidian_writer.py b/obsidian_writer.py deleted file mode 100644 index e073a4c..0000000 --- a/obsidian_writer.py +++ /dev/null @@ -1,128 +0,0 @@ -""" -Obsidian Note Writer - -Saves extracted content as markdown notes in Obsidian vault. 
-""" - -import os -from pathlib import Path -from datetime import datetime -from typing import Optional - - -class ObsidianWriter: - """Write content to Obsidian vault as markdown notes.""" - - def __init__(self, vault_path: str): - self.vault_path = Path(vault_path).expanduser() - self._validate_vault() - - def _validate_vault(self): - """Validate that the path is an Obsidian vault.""" - if not self.vault_path.exists(): - print(f"āš ļø Creating Obsidian vault directory: {self.vault_path}") - self.vault_path.mkdir(parents=True, exist_ok=True) - - # Check if it looks like an Obsidian vault - obsidian_config = self.vault_path / ".obsidian" - if not obsidian_config.exists(): - print(f"āš ļø Warning: {self.vault_path} doesn't look like an Obsidian vault") - print(" (No .obsidian directory found)") - print(" Notes will still be saved, but you may want to set the correct vault path") - - def save_note( - self, - content: str, - filename: str, - folder: Optional[str] = None, - subfolder: Optional[str] = None - ) -> Path: - """ - Save a note to Obsidian vault. 
- - Args: - content: Markdown content to save - filename: Filename without .md extension - folder: Folder in vault (default: root) - subfolder: Subfolder within folder (optional) - - Returns: - Path to saved file - """ - # Build path - if folder: - note_dir = self.vault_path / folder - if subfolder: - note_dir = note_dir / subfolder - else: - note_dir = self.vault_path - - # Create directory if it doesn't exist - note_dir.mkdir(parents=True, exist_ok=True) - - # Sanitize filename - filename = self._sanitize_filename(filename) - - # Add .md extension - filepath = note_dir / f"{filename}.md" - - # Handle duplicate filenames - counter = 1 - original_filepath = filepath - while filepath.exists(): - filepath = original_filepath.with_name(f"{filename}_{counter}.md") - counter += 1 - - # Write the file - try: - with open(filepath, 'w', encoding='utf-8') as f: - f.write(content) - print(f"āœ… Note saved: {filepath.name}") - return filepath - except Exception as e: - raise Exception(f"Failed to save note: {str(e)}") - - def _sanitize_filename(self, filename: str) -> str: - """Sanitize filename for filesystem.""" - # Remove invalid characters - invalid_chars = '<>:"/\\|?*' - for char in invalid_chars: - filename = filename.replace(char, '') - - # Replace spaces with hyphens (optional, but cleaner) - # filename = filename.replace(' ', '-') - - # Limit length - if len(filename) > 100: - filename = filename[:100] - - return filename.strip() - - def create_daily_note(self, content: str) -> Path: - """Create/update a daily note.""" - today = datetime.now().strftime("%Y-%m-%d") - folder = "Daily Notes" - return self.save_note(content, today, folder) - - def append_to_note(self, filename: str, content: str, folder: Optional[str] = None) -> Path: - """Append content to an existing note.""" - if folder: - note_dir = self.vault_path / folder - else: - note_dir = self.vault_path - - filepath = note_dir / f"{filename}.md" - - # If file doesn't exist, create it - if not 
filepath.exists(): - return self.save_note(content, filename, folder) - - # Append to existing file - try: - with open(filepath, 'a', encoding='utf-8') as f: - f.write("\n\n---\n\n") - f.write(content) - print(f"āœ… Content appended to: {filepath.name}") - return filepath - except Exception as e: - raise Exception(f"Failed to append to note: {str(e)}") diff --git a/requirements.txt b/requirements.txt index 5dc463e..0d2473f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,23 +1,2 @@ -# Content Extractor Dependencies - -# Web scraping -requests>=2.31.0 -beautifulsoup4>=4.12.0 -lxml>=4.9.0 - -# YouTube -youtube-transcript-api>=0.6.0 -pytube>=15.0.0 - -# Browser automation (for Instagram and dynamic content) playwright>=1.40.0 - -# Text processing -markdown>=3.5.0 - -# Utilities python-dotenv>=1.0.0 -pydantic>=2.5.0 - -# Date handling -python-dateutil>=2.8.0 diff --git a/summarizer.py b/summarizer.py deleted file mode 100644 index d9e212b..0000000 --- a/summarizer.py +++ /dev/null @@ -1,172 +0,0 @@ -""" -OpenAI/OpenRouter summarizer utility. - -Uses OPENAI_API_KEY and OPENAI_URL from environment (via Config). -""" - -from __future__ import annotations - -import json -import logging -from typing import Dict, List - -import requests - -from config import Config - - -class SummarizationError(RuntimeError): - """Raised when summarization fails.""" - - -logger = logging.getLogger(__name__) - - -def summarize_text(text: str, max_points: int = 3) -> Dict[str, List[str] | str]: - """ - Summarize text into a short summary and key points. - - Returns: - { - "summary": "string", - "key_points": ["point 1", "point 2", ...] - } - """ - if not text or not text.strip(): - return {"summary": "", "key_points": []} - - if not Config.OPENAI_API_KEY: - raise SummarizationError("OPENAI_API_KEY is not set") - - payload = { - "model": Config.OPENAI_MODEL, - "messages": [ - { - "role": "system", - "content": ( - "You are a precise summarizer. 
Return JSON only with keys " - "`summary` and `key_points` (array of strings). Do not add extra keys." - ), - }, - { - "role": "user", - "content": ( - "Summarize the following content in 2-4 sentences and provide " - f"{max_points} key points.\n\n" - f"CONTENT:\n{text}" - ), - }, - ], - "temperature": 0.2, - "max_tokens": 400, - } - - headers = { - "Authorization": f"Bearer {Config.OPENAI_API_KEY}", - "Content-Type": "application/json", - } - - try: - logger.info( - "OpenAI request: url=%s model=%s timeout=%ss input_chars=%s", - Config.OPENAI_URL, - Config.OPENAI_MODEL, - Config.OPENAI_TIMEOUT, - len(text), - ) - if Config.OPENAI_LOG_PAYLOAD: - logger.debug("OpenAI request payload: %s", json.dumps(payload, ensure_ascii=False)) - response = requests.post( - Config.OPENAI_URL, - headers=headers, - json=payload, - timeout=Config.OPENAI_TIMEOUT, - ) - logger.info("OpenAI response: status=%s", response.status_code) - if Config.OPENAI_LOG_PAYLOAD: - logger.debug("OpenAI response body: %s", response.text) - response.raise_for_status() - data = response.json() - except Exception as exc: - logger.exception("OpenAI request failed") - raise SummarizationError(f"Request failed: {exc}") from exc - - try: - content = data["choices"][0]["message"]["content"].strip() - result = json.loads(content) - summary = result.get("summary", "").strip() - key_points = [p.strip() for p in result.get("key_points", []) if p.strip()] - return {"summary": summary, "key_points": key_points} - except Exception as exc: - raise SummarizationError(f"Invalid response format: {exc}") from exc - - -def format_markdown_content(text: str) -> str: - """ - Clean and format social content into sensible markdown. 
- - - Remove excessive emojis/icons - - Convert list-like lines into ordered/bulleted lists - - Remove obvious ads/sponsor lines - - Normalize whitespace - """ - if not text or not text.strip(): - return "" - - if not Config.OPENAI_API_KEY: - raise SummarizationError("OPENAI_API_KEY is not set") - - payload = { - "model": Config.OPENAI_MODEL, - "messages": [ - { - "role": "system", - "content": ( - "You are a precise formatter. Return only cleaned markdown text. " - "Remove ads/sponsor lines, collapse excessive whitespace, " - "and replace emoji-heavy bullets with normal bullet/numbered lists. " - "Do not add a title or extra sections." - ), - }, - { - "role": "user", - "content": ( - "Format the following content:\n\n" - f"{text}" - ), - }, - ], - "temperature": 0.1, - "max_tokens": 800, - } - - headers = { - "Authorization": f"Bearer {Config.OPENAI_API_KEY}", - "Content-Type": "application/json", - } - - try: - logger.info( - "OpenAI format request: url=%s model=%s timeout=%ss input_chars=%s", - Config.OPENAI_URL, - Config.OPENAI_MODEL, - Config.OPENAI_TIMEOUT, - len(text), - ) - if Config.OPENAI_LOG_PAYLOAD: - logger.debug("OpenAI format request payload: %s", json.dumps(payload, ensure_ascii=False)) - response = requests.post( - Config.OPENAI_URL, - headers=headers, - json=payload, - timeout=Config.OPENAI_TIMEOUT, - ) - logger.info("OpenAI format response: status=%s", response.status_code) - if Config.OPENAI_LOG_PAYLOAD: - logger.debug("OpenAI format response body: %s", response.text) - response.raise_for_status() - data = response.json() - return data["choices"][0]["message"]["content"].strip() - except Exception as exc: - logger.exception("OpenAI format request failed") - raise SummarizationError(f"Formatting request failed: {exc}") from exc