commit c997e764b5e5fc9ce6f12ecd38024ff151bd0a4d Author: naki Date: Thu Mar 5 13:02:58 2026 +0530 feat: Initial commit - Content Extractor for YouTube, Instagram, and blogs - YouTube extraction with transcript support - Instagram reel extraction via browser automation - Blog/article web scraping - Auto-save to Obsidian vaults - Smart key point generation - Configurable via .env file - Quick extract shell script Tech stack: Python, requests, beautifulsoup4, playwright, youtube-transcript-api diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..c5e86c7 --- /dev/null +++ b/.env.example @@ -0,0 +1,21 @@ +# Content Extractor Configuration + +# Obsidian vault path (default: ~/Obsidian Vault) +OBSIDIAN_VAULT_PATH=~/Obsidian Vault + +# Browser settings (for Instagram extraction) +BROWSER_HEADLESS=true +BROWSER_TIMEOUT=30000 + +# Content extraction settings +MAX_CONTENT_LENGTH=10000 +GENERATE_SUMMARY=true + +# YouTube settings +YOUTUBE_LANGUAGE=en + +# Instagram settings +INSTAGRAM_WAIT_TIME=5 + +# Logging +LOG_LEVEL=INFO diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a102fa4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,52 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +ENV/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environment +.env +.env.local + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Logs +*.log +logs/ + +# OS +.DS_Store +Thumbs.db + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ + +# Playwright +.playwright/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..ddcb9e2 --- /dev/null +++ b/README.md @@ -0,0 +1,192 @@ +# Content Extractor šŸ”„ + +Extract key information from URLs (YouTube, Instagram, blogs) and save to Obsidian notes automatically. 
+ +## Features + +- **YouTube Videos**: Extract title, description, transcript, author, duration, views +- **Instagram Reels**: Extract caption, author, engagement metrics (via browser automation) +- **Blog Posts/Articles**: Extract title, author, content, tags, publish date +- **Auto-save to Obsidian**: Notes are automatically formatted and saved to your Obsidian vault +- **Smart Summaries**: Generates key points from extracted content + +## Installation + +```bash +# Navigate to the content-extractor directory +cd ~/Desktop/itsthatnewshit/content-extractor + +# Install dependencies +pip install -r requirements.txt + +# Install Playwright browsers (for Instagram extraction) +playwright install +``` + +## Usage + +### Basic Usage + +```bash +# Extract from YouTube video +python main.py "https://www.youtube.com/watch?v=VIDEO_ID" + +# Extract from Instagram reel +python main.py "https://www.instagram.com/reel/REEL_ID" + +# Extract from blog post +python main.py "https://example.com/article" +``` + +### Advanced Options + +```bash +# Specify Obsidian vault path +python main.py --obsidian-path "/path/to/Obsidian Vault" + +# Custom output filename +python main.py --output "my-note-title" + +# Save to specific folder in Obsidian +python main.py --folder "Learning/YouTube" + +# Only print content, don't save to Obsidian +python main.py --no-save + +# Generate summary +python main.py --summarize +``` + +### Examples + +```bash +# Save YouTube tutorial to Learning folder +python main.py "https://youtu.be/abc123" --folder "Learning" --output "Python Tutorial" + +# Extract Instagram reel without saving +python main.py "https://instagram.com/reel/xyz789" --no-save + +# Extract blog post to default vault +python main.py "https://medium.com/article" --folder "Articles" +``` + +## Configuration + +Create a `.env` file in the project directory to customize settings: + +```bash +cp .env.example .env +``` + +Edit `.env` with your preferences: + +```env +# Obsidian vault path 
+OBSIDIAN_VAULT_PATH=~/Obsidian Vault + +# Browser settings (for Instagram) +BROWSER_HEADLESS=true +BROWSER_TIMEOUT=30000 + +# Content extraction +MAX_CONTENT_LENGTH=10000 +GENERATE_SUMMARY=true + +# YouTube +YOUTUBE_LANGUAGE=en + +# Instagram +INSTAGRAM_WAIT_TIME=5 +``` + +## Output Format + +Notes are saved in markdown format with: + +- Title and metadata (source, URL, extraction date) +- Author, duration, views (when available) +- Description/summary +- Full content (transcript or article text) +- Key points +- Tags for easy organization + +Example output: + +```markdown +# How to Build AI Agents + +## Metadata +- **Source**: Youtube +- **URL**: https://youtube.com/watch?v=abc123 +- **Extracted**: 2026-02-21 15:30:00 +- **Author**: Tech Channel +- **Duration**: 12:34 +- **Views**: 1.2M + +## Description +Learn how to build AI agents from scratch... + +## Content +[Full transcript or article text...] + +## Key Points +- Point 1 from the content +- Point 2 from the content +- Point 3 from the content + +--- + +## Tags +#youtube #video #ai #agents #notes +``` + +## Troubleshooting + +### Instagram extraction fails +Instagram requires browser automation. Make sure you've run: +```bash +playwright install +``` + +If it still fails, Instagram may have changed their UI. The extractor has a fallback mode that will still extract basic info. + +### YouTube transcript not available +Some videos don't have captions/transcripts. The extractor will fall back to extracting the description only. + +### Obsidian vault not found +By default, the tool looks for `~/Obsidian Vault`. If your vault is elsewhere, use the `--obsidian-path` flag or set `OBSIDIAN_VAULT_PATH` in your `.env` file. 
class Config:
    """Central configuration, sourced from environment variables with defaults."""

    # Root of the Obsidian vault where extracted notes are written.
    OBSIDIAN_VAULT_PATH = os.environ.get(
        "OBSIDIAN_VAULT_PATH",
        os.path.expanduser("~/Obsidian Vault")  # sensible default location
    )

    # Playwright browser options (used by the Instagram extractor).
    BROWSER_HEADLESS = os.environ.get("BROWSER_HEADLESS", "true").lower() == "true"
    BROWSER_TIMEOUT = int(os.environ.get("BROWSER_TIMEOUT", "30000"))  # milliseconds

    # Limits applied to extracted text.
    MAX_CONTENT_LENGTH = int(os.environ.get("MAX_CONTENT_LENGTH", "10000"))  # max chars kept
    GENERATE_SUMMARY = os.environ.get("GENERATE_SUMMARY", "true").lower() == "true"

    # Preferred transcript language for YouTube extraction.
    YOUTUBE_LANGUAGE = os.environ.get("YOUTUBE_LANGUAGE", "en")

    # Seconds to wait for Instagram's client-side rendering (browser automation).
    INSTAGRAM_WAIT_TIME = int(os.environ.get("INSTAGRAM_WAIT_TIME", "5"))

    # Logging configuration.
    LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
    LOG_FILE = os.environ.get("LOG_FILE", "content_extractor.log")

    @classmethod
    def validate(cls):
        """Validate configuration.

        Warns (but does not fail) when the configured vault path is missing;
        always returns True so startup is never blocked.
        """
        vault = Path(cls.OBSIDIAN_VAULT_PATH)
        if not vault.exists():
            print(f"āš ļø  Warning: Obsidian vault path does not exist: {cls.OBSIDIAN_VAULT_PATH}")
            print("   You can set OBSIDIAN_VAULT_PATH environment variable or use --obsidian-path flag")
        return True
