#!/usr/bin/env python3
|
|
"""
|
|
Content Extractor - Extract key information from URLs and save to Obsidian
|
|
|
|
Supports:
|
|
- YouTube videos (transcripts, descriptions, metadata)
|
|
- Blog posts & articles (web scraping)
|
|
- Instagram reels (via browser automation)
|
|
- Generic URLs (text extraction)
|
|
|
|
Usage:
|
|
python main.py <url> [--obsidian-path <path>] [--output <filename>]
|
|
"""
|
|
|
|
import argparse
|
|
import sys
|
|
import logging
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from extractors.youtube_extractor import YouTubeExtractor
|
|
from extractors.blog_extractor import BlogExtractor
|
|
from extractors.instagram_extractor import InstagramExtractor
|
|
from obsidian_writer import ObsidianWriter
|
|
from config import Config
|
|
from summarizer import summarize_text, SummarizationError, format_markdown_content
|
|
|
|
|
|
def detect_source_type(url: str) -> str:
    """Classify a URL as "youtube", "instagram", or "blog".

    Matching is by plain substring, so any URL containing
    "youtube.com"/"youtu.be" counts as YouTube and anything with
    "instagram.com" counts as Instagram; everything else falls back to
    the generic blog/article extractor.
    """
    if "youtube.com" in url or "youtu.be" in url:
        return "youtube"
    # Reels and regular Instagram URLs route to the same extractor, so a
    # single check suffices (the previous separate "/reel" branch was
    # redundant: both branches returned "instagram").
    if "instagram.com" in url:
        return "instagram"
    return "blog"
|
|
|
|
|
|
def extract_content(url: str, source_type: str) -> dict:
    """Instantiate the extractor matching *source_type* and return its result dict.

    Unknown source types fall back to the generic blog extractor.
    """
    print(f"🔍 Extracting content from {source_type}...")

    # Dispatch table instead of an if/elif chain; BlogExtractor is the
    # default for any source type not listed here.
    extractor_cls = {
        "youtube": YouTubeExtractor,
        "instagram": InstagramExtractor,
    }.get(source_type, BlogExtractor)

    return extractor_cls(url).extract()
