convert to backblaze fetcher

This commit is contained in:
Jan Bader
2026-04-05 22:01:46 +02:00
parent 66e1c9e0e0
commit a9bb2460c6
15 changed files with 333 additions and 1620 deletions


@@ -1,21 +1,19 @@
-# Content Extractor Configuration
+# Backblaze credentials
+BACKBLAZE_EMAIL=you@example.com
+BACKBLAZE_PASSWORD=your_password
 
-# Obsidian vault path (default: ~/Obsidian Vault)
-OBSIDIAN_VAULT_PATH=~/Obsidian Vault
+# Invoice fields (all optional - leave empty to skip)
+INVOICE_VAT_ID=
+INVOICE_DOCUMENT_TYPE=
+INVOICE_COMPANY=
+INVOICE_NOTES=
 
-# Browser settings (for Instagram extraction)
+# Output
+OUTPUT_DIR=./invoices
+
+# Browser
 BROWSER_HEADLESS=true
 BROWSER_TIMEOUT=30000
 
-# Content extraction settings
-MAX_CONTENT_LENGTH=10000
-GENERATE_SUMMARY=true
-# YouTube settings
-YOUTUBE_LANGUAGE=en
-# Instagram settings
-INSTAGRAM_WAIT_TIME=5
 
 # Logging
 LOG_LEVEL=INFO
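The new `.env.example` above is consumed as plain `KEY=VALUE` lines. The project itself uses python-dotenv for this; the hand-rolled parser below is only an illustrative sketch of the format, not the library's implementation.

```python
def load_env(text: str) -> dict[str, str]:
    """Parse KEY=VALUE lines, skipping blanks and # comments (illustrative only)."""
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip()
    return env

sample = """
# Backblaze credentials
BACKBLAZE_EMAIL=you@example.com
OUTPUT_DIR=./invoices
BROWSER_HEADLESS=true
"""

cfg = load_env(sample)
print(cfg["BACKBLAZE_EMAIL"])  # you@example.com
```

Note that python-dotenv additionally handles quoting and variable expansion; this sketch covers only the subset used in the file above.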

.gitignore (vendored, 3 lines changed)

@@ -51,3 +51,6 @@ htmlcov/
 # Playwright
 .playwright/
 .direnv/
+
+# Output
+invoices/

README.md (202 lines changed)

@@ -1,198 +1,64 @@
-# Content Extractor 🔥
+# Backblaze Invoice Downloader
-Extract key information from URLs (YouTube, Instagram, blogs) and save to Obsidian notes automatically.
+Download Backblaze invoices as PDF using browser automation.
-## Features
+Backblaze only provides invoices via a web page that must be printed — this tool automates that process using Playwright, filling in configurable fields (VAT ID, document type, company, notes) and exporting each invoice to PDF.
-- **YouTube Videos**: Extract title, description, transcript, author, duration, views
+## Setup
-- **Instagram Reels**: Extract caption, author, engagement metrics (via browser automation)
-- **Blog Posts/Articles**: Extract title, author, content, tags, publish date
-- **Auto-save to Obsidian**: Notes are automatically formatted and saved to your Obsidian vault
-- **Smart Summaries**: Generates key points from extracted content
-## Installation
 ```bash
-# Navigate to the content-extractor directory
-cd ~/Desktop/itsthatnewshit/content-extractor
-# Install dependencies
 pip install -r requirements.txt
+playwright install chromium
-# Install Playwright browsers (for Instagram extraction)
-playwright install
 ```
-## Usage
+Or with Nix:
-### Basic Usage
 ```bash
-# Extract from YouTube video
+nix develop
-python main.py "https://www.youtube.com/watch?v=VIDEO_ID"
-# Extract from Instagram reel
-python main.py "https://www.instagram.com/reel/REEL_ID"
-# Extract from blog post
-python main.py "https://example.com/article"
-```
-### Advanced Options
-```bash
-# Specify Obsidian vault path
-python main.py <url> --obsidian-path "/path/to/Obsidian Vault"
-# Custom output filename
-python main.py <url> --output "my-note-title"
-# Save to specific folder in Obsidian
-python main.py <url> --folder "Learning/YouTube"
-# Only print content, don't save to Obsidian
-python main.py <url> --no-save
-# Generate summary
-python main.py <url> --summarize
-```
-### Examples
-```bash
-# Save YouTube tutorial to Learning folder
-python main.py "https://youtu.be/abc123" --folder "Learning" --output "Python Tutorial"
-# Extract Instagram reel without saving
-python main.py "https://instagram.com/reel/xyz789" --no-save
-# Extract blog post to default vault
-python main.py "https://medium.com/article" --folder "Articles"
 ```
 ## Configuration
-Create a `.env` file in the project directory to customize settings:
+Create a `.env` file (see `.env.example`):
-```bash
-cp .env.example .env
-```
-Edit `.env` with your preferences:
 ```env
-# Obsidian vault path
+BACKBLAZE_EMAIL=you@example.com
-OBSIDIAN_VAULT_PATH=~/Obsidian Vault
+BACKBLAZE_PASSWORD=your_password
-# Browser settings (for Instagram)
+INVOICE_VAT_ID=DE123456789
+INVOICE_DOCUMENT_TYPE=Invoice
+INVOICE_COMPANY=My Company GmbH
+INVOICE_NOTES=Internal ref: 12345
+OUTPUT_DIR=./invoices
 BROWSER_HEADLESS=true
-BROWSER_TIMEOUT=30000
-# Content extraction
-MAX_CONTENT_LENGTH=10000
-GENERATE_SUMMARY=true
-# OpenAI/OpenRouter
-OPENAI_API_KEY=your_key_here
-OPENAI_URL=https://openrouter.ai/api/v1/chat/completions
-OPENAI_MODEL=gpt-4o-mini
-OPENAI_TIMEOUT=30
-# YouTube
-YOUTUBE_LANGUAGE=en
-# Instagram
-INSTAGRAM_WAIT_TIME=5
 ```
-## Output Format
+## Usage
-Notes are saved in markdown format with:
-- Title and metadata (source, URL, extraction date)
-- Author, duration, views (when available)
-- Description/summary
-- Full content (transcript or article text)
-- Key points
-- Tags for easy organization
-Example output:
-```markdown
-# How to Build AI Agents
-## Metadata
-- **Source**: Youtube
-- **URL**: https://youtube.com/watch?v=abc123
-- **Extracted**: 2026-02-21 15:30:00
-- **Author**: Tech Channel
-- **Duration**: 12:34
-- **Views**: 1.2M
-## Description
-Learn how to build AI agents from scratch...
-## Content
-[Full transcript or article text...]
-## Key Points
-- Point 1 from the content
-- Point 2 from the content
-- Point 3 from the content
----
-## Tags
-#youtube #video #ai #agents #notes
-```
-## Troubleshooting
-### Instagram extraction fails
-Instagram requires browser automation. Make sure you've run:
 ```bash
-playwright install
+python main.py
 ```
-If it still fails, Instagram may have changed their UI. The extractor has a fallback mode that will still extract basic info.
+### Options
-### YouTube transcript not available
-Some videos don't have captions/transcripts. The extractor will fall back to extracting the description only.
-### Obsidian vault not found
-By default, the tool looks for `~/Obsidian Vault`. If your vault is elsewhere, use the `--obsidian-path` flag or set `OBSIDIAN_VAULT_PATH` in your `.env` file.
-## Project Structure
 ```
-content-extractor/
+-o, --output DIR      Output directory (default: ./invoices)
-├── main.py                # Main entry point
+--headless            Run browser headless
-├── config.py              # Configuration settings
+--no-headless         Show browser window (useful for debugging)
-├── obsidian_writer.py     # Obsidian note writer
+--vat-id ID           VAT ID to fill on invoices
-├── requirements.txt       # Python dependencies
+--document-type TYPE  Document type to select
-├── .env.example           # Example environment file
+--company NAME        Company name to fill
-├── README.md              # This file
+--notes TEXT          Notes to fill on invoices
-└── extractors/
+-v, --verbose         Verbose logging
-    ├── __init__.py
-    ├── youtube_extractor.py    # YouTube extraction
-    ├── instagram_extractor.py  # Instagram extraction
-    └── blog_extractor.py       # Blog/article extraction
 ```
-## Future Enhancements
+CLI arguments override `.env` values.
-- [ ] AI-powered summarization (using LLMs)
+## How it works
-- [ ] Podcast/audio extraction (whisper transcription)
-- [ ] Twitter/X thread extraction
-- [ ] LinkedIn post extraction
-- [ ] Batch processing (extract from multiple URLs)
-- [ ] Web interface
-- [ ] Automatic tagging based on content
-## License
+1. Logs in to `secure.backblaze.com`
+2. Navigates to the billing page
-MIT License - Feel free to use and modify!
+3. Iterates over all billing groups and years
+4. For each invoice, opens the invoice page, fills the configured fields, and exports to PDF
----
+5. Skips already-downloaded invoices
-Built with 🔥 by RUBIUS for naki


@@ -1,54 +1,26 @@
"""
Configuration for Content Extractor
"""
import os import os
from pathlib import Path from pathlib import Path
from dotenv import load_dotenv from dotenv import load_dotenv
# Load environment variables
load_dotenv() load_dotenv()
class Config: class Config:
"""Configuration settings for content extractor.""" BACKBLAZE_EMAIL = os.getenv("BACKBLAZE_EMAIL", "")
BACKBLAZE_PASSWORD = os.getenv("BACKBLAZE_PASSWORD", "")
# Obsidian vault path (default to common locations) INVOICE_VAT_ID = os.getenv("INVOICE_VAT_ID", "")
OBSIDIAN_VAULT_PATH = os.getenv( INVOICE_DOCUMENT_TYPE = os.getenv("INVOICE_DOCUMENT_TYPE", "")
"OBSIDIAN_VAULT_PATH", INVOICE_COMPANY = os.getenv("INVOICE_COMPANY", "")
os.path.expanduser("~/Obsidian Vault") # Default location INVOICE_NOTES = os.getenv("INVOICE_NOTES", "")
)
# Browser settings (for Instagram and dynamic content) OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./invoices")
BROWSER_HEADLESS = os.getenv("BROWSER_HEADLESS", "true").lower() == "true" BROWSER_HEADLESS = os.getenv("BROWSER_HEADLESS", "true").lower() == "true"
BROWSER_TIMEOUT = int(os.getenv("BROWSER_TIMEOUT", "30000")) # 30 seconds BROWSER_TIMEOUT = int(os.getenv("BROWSER_TIMEOUT", "30000"))
# Content extraction settings
MAX_CONTENT_LENGTH = int(os.getenv("MAX_CONTENT_LENGTH", "10000")) # Max chars
GENERATE_SUMMARY = os.getenv("GENERATE_SUMMARY", "true").lower() == "true"
# OpenAI/OpenRouter settings
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENAI_URL = os.getenv("OPENAI_URL", "https://api.openai.com/v1/chat/completions")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
OPENAI_TIMEOUT = int(os.getenv("OPENAI_TIMEOUT", "30"))
OPENAI_LOG_PAYLOAD = os.getenv("OPENAI_LOG_PAYLOAD", "false").lower() == "true"
# YouTube settings
YOUTUBE_LANGUAGE = os.getenv("YOUTUBE_LANGUAGE", "en")
# Instagram settings (requires browser automation)
INSTAGRAM_WAIT_TIME = int(os.getenv("INSTAGRAM_WAIT_TIME", "5")) # seconds
# Logging
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
LOG_FILE = os.getenv("LOG_FILE", "content_extractor.log")
@classmethod @classmethod
def validate(cls): def validate(cls):
"""Validate configuration.""" if not cls.BACKBLAZE_EMAIL or not cls.BACKBLAZE_PASSWORD:
# Check if Obsidian vault path exists raise ValueError("BACKBLAZE_EMAIL and BACKBLAZE_PASSWORD must be set")
if not Path(cls.OBSIDIAN_VAULT_PATH).exists(): Path(cls.OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
print(f"⚠️ Warning: Obsidian vault path does not exist: {cls.OBSIDIAN_VAULT_PATH}")
print(" You can set OBSIDIAN_VAULT_PATH environment variable or use --obsidian-path flag")
return True

downloader.py (new file, 236 lines)

@@ -0,0 +1,236 @@
import logging
import re
import time
from pathlib import Path
from playwright.sync_api import sync_playwright, Page, Browser
from config import Config
logger = logging.getLogger(__name__)
BASE_URL = "https://secure.backblaze.com"
BILLING_URL = f"{BASE_URL}/billing.htm"
def login(page: Page) -> None:
logger.info("Logging in to Backblaze...")
page.goto(f"{BASE_URL}/user_signin.htm", wait_until="networkidle")
page.fill("#email", Config.BACKBLAZE_EMAIL)
page.fill("#password", Config.BACKBLAZE_PASSWORD)
page.click("#submitButton")
page.wait_for_load_state("networkidle")
if "user_signin" in page.url:
raise RuntimeError("Login failed - check credentials")
logger.info("Login successful")
def get_group_options(page: Page) -> list[dict]:
page.goto(BILLING_URL, wait_until="networkidle")
group_select = page.locator("select#groupSelection")
if group_select.count() == 0:
return [{"value": "", "label": "default"}]
options = group_select.locator("option").all()
groups = []
for opt in options:
val = opt.get_attribute("value") or ""
label = opt.inner_text().strip()
if val or label:
groups.append({"value": val, "label": label})
return groups if groups else [{"value": "", "label": "default"}]
def get_year_options(page: Page) -> list[str]:
year_select = page.locator("select#yearSelection")
if year_select.count() == 0:
return []
options = year_select.locator("option").all()
return [opt.get_attribute("value") or opt.inner_text().strip() for opt in options]
def get_invoice_links(page: Page) -> list[dict]:
links = []
rows = page.locator("table.billing-table tbody tr, table#billingTable tbody tr, table tbody tr").all()
for row in rows:
anchors = row.locator("a[href*='billing_invoice'], a[href*='invoice']").all()
for anchor in anchors:
href = anchor.get_attribute("href") or ""
text = anchor.inner_text().strip()
if href:
if not href.startswith("http"):
href = f"{BASE_URL}/{href.lstrip('/')}"
links.append({"url": href, "label": text})
if not links:
all_anchors = page.locator("a[href*='invoice']").all()
for anchor in all_anchors:
href = anchor.get_attribute("href") or ""
text = anchor.inner_text().strip()
if href and "invoice" in href.lower():
if not href.startswith("http"):
href = f"{BASE_URL}/{href.lstrip('/')}"
links.append({"url": href, "label": text})
return links
def fill_invoice_fields(page: Page) -> None:
fields = {
"vatId": Config.INVOICE_VAT_ID,
"documentType": Config.INVOICE_DOCUMENT_TYPE,
"company": Config.INVOICE_COMPANY,
"notes": Config.INVOICE_NOTES,
}
for field_id, value in fields.items():
if not value:
continue
for selector in [
f"#{field_id}",
f"input[name='{field_id}']",
f"textarea[name='{field_id}']",
f"select[name='{field_id}']",
f"input[id*='{field_id}' i]",
f"textarea[id*='{field_id}' i]",
f"select[id*='{field_id}' i]",
f"input[name*='{field_id}' i]",
f"textarea[name*='{field_id}' i]",
f"select[name*='{field_id}' i]",
]:
el = page.locator(selector).first
if el.count() > 0:
tag = el.evaluate("el => el.tagName.toLowerCase()")
if tag == "select":
el.select_option(label=value)
else:
el.fill(value)
logger.info("Filled field %s", field_id)
break
for label_text, value in [
("VAT", Config.INVOICE_VAT_ID),
("Tax", Config.INVOICE_VAT_ID),
("Document Type", Config.INVOICE_DOCUMENT_TYPE),
("Type", Config.INVOICE_DOCUMENT_TYPE),
("Company", Config.INVOICE_COMPANY),
("Notes", Config.INVOICE_NOTES),
("Note", Config.INVOICE_NOTES),
]:
if not value:
continue
labels = page.locator(f"label:has-text('{label_text}')").all()
for label in labels:
for_attr = label.get_attribute("for")
if for_attr:
target = page.locator(f"#{for_attr}")
if target.count() > 0:
tag = target.evaluate("el => el.tagName.toLowerCase()")
if tag == "select":
target.select_option(label=value)
else:
target.fill(value)
logger.info("Filled labeled field '%s' -> #%s", label_text, for_attr)
break
def sanitize_filename(name: str) -> str:
name = re.sub(r'[<>:"/\\|?*]', '_', name)
name = re.sub(r'\s+', '_', name)
return name.strip('_')
def export_invoice_pdf(page: Page, invoice_url: str, output_path: Path) -> Path:
page.goto(invoice_url, wait_until="networkidle")
time.sleep(1)
fill_invoice_fields(page)
time.sleep(0.5)
page.pdf(path=str(output_path), format="A4", print_background=True)
logger.info("Saved: %s", output_path)
return output_path
def download_all_invoices() -> list[Path]:
Config.validate()
output_dir = Path(Config.OUTPUT_DIR)
output_dir.mkdir(parents=True, exist_ok=True)
saved = []
with sync_playwright() as p:
browser = p.chromium.launch(headless=Config.BROWSER_HEADLESS)
context = browser.new_context()
page = context.new_page()
page.set_default_timeout(Config.BROWSER_TIMEOUT)
login(page)
page.goto(BILLING_URL, wait_until="networkidle")
groups = get_group_options(page)
logger.info("Found %d group(s): %s", len(groups), [g["label"] for g in groups])
for group in groups:
group_label = sanitize_filename(group["label"])
group_dir = output_dir / group_label if len(groups) > 1 else output_dir
if group["value"]:
page.goto(BILLING_URL, wait_until="networkidle")
group_select = page.locator("select#groupSelection")
if group_select.count() > 0:
group_select.select_option(value=group["value"])
page.wait_for_load_state("networkidle")
time.sleep(1)
years = get_year_options(page)
if not years:
years = ["all"]
logger.info("Group '%s' - years: %s", group["label"], years)
for year in years:
year_dir = group_dir / year if year != "all" else group_dir
if year != "all":
year_select = page.locator("select#yearSelection")
if year_select.count() > 0:
year_select.select_option(value=year)
page.wait_for_load_state("networkidle")
time.sleep(1)
invoices = get_invoice_links(page)
logger.info(
"Group '%s', Year '%s' - found %d invoice(s)",
group["label"], year, len(invoices),
)
if not invoices:
continue
year_dir.mkdir(parents=True, exist_ok=True)
for idx, invoice in enumerate(invoices):
label = sanitize_filename(invoice["label"]) or f"invoice_{idx + 1}"
pdf_path = year_dir / f"{label}.pdf"
if pdf_path.exists():
logger.info("Skipping (exists): %s", pdf_path)
saved.append(pdf_path)
continue
try:
saved.append(export_invoice_pdf(page, invoice["url"], pdf_path))
except Exception:
logger.exception("Failed to export: %s", invoice["url"])
if year != "all":
page.goto(BILLING_URL, wait_until="networkidle")
if group["value"]:
group_select = page.locator("select#groupSelection")
if group_select.count() > 0:
group_select.select_option(value=group["value"])
page.wait_for_load_state("networkidle")
time.sleep(1)
browser.close()
return saved
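The file layout and skip logic in `download_all_invoices` can be exercised in isolation: group and year become directories, the invoice label is run through `sanitize_filename` (copied from above), and an existing PDF is skipped. The `target_path` helper here is hypothetical, introduced only to make the layout testable without a browser.

```python
import re
from pathlib import Path

def sanitize_filename(name: str) -> str:
    # Same sanitizer as downloader.py: replace forbidden characters
    # and whitespace with underscores, then trim.
    name = re.sub(r'[<>:"/\\|?*]', '_', name)
    name = re.sub(r'\s+', '_', name)
    return name.strip('_')

def target_path(output_dir: Path, group: str, year: str, label: str, idx: int) -> Path:
    # Hypothetical helper mirroring the layout built in download_all_invoices:
    # <output>/<group>/<year>/<label>.pdf, with a positional fallback name.
    safe = sanitize_filename(label) or f"invoice_{idx + 1}"
    return output_dir / sanitize_filename(group) / year / f"{safe}.pdf"

p = target_path(Path("./invoices"), "My Group", "2024", "Invoice 03/2024", 0)
print(p)  # e.g. invoices/My_Group/2024/Invoice_03_2024.pdf on POSIX
```

Because the label can be empty after sanitization, the fallback `invoice_{idx + 1}` keeps filenames unique within a year, matching the enumeration in the loop above.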


@@ -1,25 +0,0 @@
#!/bin/bash
# Content Extractor - Quick extraction script
# Usage: ./extract.sh <url> [folder]
if [ -z "$1" ]; then
echo "Usage: $0 <url> [folder]"
echo ""
echo "Examples:"
echo " $0 https://youtube.com/watch?v=abc123"
echo " $0 https://instagram.com/reel/xyz789 Learning"
echo " $0 https://medium.com/article Articles"
exit 1
fi
URL="$1"
FOLDER="${2:-Content Extractor}"
echo "🔥 Content Extractor"
echo "===================="
echo "URL: $URL"
echo "Folder: $FOLDER"
echo ""
cd "$(dirname "$0")"
python main.py "$URL" --folder "$FOLDER"


@@ -1,13 +0,0 @@
"""
Content Extractors Package
"""
from .youtube_extractor import YouTubeExtractor
from .blog_extractor import BlogExtractor
from .instagram_extractor import InstagramExtractor
__all__ = [
"YouTubeExtractor",
"BlogExtractor",
"InstagramExtractor",
]


@@ -1,224 +0,0 @@
"""
Blog/Article Extractor
Extracts:
- Title, author, publish date
- Main article content
- Tags/categories
- Summary
"""
import re
from typing import Dict, Any, Optional
from urllib.parse import urlparse
try:
import requests
from bs4 import BeautifulSoup
except ImportError:
requests = None
BeautifulSoup = None
class BlogExtractor:
"""Extract content from blog posts and articles."""
def __init__(self, url: str):
self.url = url
self.html = None
self.soup = None
self._fetch_page()
def _fetch_page(self):
"""Fetch the webpage."""
if requests is None:
raise ImportError("requests not installed. Run: pip install requests")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
try:
response = requests.get(self.url, headers=headers, timeout=30)
response.raise_for_status()
self.html = response.text
except Exception as e:
raise Exception(f"Failed to fetch page: {str(e)}")
def _parse_html(self):
"""Parse HTML with BeautifulSoup."""
if BeautifulSoup is None:
raise ImportError("beautifulsoup4 not installed. Run: pip install beautifulsoup4")
if self.soup is None:
self.soup = BeautifulSoup(self.html, 'lxml')
def extract(self) -> Dict[str, Any]:
"""Extract all content from the page."""
self._parse_html()
content = {
"title": self._get_title(),
"description": self._get_description(),
"author": self._get_author(),
"publish_date": self._get_publish_date(),
"content": self._get_content(),
"key_points": self._generate_key_points(),
"tags": self._get_tags(),
}
return content
def _get_title(self) -> str:
"""Get page title."""
# Try Open Graph title first
og_title = self.soup.find('meta', property='og:title')
if og_title and og_title.get('content'):
return og_title['content'].strip()
# Try Twitter card title
twitter_title = self.soup.find('meta', attrs={'name': 'twitter:title'})
if twitter_title and twitter_title.get('content'):
return twitter_title['content'].strip()
# Try h1 tag
h1 = self.soup.find('h1')
if h1:
return h1.get_text().strip()
# Fallback to <title> tag
title_tag = self.soup.find('title')
if title_tag:
return title_tag.get_text().strip()
return "Untitled Article"
def _get_description(self) -> str:
"""Get page description."""
# Try Open Graph description
og_desc = self.soup.find('meta', property='og:description')
if og_desc and og_desc.get('content'):
return og_desc['content'].strip()
# Try meta description
meta_desc = self.soup.find('meta', attrs={'name': 'description'})
if meta_desc and meta_desc.get('content'):
return meta_desc['content'].strip()
return ""
def _get_author(self) -> str:
"""Get article author."""
# Try Open Graph author
og_author = self.soup.find('meta', property='article:author')
if og_author and og_author.get('content'):
return og_author['content'].strip()
# Try meta author
meta_author = self.soup.find('meta', attrs={'name': 'author'})
if meta_author and meta_author.get('content'):
return meta_author['content'].strip()
# Try to find author in byline
byline = self.soup.find(class_=re.compile(r'byline|author', re.I))
if byline:
return byline.get_text().strip()
return "Unknown"
def _get_publish_date(self) -> str:
"""Get publish date."""
# Try Open Graph publish time
og_time = self.soup.find('meta', property='article:published_time')
if og_time and og_time.get('content'):
return og_time['content'][:10] # YYYY-MM-DD
# Try meta publish date
meta_time = self.soup.find('meta', attrs={'name': 'date'})
if meta_time and meta_time.get('content'):
return meta_time['content'][:10]
# Try time tag
time_tag = self.soup.find('time')
if time_tag and time_tag.get('datetime'):
return time_tag['datetime'][:10]
return "Unknown"
def _get_content(self) -> str:
"""Extract main article content."""
# Remove unwanted elements
for element in self.soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
element.decompose()
# Try to find main content area
content_areas = [
self.soup.find('article'),
self.soup.find(class_=re.compile(r'article|content|post|entry', re.I)),
self.soup.find(id=re.compile(r'article|content|post', re.I)),
self.soup.find('main'),
]
content_elem = next((elem for elem in content_areas if elem), None)
if content_elem:
# Get paragraphs from content area
paragraphs = content_elem.find_all('p')
else:
# Fallback to all paragraphs
paragraphs = self.soup.find_all('p')
# Extract text from paragraphs
text_parts = []
for p in paragraphs:
text = p.get_text().strip()
if len(text) > 50: # Filter out short paragraphs
text_parts.append(text)
# Join and clean
content = "\n\n".join(text_parts)
content = re.sub(r'\n{3,}', '\n\n', content) # Remove excessive newlines
return content[:10000] # Limit length
def _generate_key_points(self) -> list:
"""Generate key points from content."""
content = self._get_content()
if not content:
return []
# Extract first few sentences as key points
sentences = re.split(r'[.!?]+', content)
key_points = []
for sentence in sentences[:5]:
sentence = sentence.strip()
if len(sentence) > 30 and len(sentence) < 200:
key_points.append(sentence + '.')
return key_points
def _get_tags(self) -> list:
"""Get article tags/categories."""
tags = []
# Try Open Graph article tags
og_tags = self.soup.find_all('meta', property='article:tag')
for tag in og_tags:
if tag.get('content'):
tags.append(tag['content'].lower().replace(' ', '-'))
# Try to find tag elements
tag_elements = self.soup.find_all(class_=re.compile(r'tag|category|label', re.I))
for elem in tag_elements[:5]: # Limit to 5
text = elem.get_text().strip().lower()
if len(text) < 30:
tags.append(text.replace(' ', '-'))
# Add domain-based tag
domain = urlparse(self.url).netloc
if domain:
tags.append(domain.replace('www.', '').split('.')[0])
return list(set(tags))[:10] # Remove duplicates and limit


@@ -1,390 +0,0 @@
"""
Instagram Reel Extractor
Extracts:
- Title/caption
- Author/creator
- Description
- Transcript (if available via captions)
- Metadata (views, likes, etc.)
Note: Instagram requires browser automation. Uses Playwright.
"""
import html
import json
import re
import time
from typing import Dict, Any
from urllib.parse import urlparse
try:
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
except ImportError:
sync_playwright = None
class InstagramExtractor:
"""Extract content from Instagram reels."""
def __init__(self, url: str, headless: bool = True):
self.url = url
self.headless = headless
self.data = {}
if sync_playwright is None:
raise ImportError("playwright not installed. Run: pip install playwright && playwright install")
def extract(self) -> Dict[str, Any]:
"""Extract content from Instagram reel."""
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=self.headless)
page = browser.new_page(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
# Navigate to the reel
print(f"📱 Loading Instagram reel...")
page.goto(self.url, timeout=30000)
# Wait for content to load
time.sleep(3)
# Try to close any cookies/login prompts
try:
page.click('button:has-text("Not Now")', timeout=3000)
except:
pass
try:
page.click('button:has-text("Allow")', timeout=3000)
except:
pass
# Extract data
self.data = self._extract_data(page)
browser.close()
except PlaywrightTimeout:
print("⚠️ Timeout loading Instagram page")
self.data = self._fallback_extract()
except Exception as e:
print(f"⚠️ Error: {str(e)}")
self.data = self._fallback_extract()
return self.data
def _extract_data(self, page) -> Dict[str, Any]:
"""Extract data from loaded page."""
data = {
"title": "Instagram Reel",
"description": "",
"author": "Unknown",
"content": "",
"key_points": [],
"tags": ["instagram", "reel"],
}
def _looks_like_language_list(text: str) -> bool:
lines = [line.strip() for line in text.splitlines() if line.strip()]
if len(lines) < 8:
return False
short_lines = [line for line in lines if len(line) <= 20]
if len(short_lines) / len(lines) < 0.8:
return False
single_tokenish = [line for line in short_lines if len(line.split()) <= 2]
return len(single_tokenish) / len(lines) > 0.7
def _looks_like_ui_prompt(text: str) -> bool:
lowered = text.lower()
blockers = [
"allow the use of cookies",
"use of cookies",
"cookies and similar technologies",
"cookies policy",
"cookie preferences",
"learn more about cookies",
"review or change your choices",
"essential cookies",
"optional cookies",
"cookies from other companies",
"meta products",
"safer experience",
"information we receive from cookies",
"accept all",
"only allow essential",
"log in",
"login",
"sign up",
"sign in",
"save your login info",
"turn on notifications",
"not now",
]
return any(blocker in lowered for blocker in blockers)
# Try to get caption/description from meta and embedded JSON first
try:
meta_desc = page.query_selector('meta[property="og:description"], meta[name="description"]')
if meta_desc:
text = (meta_desc.get_attribute("content") or "").strip()
if text and not _looks_like_ui_prompt(text) and not _looks_like_language_list(text):
data["description"] = text
meta_title = page.query_selector('meta[property="og:title"], meta[name="twitter:title"]')
if meta_title and data["title"] == "Instagram Reel":
title_text = (meta_title.get_attribute("content") or "").strip()
if title_text:
data["title"] = title_text
if not data["description"]:
html_source = page.content()
patterns = [
r'<meta[^>]+property="og:description"[^>]+content="([^"]+)"',
r'<meta[^>]+name="description"[^>]+content="([^"]+)"',
r'<meta[^>]+name="twitter:description"[^>]+content="([^"]+)"',
]
for pattern in patterns:
match = re.search(pattern, html_source, re.IGNORECASE)
if match:
text = html.unescape(match.group(1)).strip()
if text and not _looks_like_ui_prompt(text) and not _looks_like_language_list(text):
data["description"] = text
break
scripts = page.query_selector_all('script[type="application/ld+json"]')
for script in scripts:
raw = script.inner_text().strip()
if not raw:
continue
try:
payload = json.loads(raw)
except Exception:
continue
def extract_from_obj(obj: Dict[str, Any]):
if not isinstance(obj, dict):
return
desc = obj.get("description")
if desc and not data["description"]:
if not _looks_like_ui_prompt(desc) and not _looks_like_language_list(desc):
data["description"] = desc.strip()
author = obj.get("author")
if author and data["author"] == "Unknown":
if isinstance(author, dict):
name = author.get("name")
if name:
data["author"] = name.strip()
elif isinstance(author, list):
for item in author:
if isinstance(item, dict) and item.get("name"):
data["author"] = item["name"].strip()
break
elif isinstance(author, str):
data["author"] = author.strip()
if isinstance(payload, list):
for obj in payload:
extract_from_obj(obj)
else:
extract_from_obj(payload)
if data["description"] and data["author"] != "Unknown":
break
except Exception as e:
print(f"⚠️ Could not extract meta/ld+json: {e}")
# Try to get caption/description from embedded shared data
try:
html = page.content()
payloads = []
shared_match = re.search(r'window\._sharedData\s*=\s*({.*?});</script>', html, re.DOTALL)
if shared_match:
payloads.append(shared_match.group(1))
for match in re.finditer(r'__additionalDataLoaded\([^,]+,\s*({.*?})\);', html, re.DOTALL):
payloads.append(match.group(1))
def extract_from_media(media: Dict[str, Any]):
if not isinstance(media, dict):
return
if data["author"] == "Unknown":
owner = media.get("owner") or {}
if isinstance(owner, dict):
name = owner.get("username") or owner.get("full_name")
if name:
data["author"] = name.strip()
caption_text = None
edge = media.get("edge_media_to_caption")
if isinstance(edge, dict):
edges = edge.get("edges") or []
if edges:
node = edges[0].get("node", {})
if isinstance(node, dict):
caption_text = node.get("text")
if not caption_text and isinstance(media.get("caption"), dict):
caption_text = media["caption"].get("text")
if caption_text and not data["description"]:
if not _looks_like_ui_prompt(caption_text) and not _looks_like_language_list(caption_text):
data["description"] = caption_text.strip()
def walk(obj: Any):
if isinstance(obj, dict):
graphql = obj.get("graphql")
if isinstance(graphql, dict):
extract_from_media(graphql.get("shortcode_media") or graphql.get("media"))
if isinstance(obj.get("shortcode_media"), dict):
extract_from_media(obj.get("shortcode_media"))
for v in obj.values():
walk(v)
elif isinstance(obj, list):
for item in obj:
walk(item)
for raw in payloads:
try:
parsed = json.loads(raw)
except Exception:
continue
walk(parsed)
if data["description"] and data["author"] != "Unknown":
break
except Exception as e:
print(f"⚠️ Could not extract shared data: {e}")
# Try to get caption/description from visible text
try:
# Look for caption text
captions = page.query_selector_all('h1, h2, span')
for caption in captions:
text = caption.inner_text().strip()
if (
len(text) > 20
and len(text) < 500
and not _looks_like_language_list(text)
and not _looks_like_ui_prompt(text)
):
if not data["description"]:
data["description"] = text
break
except Exception as e:
print(f"⚠️ Could not extract caption: {e}")
# Try to get author
try:
author_elem = page.query_selector('a[href*="/"] h1, a[href*="/"] h2, header span')
if author_elem:
data["author"] = author_elem.inner_text().strip()
except:
pass
# Try to get engagement metrics
try:
likes_elem = page.query_selector('span:has-text("likes"), span:has-text("views")')
if likes_elem:
data["views"] = likes_elem.inner_text().strip()
except:
pass
# Extract any visible text as content
try:
if data["description"] and not _looks_like_ui_prompt(data["description"]):
data["content"] = data["description"].strip()
else:
# Get all text content
body_text = page.inner_text('body')
# Filter for meaningful content
lines = body_text.split('\n')
cleaned_lines = []
buffer = []
def flush_buffer():
if buffer:
block = "\n".join(buffer)
if not _looks_like_language_list(block):
cleaned_lines.extend(
[line for line in buffer if not _looks_like_ui_prompt(line)]
)
buffer.clear()
for line in lines:
stripped = line.strip()
if not stripped:
flush_buffer()
continue
if _looks_like_ui_prompt(stripped):
continue
if len(stripped) <= 24:
buffer.append(stripped)
else:
flush_buffer()
cleaned_lines.append(stripped)
flush_buffer()
meaningful_lines = [
line for line in cleaned_lines
if len(line) > 30 and len(line) < 300
]
data["content"] = "\n\n".join(meaningful_lines[:10])[:5000]
except Exception as e:
print(f"⚠️ Could not extract page text: {e}")
# Generate key points from description or content
base_text = ""
if data["description"] and not _looks_like_ui_prompt(data["description"]):
base_text = data["description"]
elif data["content"]:
base_text = data["content"]
if base_text:
sentences = re.split(r'(?<=[.!?])\s+', base_text.strip())
data["key_points"] = [
s.strip() for s in sentences
if 20 < len(s.strip()) < 200
][:3]
# Add URL-based tags
parsed = urlparse(self.url)
if '/reel/' in parsed.path:
data["tags"].append("reel")
if '/video/' in parsed.path:
data["tags"].append("video")
return data
def _fallback_extract(self) -> Dict[str, Any]:
"""Fallback extraction when browser automation fails."""
print("⚠️ Using fallback extraction method...")
# Try to extract what we can from the URL itself
data = {
"title": "Instagram Content",
"description": "[Could not extract - Instagram requires login]",
"author": "Unknown",
"content": "",
"key_points": [
"Instagram content extraction requires browser automation",
"Consider using Instagram's official API or downloading the video manually",
],
"tags": ["instagram", "fallback"],
}
# Extract reel ID from URL
try:
parsed = urlparse(self.url)
path_parts = parsed.path.split('/')
for i, part in enumerate(path_parts):
if part in ['reel', 'p', 'tv'] and i + 1 < len(path_parts):
reel_id = path_parts[i + 1]
data["key_points"].append(f"Reel ID: {reel_id}")
break
        except Exception:
pass
return data


@@ -1,203 +0,0 @@
"""
YouTube Video Extractor
Extracts:
- Title, description, author
- Transcript/captions
- Duration, views, publish date
- Tags/categories
"""
import re
from typing import Optional, Dict, Any
from urllib.parse import urlparse, parse_qs
try:
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound
except ImportError:
YouTubeTranscriptApi = None
try:
from pytubefix import YouTube # More reliable than pytube
except ImportError:
try:
from pytube import YouTube
except ImportError:
YouTube = None
class YouTubeExtractor:
"""Extract content from YouTube videos."""
def __init__(self, url: str):
self.url = url
self.video_id = self._extract_video_id(url)
self.youtube = None
def _extract_video_id(self, url: str) -> str:
"""Extract video ID from YouTube URL."""
patterns = [
r'(?:youtube\.com\/watch\?v=|youtu\.be\/)([a-zA-Z0-9_-]{11})',
r'youtube\.com\/embed\/([a-zA-Z0-9_-]{11})',
r'youtube\.com\/v\/([a-zA-Z0-9_-]{11})',
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
raise ValueError(f"Could not extract YouTube video ID from: {url}")
def _init_youtube(self):
"""Initialize YouTube object."""
if YouTube is None:
raise ImportError("pytube or pytubefix not installed. Run: pip install pytubefix")
if self.youtube is None:
self.youtube = YouTube(self.url)
def extract(self) -> Dict[str, Any]:
"""Extract all content from YouTube video."""
self._init_youtube()
content = {
"title": self._get_title(),
"description": self._get_description(),
"author": self._get_author(),
"duration": self._get_duration(),
"publish_date": self._get_publish_date(),
"views": self._get_views(),
"content": self._get_transcript(),
"key_points": self._generate_key_points(),
"tags": self._get_tags(),
}
return content
def _get_title(self) -> str:
"""Get video title."""
try:
self._init_youtube()
return self.youtube.title
        except Exception:
return f"Video {self.video_id}"
def _get_description(self) -> str:
"""Get video description."""
try:
self._init_youtube()
return self.youtube.description or ""
except Exception:
return ""
def _get_author(self) -> str:
"""Get video author/channel name."""
try:
self._init_youtube()
return self.youtube.author
except Exception:
return "Unknown"
def _get_duration(self) -> str:
"""Get video duration in readable format."""
try:
self._init_youtube()
seconds = self.youtube.length
minutes, secs = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours > 0:
return f"{hours}:{minutes:02d}:{secs:02d}"
else:
return f"{minutes}:{secs:02d}"
except Exception:
return "Unknown"
def _get_publish_date(self) -> str:
"""Get video publish date."""
try:
self._init_youtube()
if hasattr(self.youtube, 'publish_date') and self.youtube.publish_date:
return self.youtube.publish_date.strftime("%Y-%m-%d")
except Exception:
pass
return "Unknown"
def _get_views(self) -> str:
"""Get view count."""
try:
self._init_youtube()
views = self.youtube.views
if views > 1_000_000:
return f"{views / 1_000_000:.1f}M"
elif views > 1_000:
return f"{views / 1_000:.1f}K"
else:
return str(views)
except Exception:
return "Unknown"
def _get_transcript(self) -> str:
"""Get video transcript/captions."""
if YouTubeTranscriptApi is None:
return "[Transcript not available - youtube-transcript-api not installed]"
try:
# New API requires creating an instance
api = YouTubeTranscriptApi()
transcript_list = api.list(self.video_id)
# Try to find English transcript
transcript = None
for t in transcript_list:
if t.language_code == 'en':
transcript = t
break
# Fallback to first available
if transcript is None:
transcript = next(iter(transcript_list), None)
if transcript is None:
return "[No transcript available]"
transcript_data = transcript.fetch()
# New API returns FetchedTranscript with snippets
if hasattr(transcript_data, 'snippets'):
full_text = " ".join([snippet.text for snippet in transcript_data.snippets])
else:
# Fallback for older API format
full_text = " ".join([entry['text'] for entry in transcript_data])
# Clean up the text
full_text = full_text.replace("\n", " ").strip()
return full_text[:10000] # Limit length
except Exception as e:
return f"[Transcript not available: {str(e)}]"
def _generate_key_points(self) -> list:
"""Generate key points from transcript (simple extraction)."""
transcript = self._get_transcript()
if not transcript or transcript.startswith("["):
return []
# Simple sentence extraction (first few sentences as key points)
sentences = transcript.split('.')[:5]
key_points = [s.strip() + '.' for s in sentences if len(s.strip()) > 20]
return key_points[:5]
def _get_tags(self) -> list:
"""Get video tags."""
try:
self._init_youtube()
if hasattr(self.youtube, 'keywords'):
return self.youtube.keywords[:10] if self.youtube.keywords else []
except Exception:
pass
return ["youtube", "video"]


@@ -1,5 +1,5 @@
 {
-  description = "Development environment for jbackup";
+  description = "Backblaze Invoice Downloader";
   inputs = {
     nixpkgs.url = "nixpkgs/nixos-unstable";
@@ -18,16 +18,8 @@
       devShell = pkgs.mkShell {
         packages = with pkgs; [
           (python3.withPackages (ps: [
-            ps.requests
-            ps.beautifulsoup4
-            ps.lxml
-            ps."youtube-transcript-api"
-            ps.pytube
             ps.playwright
-            ps.markdown
             ps."python-dotenv"
-            ps.pydantic
-            ps."python-dateutil"
           ]))
           playwright-driver.browsers
         ];

main.py

@@ -1,231 +1,53 @@
 #!/usr/bin/env python3
-"""
-Content Extractor - Extract key information from URLs and save to Obsidian
-Supports:
-- YouTube videos (transcripts, descriptions, metadata)
-- Blog posts & articles (web scraping)
-- Instagram reels (via browser automation)
-- Generic URLs (text extraction)
-Usage:
-    python main.py <url> [--obsidian-path <path>] [--output <filename>]
-"""
 import argparse
-import sys
 import logging
-from pathlib import Path
-from datetime import datetime
-from typing import Optional
+import sys
-from extractors.youtube_extractor import YouTubeExtractor
-from extractors.blog_extractor import BlogExtractor
-from extractors.instagram_extractor import InstagramExtractor
-from obsidian_writer import ObsidianWriter
 from config import Config
-from summarizer import summarize_text, SummarizationError, format_markdown_content
+from downloader import download_all_invoices
-def detect_source_type(url: str) -> str:
-    """Detect the type of content based on URL."""
-    if "youtube.com" in url or "youtu.be" in url:
-        return "youtube"
-    elif "instagram.com" in url and "/reel" in url:
-        return "instagram"
-    elif "instagram.com" in url:
-        return "instagram"
-    else:
-        return "blog"
-def extract_content(url: str, source_type: str) -> dict:
-    """Extract content from URL based on source type."""
-    print(f"🔍 Extracting content from {source_type}...")
-    if source_type == "youtube":
-        extractor = YouTubeExtractor(url)
-    elif source_type == "instagram":
-        extractor = InstagramExtractor(url)
-    else:
-        extractor = BlogExtractor(url)
-    return extractor.extract()
 def main():
-    logging.basicConfig(
-        level=getattr(logging, Config.LOG_LEVEL.upper(), logging.INFO),
-        format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
-        handlers=[
-            logging.StreamHandler(),
-            logging.FileHandler(Config.LOG_FILE),
-        ],
-    )
-    parser = argparse.ArgumentParser(
-        description="Extract content from URLs and save to Obsidian notes"
-    )
-    parser.add_argument("url", help="URL to extract content from")
-    parser.add_argument(
-        "--obsidian-path",
-        type=str,
-        default=Config.OBSIDIAN_VAULT_PATH,
-        help="Path to Obsidian vault"
-    )
-    parser.add_argument(
-        "--output",
-        type=str,
-        default=None,
-        help="Output filename (without .md extension)"
-    )
-    parser.add_argument(
-        "--folder",
-        type=str,
-        default="Content Extractor",
-        help="Folder in Obsidian vault to save notes"
-    )
-    parser.add_argument(
-        "--no-save",
-        action="store_true",
-        help="Only print extracted content, don't save to Obsidian"
-    )
-    parser.add_argument(
-        "--summarize",
-        action="store_true",
-        help="Generate a summary of the content"
-    )
+    parser = argparse.ArgumentParser(description="Download Backblaze invoices as PDF")
+    parser.add_argument("--output", "-o", help="Output directory", default=None)
+    parser.add_argument("--headless", action="store_true", default=None, help="Run browser headless")
+    parser.add_argument("--no-headless", action="store_true", default=False, help="Show browser window")
+    parser.add_argument("--vat-id", help="VAT ID to fill on invoices")
+    parser.add_argument("--document-type", help="Document type to select")
+    parser.add_argument("--company", help="Company name to fill")
+    parser.add_argument("--notes", help="Notes to fill on invoices")
+    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging")
     args = parser.parse_args()
-    # Detect source type
-    source_type = detect_source_type(args.url)
-    print(f"📌 Detected source type: {source_type}")
+    logging.basicConfig(
+        level=logging.DEBUG if args.verbose else getattr(logging, Config.LOG_LEVEL),
+        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
+        datefmt="%H:%M:%S",
+    )
-    # Extract content
-    try:
-        content = extract_content(args.url, source_type)
-    except Exception as e:
-        print(f"❌ Extraction failed: {e}")
-        sys.exit(1)
-    if not content:
-        print("❌ No content could be extracted")
-        sys.exit(1)
-    if content.get("content"):
-        try:
-            content["content"] = format_markdown_content(content["content"])
-        except SummarizationError as e:
-            print(f"⚠️ Content formatting failed: {e}")
-    # Generate AI summary + key points
-    if args.summarize or Config.GENERATE_SUMMARY:
-        source_text = "\n\n".join(
-            part for part in [content.get("description", ""), content.get("content", "")]
-            if part
-        ).strip()
-        if source_text:
-            try:
-                summary_result = summarize_text(source_text, max_points=3)
-                if summary_result.get("summary"):
-                    content["description"] = summary_result["summary"]
-                if summary_result.get("key_points"):
-                    content["key_points"] = summary_result["key_points"]
-            except SummarizationError as e:
-                print(f"⚠️ Summarization failed: {e}")
-    # Generate output filename
     if args.output:
-        filename = args.output
-    else:
-        # Generate from title or URL
-        title = content.get("title", "Untitled")
-        filename = f"{title[:50]}_{datetime.now().strftime('%Y%m%d')}"
-    # Sanitize filename
-    filename = "".join(c for c in filename if c.isalnum() or c in " -_").strip()
+        Config.OUTPUT_DIR = args.output
+    if args.no_headless:
+        Config.BROWSER_HEADLESS = False
+    elif args.headless is True:
+        Config.BROWSER_HEADLESS = True
+    if args.vat_id:
+        Config.INVOICE_VAT_ID = args.vat_id
+    if args.document_type:
+        Config.INVOICE_DOCUMENT_TYPE = args.document_type
+    if args.company:
+        Config.INVOICE_COMPANY = args.company
+    if args.notes:
+        Config.INVOICE_NOTES = args.notes
-    # Create markdown content
-    markdown = generate_markdown(content, source_type, args.url)
+    try:
+        Config.validate()
+    except ValueError as e:
+        print(f"Error: {e}", file=sys.stderr)
+        sys.exit(1)
-    # Print preview
-    print("\n" + "="*80)
-    print("📝 EXTRACTED CONTENT PREVIEW")
-    print("="*80)
-    print(markdown[:2000] + "..." if len(markdown) > 2000 else markdown)
-    print("="*80)
-    # Save to Obsidian
-    if not args.no_save:
-        writer = ObsidianWriter(args.obsidian_path)
-        output_path = writer.save_note(markdown, filename, args.folder)
-        print(f"\n✅ Note saved to: {output_path}")
-    else:
-        print("\n⚠️ Note not saved (--no-save flag)")
-    return content
+    saved = download_all_invoices()
+    print(f"\nDone. {len(saved)} invoice(s) saved to {Config.OUTPUT_DIR}")
-def generate_markdown(content: dict, source_type: str, url: str) -> str:
-    """Generate markdown content for Obsidian note."""
-    lines = []
-    # Header
-    lines.append(f"# {content.get('title', 'Untitled')}")
-    lines.append("")
-    # Metadata
-    lines.append("## Metadata")
-    lines.append("")
-    lines.append(f"- **Source**: {source_type.capitalize()}")
-    lines.append(f"- **URL**: {url}")
-    lines.append(f"- **Extracted**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    if content.get("author"):
-        lines.append(f"- **Author**: {content.get('author')}")
-    if content.get("duration"):
-        lines.append(f"- **Duration**: {content.get('duration')}")
-    if content.get("publish_date"):
-        lines.append(f"- **Published**: {content.get('publish_date')}")
-    if content.get("views"):
-        lines.append(f"- **Views**: {content.get('views')}")
-    lines.append("")
-    # Description/Summary
-    if content.get("description"):
-        lines.append("## Description")
-        lines.append("")
-        lines.append(content.get("description", ""))
-        lines.append("")
-    # Main Content (transcript, article text, etc.)
-    if content.get("content"):
-        lines.append("## Content")
-        lines.append("")
-        lines.append(content.get("content", ""))
-        lines.append("")
-    # Key Points/Summary
-    if content.get("key_points"):
-        lines.append("## Key Points")
-        lines.append("")
-        for point in content.get("key_points", []):
-            lines.append(f"- {point}")
-        lines.append("")
-    # Tags
-    lines.append("---")
-    lines.append("")
-    lines.append("## Tags")
-    lines.append("")
-    tags = content.get("tags", [])
-    if not tags:
-        tags = ["content-extractor", source_type, "notes"]
-    lines.append(" ".join(f"#{tag}" for tag in tags))
-    lines.append("")
-    return "\n".join(lines)
 if __name__ == "__main__":
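
In the new `main()`, CLI flags win over configuration loaded elsewhere. A minimal standalone sketch of that override pattern; the `Config` stand-in below is hypothetical and mirrors only a few fields of the real class:

```python
import argparse

class Config:
    # Hypothetical stand-in for the project's real Config class
    OUTPUT_DIR = "./invoices"
    BROWSER_HEADLESS = True
    INVOICE_VAT_ID = ""

def apply_overrides(argv: list[str]) -> None:
    """Parse CLI flags and let them override the defaults above."""
    parser = argparse.ArgumentParser(description="Download Backblaze invoices as PDF")
    parser.add_argument("--output", "-o", default=None)
    parser.add_argument("--no-headless", action="store_true", default=False)
    parser.add_argument("--vat-id", default=None)
    args = parser.parse_args(argv)

    # Only explicitly-set flags touch the config, mirroring main() above
    if args.output:
        Config.OUTPUT_DIR = args.output
    if args.no_headless:
        Config.BROWSER_HEADLESS = False
    if args.vat_id:
        Config.INVOICE_VAT_ID = args.vat_id

apply_overrides(["--output", "./out", "--no-headless", "--vat-id", "DE123456789"])
print(Config.OUTPUT_DIR, Config.BROWSER_HEADLESS)  # ./out False
```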


@@ -1,128 +0,0 @@
"""
Obsidian Note Writer
Saves extracted content as markdown notes in Obsidian vault.
"""
import os
from pathlib import Path
from datetime import datetime
from typing import Optional
class ObsidianWriter:
"""Write content to Obsidian vault as markdown notes."""
def __init__(self, vault_path: str):
self.vault_path = Path(vault_path).expanduser()
self._validate_vault()
def _validate_vault(self):
"""Validate that the path is an Obsidian vault."""
if not self.vault_path.exists():
print(f"⚠️ Creating Obsidian vault directory: {self.vault_path}")
self.vault_path.mkdir(parents=True, exist_ok=True)
# Check if it looks like an Obsidian vault
obsidian_config = self.vault_path / ".obsidian"
if not obsidian_config.exists():
print(f"⚠️ Warning: {self.vault_path} doesn't look like an Obsidian vault")
print(" (No .obsidian directory found)")
print(" Notes will still be saved, but you may want to set the correct vault path")
def save_note(
self,
content: str,
filename: str,
folder: Optional[str] = None,
subfolder: Optional[str] = None
) -> Path:
"""
Save a note to Obsidian vault.
Args:
content: Markdown content to save
filename: Filename without .md extension
folder: Folder in vault (default: root)
subfolder: Subfolder within folder (optional)
Returns:
Path to saved file
"""
# Build path
if folder:
note_dir = self.vault_path / folder
if subfolder:
note_dir = note_dir / subfolder
else:
note_dir = self.vault_path
# Create directory if it doesn't exist
note_dir.mkdir(parents=True, exist_ok=True)
# Sanitize filename
filename = self._sanitize_filename(filename)
# Add .md extension
        filepath = note_dir / f"{filename}.md"
# Handle duplicate filenames
counter = 1
original_filepath = filepath
while filepath.exists():
            filepath = original_filepath.with_name(f"{filename}_{counter}.md")
counter += 1
# Write the file
try:
with open(filepath, 'w', encoding='utf-8') as f:
f.write(content)
print(f"✅ Note saved: {filepath.name}")
return filepath
except Exception as e:
            raise Exception(f"Failed to save note: {e}") from e
def _sanitize_filename(self, filename: str) -> str:
"""Sanitize filename for filesystem."""
# Remove invalid characters
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
filename = filename.replace(char, '')
# Replace spaces with hyphens (optional, but cleaner)
# filename = filename.replace(' ', '-')
# Limit length
if len(filename) > 100:
filename = filename[:100]
return filename.strip()
def create_daily_note(self, content: str) -> Path:
"""Create/update a daily note."""
today = datetime.now().strftime("%Y-%m-%d")
folder = "Daily Notes"
return self.save_note(content, today, folder)
def append_to_note(self, filename: str, content: str, folder: Optional[str] = None) -> Path:
"""Append content to an existing note."""
if folder:
note_dir = self.vault_path / folder
else:
note_dir = self.vault_path
        filepath = note_dir / f"{filename}.md"
# If file doesn't exist, create it
if not filepath.exists():
return self.save_note(content, filename, folder)
# Append to existing file
try:
with open(filepath, 'a', encoding='utf-8') as f:
f.write("\n\n---\n\n")
f.write(content)
print(f"✅ Content appended to: {filepath.name}")
return filepath
except Exception as e:
            raise Exception(f"Failed to append to note: {e}") from e
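
The sanitization and duplicate-name handling in `save_note` can be exercised in isolation. This sketch mirrors that logic against a temporary directory (the helper names here are illustrative, not part of the class):

```python
import tempfile
from pathlib import Path

def sanitize_filename(filename: str) -> str:
    # Mirror of ObsidianWriter._sanitize_filename above
    for char in '<>:"/\\|?*':
        filename = filename.replace(char, '')
    return filename[:100].strip()

def unique_path(note_dir: Path, filename: str) -> Path:
    # Mirror of the duplicate handling in save_note
    filepath = note_dir / f"{filename}.md"
    counter = 1
    while filepath.exists():
        filepath = note_dir / f"{filename}_{counter}.md"
        counter += 1
    return filepath

with tempfile.TemporaryDirectory() as tmp:
    d = Path(tmp)
    name = sanitize_filename('My Note: "Draft"?')
    first = unique_path(d, name)
    first.write_text("one", encoding="utf-8")
    second = unique_path(d, name)
    print(first.name, second.name)  # My Note Draft.md My Note Draft_1.md
```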


@@ -1,23 +1,2 @@
-# Content Extractor Dependencies
-# Web scraping
-requests>=2.31.0
-beautifulsoup4>=4.12.0
-lxml>=4.9.0
-# YouTube
-youtube-transcript-api>=0.6.0
-pytube>=15.0.0
-# Browser automation (for Instagram and dynamic content)
 playwright>=1.40.0
-# Text processing
-markdown>=3.5.0
-# Utilities
 python-dotenv>=1.0.0
-pydantic>=2.5.0
-# Date handling
-python-dateutil>=2.8.0


@@ -1,172 +0,0 @@
"""
OpenAI/OpenRouter summarizer utility.
Uses OPENAI_API_KEY and OPENAI_URL from environment (via Config).
"""
from __future__ import annotations
import json
import logging
from typing import Dict, List
import requests
from config import Config
class SummarizationError(RuntimeError):
"""Raised when summarization fails."""
logger = logging.getLogger(__name__)
def summarize_text(text: str, max_points: int = 3) -> Dict[str, List[str] | str]:
"""
Summarize text into a short summary and key points.
Returns:
{
"summary": "string",
"key_points": ["point 1", "point 2", ...]
}
"""
if not text or not text.strip():
return {"summary": "", "key_points": []}
if not Config.OPENAI_API_KEY:
raise SummarizationError("OPENAI_API_KEY is not set")
payload = {
"model": Config.OPENAI_MODEL,
"messages": [
{
"role": "system",
"content": (
"You are a precise summarizer. Return JSON only with keys "
"`summary` and `key_points` (array of strings). Do not add extra keys."
),
},
{
"role": "user",
"content": (
"Summarize the following content in 2-4 sentences and provide "
f"{max_points} key points.\n\n"
f"CONTENT:\n{text}"
),
},
],
"temperature": 0.2,
"max_tokens": 400,
}
headers = {
"Authorization": f"Bearer {Config.OPENAI_API_KEY}",
"Content-Type": "application/json",
}
try:
logger.info(
"OpenAI request: url=%s model=%s timeout=%ss input_chars=%s",
Config.OPENAI_URL,
Config.OPENAI_MODEL,
Config.OPENAI_TIMEOUT,
len(text),
)
if Config.OPENAI_LOG_PAYLOAD:
logger.debug("OpenAI request payload: %s", json.dumps(payload, ensure_ascii=False))
response = requests.post(
Config.OPENAI_URL,
headers=headers,
json=payload,
timeout=Config.OPENAI_TIMEOUT,
)
logger.info("OpenAI response: status=%s", response.status_code)
if Config.OPENAI_LOG_PAYLOAD:
logger.debug("OpenAI response body: %s", response.text)
response.raise_for_status()
data = response.json()
except Exception as exc:
logger.exception("OpenAI request failed")
raise SummarizationError(f"Request failed: {exc}") from exc
try:
content = data["choices"][0]["message"]["content"].strip()
result = json.loads(content)
summary = result.get("summary", "").strip()
key_points = [p.strip() for p in result.get("key_points", []) if p.strip()]
return {"summary": summary, "key_points": key_points}
except Exception as exc:
raise SummarizationError(f"Invalid response format: {exc}") from exc
def format_markdown_content(text: str) -> str:
"""
Clean and format social content into sensible markdown.
- Remove excessive emojis/icons
- Convert list-like lines into ordered/bulleted lists
- Remove obvious ads/sponsor lines
- Normalize whitespace
"""
if not text or not text.strip():
return ""
if not Config.OPENAI_API_KEY:
raise SummarizationError("OPENAI_API_KEY is not set")
payload = {
"model": Config.OPENAI_MODEL,
"messages": [
{
"role": "system",
"content": (
"You are a precise formatter. Return only cleaned markdown text. "
"Remove ads/sponsor lines, collapse excessive whitespace, "
"and replace emoji-heavy bullets with normal bullet/numbered lists. "
"Do not add a title or extra sections."
),
},
{
"role": "user",
"content": (
"Format the following content:\n\n"
f"{text}"
),
},
],
"temperature": 0.1,
"max_tokens": 800,
}
headers = {
"Authorization": f"Bearer {Config.OPENAI_API_KEY}",
"Content-Type": "application/json",
}
try:
logger.info(
"OpenAI format request: url=%s model=%s timeout=%ss input_chars=%s",
Config.OPENAI_URL,
Config.OPENAI_MODEL,
Config.OPENAI_TIMEOUT,
len(text),
)
if Config.OPENAI_LOG_PAYLOAD:
logger.debug("OpenAI format request payload: %s", json.dumps(payload, ensure_ascii=False))
response = requests.post(
Config.OPENAI_URL,
headers=headers,
json=payload,
timeout=Config.OPENAI_TIMEOUT,
)
logger.info("OpenAI format response: status=%s", response.status_code)
if Config.OPENAI_LOG_PAYLOAD:
logger.debug("OpenAI format response body: %s", response.text)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"].strip()
except Exception as exc:
logger.exception("OpenAI format request failed")
raise SummarizationError(f"Formatting request failed: {exc}") from exc
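
For reference, the response parsing at the end of `summarize_text` assumes a chat-completions-shaped body whose `message.content` is itself a JSON document. A minimal standalone sketch of that round trip (the payload below is fabricated for illustration):

```python
import json

# Fabricated response body in the chat-completions shape the code above expects
raw = json.dumps({
    "choices": [{"message": {"content": json.dumps({
        "summary": "A short recap.",
        "key_points": ["first", "second", ""],
    })}}]
})

data = json.loads(raw)
# Outer hop: choices -> message -> content (a JSON string)
content = data["choices"][0]["message"]["content"].strip()
# Inner hop: parse the JSON the model was instructed to return
result = json.loads(content)
key_points = [p.strip() for p in result.get("key_points", []) if p.strip()]
print(result["summary"], key_points)
```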