class BlogExtractor:
    """Extract content from blog posts and articles.

    Fetches the page over HTTP, parses it with BeautifulSoup, and pulls
    title/author/date/content via Open Graph tags, common meta tags, and
    heuristic class-name matching.
    """

    def __init__(self, url: str):
        """Fetch *url* immediately; raises if the page cannot be retrieved."""
        self.url = url
        self.html = None
        self.soup = None
        self._fetch_page()

    def _fetch_page(self):
        """Download the page HTML into self.html.

        Uses a desktop-browser User-Agent because many sites serve stripped
        or blocked pages to obvious bots.

        Raises:
            ImportError: if `requests` is not installed.
            Exception: wrapping any network/HTTP failure.
        """
        if requests is None:
            raise ImportError("requests not installed. Run: pip install requests")

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        }

        try:
            response = requests.get(self.url, headers=headers, timeout=30)
            response.raise_for_status()
            self.html = response.text
        except Exception as e:
            # Chain the cause so the original network error is not lost.
            raise Exception(f"Failed to fetch page: {str(e)}") from e

    def _parse_html(self):
        """Parse self.html lazily into self.soup.

        Prefers the fast `lxml` parser but falls back to the always-available
        stdlib `html.parser` so a missing optional dependency is not fatal.
        """
        if BeautifulSoup is None:
            raise ImportError("beautifulsoup4 not installed. Run: pip install beautifulsoup4")

        if self.soup is None:
            try:
                self.soup = BeautifulSoup(self.html, 'lxml')
            except Exception:
                # lxml not installed — the stdlib parser is slower but present.
                self.soup = BeautifulSoup(self.html, 'html.parser')

    def extract(self) -> Dict[str, Any]:
        """Extract all content from the page as a plain dict.

        Field extraction order matters: `_get_content()` destructively strips
        script/nav/header/... nodes from the soup, so title/author/date are
        read first, and the key points are derived from the *same* content
        text instead of re-running the extraction (the original re-parsed
        the whole page a second time for key points).
        """
        self._parse_html()

        title = self._get_title()
        description = self._get_description()
        author = self._get_author()
        publish_date = self._get_publish_date()
        body_text = self._get_content()  # destructive: decomposes boilerplate nodes

        return {
            "title": title,
            "description": description,
            "author": author,
            "publish_date": publish_date,
            "content": body_text,
            "key_points": self._generate_key_points(body_text),
            "tags": self._get_tags(),
        }

    def _get_title(self) -> str:
        """Page title: Open Graph, then Twitter card, then <h1>, then <title>."""
        og_title = self.soup.find('meta', property='og:title')
        if og_title and og_title.get('content'):
            return og_title['content'].strip()

        twitter_title = self.soup.find('meta', attrs={'name': 'twitter:title'})
        if twitter_title and twitter_title.get('content'):
            return twitter_title['content'].strip()

        h1 = self.soup.find('h1')
        if h1:
            return h1.get_text().strip()

        title_tag = self.soup.find('title')
        if title_tag:
            return title_tag.get_text().strip()

        return "Untitled Article"

    def _get_description(self) -> str:
        """Page description from Open Graph or the standard meta tag."""
        og_desc = self.soup.find('meta', property='og:description')
        if og_desc and og_desc.get('content'):
            return og_desc['content'].strip()

        meta_desc = self.soup.find('meta', attrs={'name': 'description'})
        if meta_desc and meta_desc.get('content'):
            return meta_desc['content'].strip()

        return ""

    def _get_author(self) -> str:
        """Article author: article:author meta, author meta, then byline heuristics."""
        og_author = self.soup.find('meta', property='article:author')
        if og_author and og_author.get('content'):
            return og_author['content'].strip()

        meta_author = self.soup.find('meta', attrs={'name': 'author'})
        if meta_author and meta_author.get('content'):
            return meta_author['content'].strip()

        # Heuristic: any element whose class mentions "byline" or "author".
        byline = self.soup.find(class_=re.compile(r'byline|author', re.I))
        if byline:
            return byline.get_text().strip()

        return "Unknown"

    def _get_publish_date(self) -> str:
        """Publish date (YYYY-MM-DD) from meta tags or a <time> element."""
        og_time = self.soup.find('meta', property='article:published_time')
        if og_time and og_time.get('content'):
            return og_time['content'][:10]  # keep only YYYY-MM-DD

        meta_time = self.soup.find('meta', attrs={'name': 'date'})
        if meta_time and meta_time.get('content'):
            return meta_time['content'][:10]

        time_tag = self.soup.find('time')
        if time_tag and time_tag.get('datetime'):
            return time_tag['datetime'][:10]

        return "Unknown"

    def _get_content(self) -> str:
        """Extract the main article text.

        NOTE: destructively removes script/style/nav/header/footer/aside nodes
        from self.soup, so anything needing those elements must run first.
        """
        for element in self.soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
            element.decompose()

        # Candidate containers, most specific first.
        content_areas = [
            self.soup.find('article'),
            self.soup.find(class_=re.compile(r'article|content|post|entry', re.I)),
            self.soup.find(id=re.compile(r'article|content|post', re.I)),
            self.soup.find('main'),
        ]

        content_elem = next((elem for elem in content_areas if elem), None)

        paragraphs = content_elem.find_all('p') if content_elem else self.soup.find_all('p')

        # Keep only substantive paragraphs (drops captions, nav crumbs, etc.).
        text_parts = []
        for p in paragraphs:
            text = p.get_text().strip()
            if len(text) > 50:
                text_parts.append(text)

        content = "\n\n".join(text_parts)
        content = re.sub(r'\n{3,}', '\n\n', content)  # collapse excessive newlines

        return content[:10000]  # keep notes a manageable size

    def _generate_key_points(self, content: Optional[str] = None) -> list:
        """Derive key points (first few mid-length sentences) from *content*.

        Accepts the already-extracted text to avoid re-running the expensive,
        soup-mutating `_get_content()`; falls back to extracting it when
        called without an argument (preserves the old call signature).
        """
        if content is None:
            content = self._get_content()

        if not content:
            return []

        sentences = re.split(r'[.!?]+', content)
        key_points = []

        for sentence in sentences[:5]:
            sentence = sentence.strip()
            if len(sentence) > 30 and len(sentence) < 200:
                key_points.append(sentence + '.')

        return key_points

    def _get_tags(self) -> list:
        """Collect tags from article:tag metas, tag-like elements, and the domain."""
        tags = []

        og_tags = self.soup.find_all('meta', property='article:tag')
        for tag in og_tags:
            if tag.get('content'):
                tags.append(tag['content'].lower().replace(' ', '-'))

        tag_elements = self.soup.find_all(class_=re.compile(r'tag|category|label', re.I))
        for elem in tag_elements[:5]:  # limit heuristic matches
            text = elem.get_text().strip().lower()
            if len(text) < 30:
                tags.append(text.replace(' ', '-'))

        # Domain-based tag, e.g. "medium" for medium.com.
        domain = urlparse(self.url).netloc
        if domain:
            tags.append(domain.replace('www.', '').split('.')[0])

        # Dedupe while preserving discovery order (the original used
        # list(set(...)), which made the tag order nondeterministic).
        return list(dict.fromkeys(tags))[:10]