|
|
|
|
|
|
def main() -> dict:
    """CLI entry point: extract content from a URL and optionally save it.

    Parses command-line arguments, detects the source type, extracts the
    content, optionally formats/summarizes it, prints a preview, and
    writes the note into the Obsidian vault unless --no-save is given.

    Returns:
        The extracted content dict (handy when called programmatically).

    Exits:
        Status 1 when extraction raises or yields no content.
    """
    # Log to both the console and the configured log file.
    logging.basicConfig(
        level=getattr(logging, Config.LOG_LEVEL.upper(), logging.INFO),
        format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(Config.LOG_FILE),
        ],
    )

    parser = argparse.ArgumentParser(
        description="Extract content from URLs and save to Obsidian notes"
    )
    parser.add_argument("url", help="URL to extract content from")
    parser.add_argument(
        "--obsidian-path",
        type=str,
        default=Config.OBSIDIAN_VAULT_PATH,
        help="Path to Obsidian vault"
    )
    parser.add_argument(
        "--output",
        type=str,
        default=None,
        help="Output filename (without .md extension)"
    )
    parser.add_argument(
        "--folder",
        type=str,
        default="Content Extractor",
        help="Folder in Obsidian vault to save notes"
    )
    parser.add_argument(
        "--no-save",
        action="store_true",
        help="Only print extracted content, don't save to Obsidian"
    )
    parser.add_argument(
        "--summarize",
        action="store_true",
        help="Generate a summary of the content"
    )

    args = parser.parse_args()

    # Detect source type
    source_type = detect_source_type(args.url)
    print(f"📌 Detected source type: {source_type}")

    # Extract content; any extractor failure is fatal for the CLI.
    try:
        content = extract_content(args.url, source_type)
    except Exception as e:
        print(f"❌ Extraction failed: {e}")
        sys.exit(1)

    if not content:
        print("❌ No content could be extracted")
        sys.exit(1)

    # Best-effort markdown cleanup of the main body; on failure we keep
    # the raw extracted text rather than aborting.
    if content.get("content"):
        try:
            content["content"] = format_markdown_content(content["content"])
        except SummarizationError as e:
            print(f"⚠️ Content formatting failed: {e}")

    # Generate AI summary + key points
    if args.summarize or Config.GENERATE_SUMMARY:
        # Feed both the description and the body (whichever exist) to
        # the summarizer as one text.
        source_text = "\n\n".join(
            part for part in [content.get("description", ""), content.get("content", "")]
            if part
        ).strip()
        if source_text:
            try:
                summary_result = summarize_text(source_text, max_points=3)
                # The summary replaces the original description; key
                # points are stored for the "Key Points" note section.
                if summary_result.get("summary"):
                    content["description"] = summary_result["summary"]
                if summary_result.get("key_points"):
                    content["key_points"] = summary_result["key_points"]
            except SummarizationError as e:
                # Summarization is optional; keep going without it.
                print(f"⚠️ Summarization failed: {e}")

    # Generate output filename
    if args.output:
        filename = args.output
    else:
        # Generate from title or URL
        title = content.get("title", "Untitled")
        filename = f"{title[:50]}_{datetime.now().strftime('%Y%m%d')}"
        # Sanitize filename (only applied to auto-generated names; a
        # user-supplied --output is taken verbatim)
        filename = "".join(c for c in filename if c.isalnum() or c in " -_").strip()

    # Create markdown content
    markdown = generate_markdown(content, source_type, args.url)

    # Print preview (truncated to the first 2000 characters)
    print("\n" + "="*80)
    print("📝 EXTRACTED CONTENT PREVIEW")
    print("="*80)
    print(markdown[:2000] + "..." if len(markdown) > 2000 else markdown)
    print("="*80)

    # Save to Obsidian
    if not args.no_save:
        writer = ObsidianWriter(args.obsidian_path)
        output_path = writer.save_note(markdown, filename, args.folder)
        print(f"\n✅ Note saved to: {output_path}")
    else:
        print("\n⚠️ Note not saved (--no-save flag)")

    return content
|
|
|
|
|
|
def generate_markdown(content: dict, source_type: str, url: str) -> str:
    """Render the extracted *content* dict as an Obsidian-ready markdown note.

    The note contains a title header, a metadata section, optional
    description/content/key-points sections, and a trailing tags line.
    """
    # Title header.
    out = [f"# {content.get('title', 'Untitled')}", ""]

    # Metadata: fixed fields first, then whichever optional fields are set.
    out += [
        "## Metadata",
        "",
        f"- **Source**: {source_type.capitalize()}",
        f"- **URL**: {url}",
        f"- **Extracted**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
    ]
    for key, label in (
        ("author", "Author"),
        ("duration", "Duration"),
        ("publish_date", "Published"),
        ("views", "Views"),
    ):
        value = content.get(key)
        if value:
            out.append(f"- **{label}**: {value}")
    out.append("")

    # Description and main body (transcript, article text, ...) share a shape.
    for key, heading in (("description", "Description"), ("content", "Content")):
        text = content.get(key)
        if text:
            out += [f"## {heading}", "", text, ""]

    # Key points, one bullet each.
    points = content.get("key_points")
    if points:
        out += ["## Key Points", ""]
        out += [f"- {point}" for point in points]
        out.append("")

    # Trailing tag line; fall back to generic tags when none were extracted.
    out += ["---", "", "## Tags", ""]
    tags = content.get("tags", []) or ["content-extractor", source_type, "notes"]
    out.append(" ".join(f"#{tag}" for tag in tags))
    out.append("")

    return "\n".join(out)
|
|
|
|
|
|
# Allow the module to be executed directly as a CLI script.
if __name__ == "__main__":
    main()
|