class InstagramExtractor:
    """Extract content from Instagram reels.

    Instagram renders everything client-side and gates most content behind
    login prompts, so this extractor drives a real Chromium browser via
    Playwright's sync API. When automation fails it degrades to
    ``_fallback_extract``, which derives what it can from the URL alone.
    """

    def __init__(self, url: str, headless: bool = True):
        # url: the reel/post URL; headless: run Chromium without a window.
        self.url = url
        self.headless = headless
        self.data = {}  # populated by extract()

        # Fail fast: nothing below can work without Playwright installed.
        if sync_playwright is None:
            raise ImportError("playwright not installed. Run: pip install playwright && playwright install")

    def extract(self) -> Dict[str, Any]:
        """Extract content from Instagram reel.

        Returns a dict of title/description/author/content/key_points/tags.
        Never raises: any automation failure falls back to a stub result.
        """
        try:
            with sync_playwright() as p:
                browser = p.chromium.launch(headless=self.headless)
                # Desktop Chrome UA — Instagram serves a more limited page
                # to unrecognized/bot user agents.
                page = browser.new_page(
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
                )

                # Navigate to the reel
                print(f"šŸ“± Loading Instagram reel...")
                page.goto(self.url, timeout=30000)

                # Wait for content to load (client-side rendering)
                time.sleep(3)

                # Try to close any cookies/login prompts. Best-effort: a
                # missing dialog simply times out and is ignored.
                try:
                    page.click('button:has-text("Not Now")', timeout=3000)
                except:
                    pass

                try:
                    page.click('button:has-text("Allow")', timeout=3000)
                except:
                    pass

                # Extract data
                self.data = self._extract_data(page)

                browser.close()
        except PlaywrightTimeout:
            # Page never finished loading — degrade gracefully.
            print("āš ļø Timeout loading Instagram page")
            self.data = self._fallback_extract()
        except Exception as e:
            # Any other automation failure also falls back rather than raising.
            print(f"āš ļø Error: {str(e)}")
            self.data = self._fallback_extract()

        return self.data

    def _extract_data(self, page) -> Dict[str, Any]:
        """Extract data from loaded page.

        Every selector here is a heuristic against Instagram's obfuscated
        markup, so each probe is wrapped in its own try/except and the
        defaults below survive any individual failure.
        """
        data = {
            "title": "Instagram Reel",
            "description": "",
            "author": "Unknown",
            "content": "",
            "key_points": [],
            "tags": ["instagram", "reel"],
        }

        # Try to get caption/description: first mid-length heading/span text.
        try:
            # Look for caption text
            captions = page.query_selector_all('h1, h2, span')
            for caption in captions:
                text = caption.inner_text()
                if len(text) > 20 and len(text) < 500:
                    if not data["description"]:
                        data["description"] = text
                        break
        except Exception as e:
            print(f"āš ļø Could not extract caption: {e}")

        # Try to get author: a heading inside a profile link, or a header span.
        try:
            author_elem = page.query_selector('a[href*="/"] h1, a[href*="/"] h2, header span')
            if author_elem:
                data["author"] = author_elem.inner_text().strip()
        except:
            pass

        # Try to get engagement metrics.
        # NOTE(review): the matched text lands under "views" even when the
        # element found is the *likes* counter — confirm intended.
        try:
            likes_elem = page.query_selector('span:has-text("likes"), span:has-text("views")')
            if likes_elem:
                data["views"] = likes_elem.inner_text().strip()
        except:
            pass

        # Extract any visible text as content
        try:
            # Get all text content
            body_text = page.inner_text('body')

            # Filter for meaningful content: mid-length lines only, which
            # skips short nav labels and very long script/markup dumps.
            lines = body_text.split('\n')
            meaningful_lines = [
                line.strip() for line in lines
                if len(line.strip()) > 30 and len(line.strip()) < 300
            ]

            data["content"] = "\n\n".join(meaningful_lines[:10])[:5000]
        except Exception as e:
            print(f"āš ļø Could not extract page text: {e}")

        # Generate key points from description (first three sentences).
        if data["description"]:
            sentences = data["description"].split('.')[:3]
            data["key_points"] = [s.strip() + '.' for s in sentences if len(s.strip()) > 20]

        # Add URL-based tags
        parsed = urlparse(self.url)
        if '/reel/' in parsed.path:
            data["tags"].append("reel")
        if '/video/' in parsed.path:
            data["tags"].append("video")

        return data

    def _fallback_extract(self) -> Dict[str, Any]:
        """Fallback extraction when browser automation fails.

        Produces a stub note from the URL alone — no network access.
        """
        print("āš ļø Using fallback extraction method...")

        # Try to extract what we can from the URL itself
        data = {
            "title": "Instagram Content",
            "description": "[Could not extract - Instagram requires login]",
            "author": "Unknown",
            "content": "",
            "key_points": [
                "Instagram content extraction requires browser automation",
                "Consider using Instagram's official API or downloading the video manually",
            ],
            "tags": ["instagram", "fallback"],
        }

        # Extract reel ID from URL paths like /reel/<id>/, /p/<id>/, /tv/<id>/.
        try:
            parsed = urlparse(self.url)
            path_parts = parsed.path.split('/')
            for i, part in enumerate(path_parts):
                if part in ['reel', 'p', 'tv'] and i + 1 < len(path_parts):
                    reel_id = path_parts[i + 1]
                    data["key_points"].append(f"Reel ID: {reel_id}")
                    break
        except:
            pass

        return data
class YouTubeExtractor:
    """Extract content from YouTube videos.

    Metadata (title, author, views, ...) comes from pytube/pytubefix; the
    transcript comes from youtube-transcript-api. Both are loaded lazily,
    so constructing the extractor never touches the network.
    """

    def __init__(self, url: str):
        self.url = url
        self.video_id = self._extract_video_id(url)
        self.youtube = None       # lazy pytube/pytubefix handle
        self._transcript = None   # memoized transcript text (see _get_transcript)

    def _extract_video_id(self, url: str) -> str:
        """Pull the 11-character video ID out of common YouTube URL forms.

        Raises:
            ValueError: when no recognizable video ID is present.
        """
        patterns = [
            r'(?:youtube\.com\/watch\?v=|youtu\.be\/)([a-zA-Z0-9_-]{11})',
            r'youtube\.com\/embed\/([a-zA-Z0-9_-]{11})',
            r'youtube\.com\/v\/([a-zA-Z0-9_-]{11})',
        ]

        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)

        raise ValueError(f"Could not extract YouTube video ID from: {url}")

    def _init_youtube(self):
        """Create the pytube/pytubefix YouTube object on first use."""
        if YouTube is None:
            raise ImportError("pytube or pytubefix not installed. Run: pip install pytubefix")

        if self.youtube is None:
            self.youtube = YouTube(self.url)

    def extract(self) -> Dict[str, Any]:
        """Extract all content from the video as a plain dict."""
        self._init_youtube()

        content = {
            "title": self._get_title(),
            "description": self._get_description(),
            "author": self._get_author(),
            "duration": self._get_duration(),
            "publish_date": self._get_publish_date(),
            "views": self._get_views(),
            "content": self._get_transcript(),
            "key_points": self._generate_key_points(),
            "tags": self._get_tags(),
        }

        return content

    def _get_title(self) -> str:
        """Video title, or a placeholder naming the video ID."""
        try:
            self._init_youtube()
            return self.youtube.title
        except Exception:
            return f"Video {self.video_id}"

    def _get_description(self) -> str:
        """Video description ('' when unavailable)."""
        try:
            self._init_youtube()
            return self.youtube.description or ""
        except Exception:
            return ""

    def _get_author(self) -> str:
        """Channel name ('Unknown' when unavailable)."""
        try:
            self._init_youtube()
            return self.youtube.author
        except Exception:
            return "Unknown"

    def _get_duration(self) -> str:
        """Duration as H:MM:SS or M:SS."""
        try:
            self._init_youtube()
            seconds = self.youtube.length
            minutes, secs = divmod(seconds, 60)
            hours, minutes = divmod(minutes, 60)

            if hours > 0:
                return f"{hours}:{minutes:02d}:{secs:02d}"
            else:
                return f"{minutes}:{secs:02d}"
        except Exception:
            return "Unknown"

    def _get_publish_date(self) -> str:
        """Publish date as YYYY-MM-DD ('Unknown' when unavailable)."""
        try:
            self._init_youtube()
            if hasattr(self.youtube, 'publish_date') and self.youtube.publish_date:
                return self.youtube.publish_date.strftime("%Y-%m-%d")
        except Exception:
            pass
        return "Unknown"

    def _get_views(self) -> str:
        """View count in compact form (1.2M / 3.4K / raw string)."""
        try:
            self._init_youtube()
            views = self.youtube.views
            # BUG FIX: the boundary was exclusive, so exactly 1,000,000
            # views rendered as "1000.0K" instead of "1.0M".
            if views >= 1_000_000:
                return f"{views / 1_000_000:.1f}M"
            elif views > 1_000:
                return f"{views / 1_000:.1f}K"
            else:
                return str(views)
        except Exception:
            return "Unknown"

    def _get_transcript(self) -> str:
        """Return the transcript text, fetching it at most once.

        BUG FIX: the original re-downloaded the transcript for the key-point
        pass (a second full network round-trip per extraction); the result
        is now memoized on the instance.
        """
        if self._transcript is None:
            self._transcript = self._fetch_transcript()
        return self._transcript

    def _fetch_transcript(self) -> str:
        """Download the transcript (network); prefers English, never raises.

        Returns bracketed placeholder text on any failure so callers can
        distinguish "no transcript" from real content.
        """
        if YouTubeTranscriptApi is None:
            return "[Transcript not available - youtube-transcript-api not installed]"

        try:
            # Newer youtube-transcript-api versions are instance-based.
            api = YouTubeTranscriptApi()
            transcript_list = api.list(self.video_id)

            # Prefer an English transcript.
            transcript = None
            for t in transcript_list:
                if t.language_code == 'en':
                    transcript = t
                    break

            # Fallback to the first available language.
            if transcript is None:
                transcript = next(iter(transcript_list), None)

            if transcript is None:
                return "[No transcript available]"

            transcript_data = transcript.fetch()

            # New API returns FetchedTranscript with .snippets; older
            # versions return a list of dicts.
            if hasattr(transcript_data, 'snippets'):
                full_text = " ".join([snippet.text for snippet in transcript_data.snippets])
            else:
                full_text = " ".join([entry['text'] for entry in transcript_data])

            full_text = full_text.replace("\n", " ").strip()

            return full_text[:10000]  # keep notes a manageable size
        except Exception as e:
            return f"[Transcript not available: {str(e)}]"

    def _generate_key_points(self) -> list:
        """First few long-enough transcript sentences, as bullet points."""
        transcript = self._get_transcript()  # cache hit after extract()

        # Bracketed text means "no transcript" — nothing to summarize.
        if not transcript or transcript.startswith("["):
            return []

        sentences = transcript.split('.')[:5]
        key_points = [s.strip() + '.' for s in sentences if len(s.strip()) > 20]

        return key_points[:5]

    def _get_tags(self) -> list:
        """Video keywords (up to 10), or generic defaults."""
        try:
            self._init_youtube()
            if hasattr(self.youtube, 'keywords'):
                return self.youtube.keywords[:10] if self.youtube.keywords else []
        except Exception:
            pass
        return ["youtube", "video"]
def detect_source_type(url: str) -> str:
    """Classify a URL as 'youtube', 'instagram', or 'blog' (the fallback)."""
    if "youtube.com" in url or "youtu.be" in url:
        return "youtube"
    if "instagram.com" in url:
        # All Instagram paths (reels, posts, tv) use the same extractor;
        # the original had two redundant branches returning the same value.
        return "instagram"
    return "blog"


def extract_content(url: str, source_type: str) -> dict:
    """Run the extractor matching *source_type* against *url*.

    Returns the extractor's dict of fields (title, content, key_points, ...).
    """
    print(f"šŸ” Extracting content from {source_type}...")

    if source_type == "youtube":
        extractor = YouTubeExtractor(url)
    elif source_type == "instagram":
        extractor = InstagramExtractor(url)
    else:
        extractor = BlogExtractor(url)

    return extractor.extract()


def main():
    """CLI entry point: extract a URL and (optionally) save it to Obsidian."""
    parser = argparse.ArgumentParser(
        description="Extract content from URLs and save to Obsidian notes"
    )
    parser.add_argument("url", help="URL to extract content from")
    parser.add_argument(
        "--obsidian-path",
        type=str,
        default=Config.OBSIDIAN_VAULT_PATH,
        help="Path to Obsidian vault"
    )
    parser.add_argument(
        "--output",
        type=str,
        default=None,
        help="Output filename (without .md extension)"
    )
    parser.add_argument(
        "--folder",
        type=str,
        default="Content Extractor",
        help="Folder in Obsidian vault to save notes"
    )
    parser.add_argument(
        "--no-save",
        action="store_true",
        help="Only print extracted content, don't save to Obsidian"
    )
    # NOTE(review): --summarize is accepted but not wired to anything yet.
    parser.add_argument(
        "--summarize",
        action="store_true",
        help="Generate a summary of the content"
    )

    args = parser.parse_args()

    # Detect source type
    source_type = detect_source_type(args.url)
    print(f"šŸ“Œ Detected source type: {source_type}")

    # Extract content; any extractor failure exits with an error message.
    try:
        content = extract_content(args.url, source_type)
    except Exception as e:
        print(f"āŒ Extraction failed: {e}")
        sys.exit(1)

    if not content:
        print("āŒ No content could be extracted")
        sys.exit(1)

    # Generate output filename
    if args.output:
        filename = args.output
    else:
        # Derive from the title, stripping filesystem-unfriendly characters.
        title = content.get("title", "Untitled")
        filename = f"{title[:50]}_{datetime.now().strftime('%Y%m%d')}"
        filename = "".join(c for c in filename if c.isalnum() or c in " -_").strip()
        # BUG FIX: a title composed entirely of stripped characters used to
        # produce an empty filename; fall back to a timestamped default.
        if not filename:
            filename = f"note_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    # Create markdown content
    markdown = generate_markdown(content, source_type, args.url)

    # Print preview (truncated to keep terminal output readable).
    print("\n" + "=" * 80)
    print("šŸ“ EXTRACTED CONTENT PREVIEW")
    print("=" * 80)
    print((markdown[:2000] + "...") if len(markdown) > 2000 else markdown)
    print("=" * 80)

    # Save to Obsidian
    if not args.no_save:
        writer = ObsidianWriter(args.obsidian_path)
        output_path = writer.save_note(markdown, filename, args.folder)
        print(f"\nāœ… Note saved to: {output_path}")
    else:
        print("\nāš ļø Note not saved (--no-save flag)")

    return content
def generate_markdown(content: dict, source_type: str, url: str) -> str:
    """Render the extracted *content* dict as an Obsidian-ready markdown note.

    Sections with no data (description, content, key points) are omitted,
    and optional metadata rows appear only for present, truthy fields.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    out = [
        f"# {content.get('title', 'Untitled')}",
        "",
        "## Metadata",
        "",
        f"- **Source**: {source_type.capitalize()}",
        f"- **URL**: {url}",
        f"- **Extracted**: {stamp}",
    ]

    # Optional metadata rows, in a fixed display order.
    for label, key in (("Author", "author"), ("Duration", "duration"),
                       ("Published", "publish_date"), ("Views", "views")):
        value = content.get(key)
        if value:
            out.append(f"- **{label}**: {value}")
    out.append("")

    # Body sections: skipped entirely when empty.
    for heading, key in (("Description", "description"), ("Content", "content")):
        if content.get(key):
            out += [f"## {heading}", "", content.get(key, ""), ""]

    points = content.get("key_points")
    if points:
        out += ["## Key Points", ""]
        out += [f"- {point}" for point in points]
        out.append("")

    # Tag footer; fall back to generic tags when none were extracted.
    note_tags = content.get("tags", []) or ["content-extractor", source_type, "notes"]
    out += ["---", "", "## Tags", "", " ".join(f"#{tag}" for tag in note_tags), ""]

    return "\n".join(out)
class ObsidianWriter:
    """Write content to an Obsidian vault as markdown notes."""

    def __init__(self, vault_path: str):
        """Open (or create) the vault rooted at *vault_path* (~ is expanded)."""
        self.vault_path = Path(vault_path).expanduser()
        self._validate_vault()

    def _validate_vault(self):
        """Create the vault directory if needed; warn when it doesn't look like a vault."""
        if not self.vault_path.exists():
            print(f"āš ļø Creating Obsidian vault directory: {self.vault_path}")
            self.vault_path.mkdir(parents=True, exist_ok=True)

        # A real Obsidian vault carries a .obsidian config directory.
        obsidian_config = self.vault_path / ".obsidian"
        if not obsidian_config.exists():
            print(f"āš ļø Warning: {self.vault_path} doesn't look like an Obsidian vault")
            print("   (No .obsidian directory found)")
            print("   Notes will still be saved, but you may want to set the correct vault path")

    def save_note(
        self,
        content: str,
        filename: str,
        folder: Optional[str] = None,
        subfolder: Optional[str] = None
    ) -> Path:
        """
        Save a note to the Obsidian vault.

        Args:
            content: Markdown content to save
            filename: Filename without .md extension
            folder: Folder in vault (default: root)
            subfolder: Subfolder within folder (optional)

        Returns:
            Path to saved file

        Raises:
            Exception: if the file cannot be written.
        """
        # Build destination directory (vault root / folder / subfolder).
        if folder:
            note_dir = self.vault_path / folder
            if subfolder:
                note_dir = note_dir / subfolder
        else:
            note_dir = self.vault_path

        note_dir.mkdir(parents=True, exist_ok=True)

        filename = self._sanitize_filename(filename)

        # BUG FIX: the original wrote every note to a literal "(unknown).md",
        # ignoring the requested filename entirely.
        filepath = note_dir / f"{filename}.md"

        # Never overwrite an existing note: suffix _1, _2, ... until free.
        counter = 1
        while filepath.exists():
            filepath = note_dir / f"{filename}_{counter}.md"
            counter += 1

        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(content)
            print(f"āœ… Note saved: {filepath.name}")
            return filepath
        except Exception as e:
            raise Exception(f"Failed to save note: {str(e)}") from e

    def _sanitize_filename(self, filename: str) -> str:
        """Strip filesystem-unsafe characters and cap the length at 100 chars."""
        invalid_chars = '<>:"/\\|?*'
        for char in invalid_chars:
            filename = filename.replace(char, '')

        if len(filename) > 100:
            filename = filename[:100]

        # Guard against an all-invalid name collapsing to an empty string,
        # which would previously have produced a bare ".md" file.
        return filename.strip() or "untitled"

    def create_daily_note(self, content: str) -> Path:
        """Create a dated note in the 'Daily Notes' folder."""
        today = datetime.now().strftime("%Y-%m-%d")
        folder = "Daily Notes"
        return self.save_note(content, today, folder)

    def append_to_note(self, filename: str, content: str, folder: Optional[str] = None) -> Path:
        """Append *content* to an existing note, creating it when absent."""
        note_dir = self.vault_path / folder if folder else self.vault_path

        # BUG FIX: same literal "(unknown).md" placeholder as save_note; also
        # sanitize so the lookup matches the name save_note actually wrote.
        filepath = note_dir / f"{self._sanitize_filename(filename)}.md"

        # If the file doesn't exist yet, fall back to a normal save.
        if not filepath.exists():
            return self.save_note(content, filename, folder)

        # Append to the existing file, separated by a horizontal rule.
        try:
            with open(filepath, 'a', encoding='utf-8') as f:
                f.write("\n\n---\n\n")
                f.write(content)
            print(f"āœ… Content appended to: {filepath.name}")
            return filepath
        except Exception as e:
            raise Exception(f"Failed to append to note: {str(e)}") from e