feat: Initial commit - Content Extractor for YouTube, Instagram, and blogs
- YouTube extraction with transcript support - Instagram reel extraction via browser automation - Blog/article web scraping - Auto-save to Obsidian vaults - Smart key point generation - Configurable via .env file - Quick extract shell script Tech stack: Python, requests, beautifulsoup4, playwright, youtube-transcript-api
This commit is contained in:
13
extractors/__init__.py
Normal file
13
extractors/__init__.py
Normal file
@@ -0,0 +1,13 @@
|
||||
"""
|
||||
Content Extractors Package
|
||||
"""
|
||||
|
||||
from .youtube_extractor import YouTubeExtractor
|
||||
from .blog_extractor import BlogExtractor
|
||||
from .instagram_extractor import InstagramExtractor
|
||||
|
||||
__all__ = [
|
||||
"YouTubeExtractor",
|
||||
"BlogExtractor",
|
||||
"InstagramExtractor",
|
||||
]
|
||||
224
extractors/blog_extractor.py
Normal file
224
extractors/blog_extractor.py
Normal file
@@ -0,0 +1,224 @@
|
||||
"""
|
||||
Blog/Article Extractor
|
||||
|
||||
Extracts:
|
||||
- Title, author, publish date
|
||||
- Main article content
|
||||
- Tags/categories
|
||||
- Summary
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import Dict, Any, Optional
|
||||
from urllib.parse import urlparse
|
||||
|
||||
try:
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
except ImportError:
|
||||
requests = None
|
||||
BeautifulSoup = None
|
||||
|
||||
|
||||
class BlogExtractor:
    """Extract structured content from a blog post or article URL.

    The page is fetched eagerly in ``__init__`` (so network failures surface
    early) and parsed lazily on first use.  ``extract()`` returns a dict with
    title, description, author, publish_date, content, key_points and tags.
    """

    def __init__(self, url: str):
        """Fetch *url* immediately.

        Raises:
            RuntimeError: if the page cannot be retrieved.
            ImportError: if ``requests`` is not installed.
        """
        self.url = url
        self.html = None
        self.soup = None
        # Memoized article text: _get_content() mutates the soup (decompose),
        # and extract() needs the text twice (content + key points).
        self._content_cache = None
        self._fetch_page()

    def _fetch_page(self):
        """Fetch the webpage into ``self.html``."""
        if requests is None:
            raise ImportError("requests not installed. Run: pip install requests")

        # Browser-like UA: many sites serve stripped or blocked pages to the
        # default requests user agent.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        }

        try:
            response = requests.get(self.url, headers=headers, timeout=30)
            response.raise_for_status()
            self.html = response.text
        except Exception as e:
            # Chain the original exception so callers can inspect the cause.
            raise RuntimeError(f"Failed to fetch page: {str(e)}") from e

    def _parse_html(self):
        """Parse ``self.html`` with BeautifulSoup (idempotent)."""
        if BeautifulSoup is None:
            raise ImportError("beautifulsoup4 not installed. Run: pip install beautifulsoup4")

        if self.soup is None:
            try:
                self.soup = BeautifulSoup(self.html, 'lxml')
            except Exception:
                # lxml is an optional third-party parser; fall back to the
                # stdlib parser instead of failing outright.
                self.soup = BeautifulSoup(self.html, 'html.parser')

    def extract(self) -> Dict[str, Any]:
        """Extract all content from the page as a plain dict."""
        self._parse_html()

        return {
            "title": self._get_title(),
            "description": self._get_description(),
            "author": self._get_author(),
            "publish_date": self._get_publish_date(),
            "content": self._get_content(),
            "key_points": self._generate_key_points(),
            "tags": self._get_tags(),
        }

    def _get_title(self) -> str:
        """Get page title, preferring structured metadata over markup."""
        # Open Graph is the most reliable source when present.
        og_title = self.soup.find('meta', property='og:title')
        if og_title and og_title.get('content'):
            return og_title['content'].strip()

        # Twitter card metadata is the next-best structured source.
        twitter_title = self.soup.find('meta', attrs={'name': 'twitter:title'})
        if twitter_title and twitter_title.get('content'):
            return twitter_title['content'].strip()

        # Fall back to visible markup: first <h1>, then <title>.
        h1 = self.soup.find('h1')
        if h1:
            return h1.get_text().strip()

        title_tag = self.soup.find('title')
        if title_tag:
            return title_tag.get_text().strip()

        return "Untitled Article"

    def _get_description(self) -> str:
        """Get page description from OG or meta tags ('' if absent)."""
        og_desc = self.soup.find('meta', property='og:description')
        if og_desc and og_desc.get('content'):
            return og_desc['content'].strip()

        meta_desc = self.soup.find('meta', attrs={'name': 'description'})
        if meta_desc and meta_desc.get('content'):
            return meta_desc['content'].strip()

        return ""

    def _get_author(self) -> str:
        """Get article author from metadata or a byline element."""
        og_author = self.soup.find('meta', property='article:author')
        if og_author and og_author.get('content'):
            return og_author['content'].strip()

        meta_author = self.soup.find('meta', attrs={'name': 'author'})
        if meta_author and meta_author.get('content'):
            return meta_author['content'].strip()

        # Heuristic: any element whose class mentions "byline" or "author".
        byline = self.soup.find(class_=re.compile(r'byline|author', re.I))
        if byline:
            return byline.get_text().strip()

        return "Unknown"

    def _get_publish_date(self) -> str:
        """Get publish date as YYYY-MM-DD, or "Unknown"."""
        og_time = self.soup.find('meta', property='article:published_time')
        if og_time and og_time.get('content'):
            return og_time['content'][:10]  # ISO timestamp -> YYYY-MM-DD

        meta_time = self.soup.find('meta', attrs={'name': 'date'})
        if meta_time and meta_time.get('content'):
            return meta_time['content'][:10]

        time_tag = self.soup.find('time')
        if time_tag and time_tag.get('datetime'):
            return time_tag['datetime'][:10]

        return "Unknown"

    def _get_content(self) -> str:
        """Extract the main article text (memoized).

        NOTE: boilerplate removal below permanently deletes elements from
        ``self.soup``, which is why the result is cached — running this twice
        would both repeat work and operate on an already-stripped tree.
        """
        if self._content_cache is not None:
            return self._content_cache

        # Remove unwanted boilerplate elements.
        for element in self.soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
            element.decompose()

        # Candidate containers in decreasing order of specificity.
        content_areas = [
            self.soup.find('article'),
            self.soup.find(class_=re.compile(r'article|content|post|entry', re.I)),
            self.soup.find(id=re.compile(r'article|content|post', re.I)),
            self.soup.find('main'),
        ]
        content_elem = next((elem for elem in content_areas if elem), None)

        if content_elem:
            paragraphs = content_elem.find_all('p')
        else:
            # No obvious container — fall back to every paragraph on the page.
            paragraphs = self.soup.find_all('p')

        text_parts = []
        for p in paragraphs:
            text = p.get_text().strip()
            if len(text) > 50:  # drop short crumbs (captions, nav, ads)
                text_parts.append(text)

        content = "\n\n".join(text_parts)
        content = re.sub(r'\n{3,}', '\n\n', content)  # collapse excessive newlines

        self._content_cache = content[:10000]  # cap runaway pages
        return self._content_cache

    def _generate_key_points(self) -> list:
        """Generate key points: the first few mid-length sentences of content."""
        content = self._get_content()

        if not content:
            return []

        sentences = re.split(r'[.!?]+', content)
        key_points = []

        for sentence in sentences[:5]:
            sentence = sentence.strip()
            # Keep sentences long enough to be meaningful, short enough to scan.
            if len(sentence) > 30 and len(sentence) < 200:
                key_points.append(sentence + '.')

        return key_points

    def _get_tags(self) -> list:
        """Get article tags/categories, deduplicated, at most 10."""
        tags = []

        # Open Graph article tags (one meta element per tag).
        og_tags = self.soup.find_all('meta', property='article:tag')
        for tag in og_tags:
            if tag.get('content'):
                tags.append(tag['content'].lower().replace(' ', '-'))

        # Heuristic: elements whose class mentions tag/category/label.
        tag_elements = self.soup.find_all(class_=re.compile(r'tag|category|label', re.I))
        for elem in tag_elements[:5]:  # limit to 5
            text = elem.get_text().strip().lower()
            if len(text) < 30:
                tags.append(text.replace(' ', '-'))

        # Domain-based tag, e.g. "example" for www.example.com.
        domain = urlparse(self.url).netloc
        if domain:
            tags.append(domain.replace('www.', '').split('.')[0])

        # dict.fromkeys preserves first-seen order (set() would not), so the
        # returned tags are deterministic across runs.
        return list(dict.fromkeys(tags))[:10]
175
extractors/instagram_extractor.py
Normal file
175
extractors/instagram_extractor.py
Normal file
@@ -0,0 +1,175 @@
|
||||
"""
|
||||
Instagram Reel Extractor
|
||||
|
||||
Extracts:
|
||||
- Title/caption
|
||||
- Author/creator
|
||||
- Description
|
||||
- Transcript (if available via captions)
|
||||
- Metadata (views, likes, etc.)
|
||||
|
||||
Note: Instagram requires browser automation. Uses Playwright.
|
||||
"""
|
||||
|
||||
import re
|
||||
import time
|
||||
from typing import Dict, Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
try:
|
||||
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
|
||||
except ImportError:
|
||||
sync_playwright = None
|
||||
|
||||
|
||||
class InstagramExtractor:
    """Extract content from Instagram reels via Playwright browser automation.

    ``extract()`` drives a headless Chromium page; if that fails (timeout,
    login wall, any error) it degrades to ``_fallback_extract`` which builds
    a best-effort record from the URL alone.
    """

    def __init__(self, url: str, headless: bool = True):
        """Store the target URL; raises ImportError if playwright is missing.

        Args:
            url: Instagram reel/post URL.
            headless: run the browser without a visible window (default True).
        """
        self.url = url
        self.headless = headless
        self.data = {}

        if sync_playwright is None:
            raise ImportError("playwright not installed. Run: pip install playwright && playwright install")

    def extract(self) -> Dict[str, Any]:
        """Extract content from the Instagram reel; never raises — falls back."""
        try:
            with sync_playwright() as p:
                browser = p.chromium.launch(headless=self.headless)
                # Desktop UA: Instagram serves a different (harder to scrape)
                # layout to unknown/bot user agents.
                page = browser.new_page(
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
                )

                print("📱 Loading Instagram reel...")
                page.goto(self.url, timeout=30000)

                # Give client-side rendering time to settle.
                time.sleep(3)

                # Dismiss cookie/login prompts if they appear; absence of the
                # button is the normal case, so a short timeout + ignore.
                try:
                    page.click('button:has-text("Not Now")', timeout=3000)
                except Exception:
                    pass

                try:
                    page.click('button:has-text("Allow")', timeout=3000)
                except Exception:
                    pass

                self.data = self._extract_data(page)

                browser.close()
        except PlaywrightTimeout:
            print("⚠️ Timeout loading Instagram page")
            self.data = self._fallback_extract()
        except Exception as e:
            print(f"⚠️ Error: {str(e)}")
            self.data = self._fallback_extract()

        return self.data

    def _extract_data(self, page) -> Dict[str, Any]:
        """Scrape caption, author, metrics and visible text from a loaded page."""
        data = {
            "title": "Instagram Reel",
            "description": "",
            "author": "Unknown",
            "content": "",
            "key_points": [],
            "tags": ["instagram", "reel"],
        }

        # Caption: take the first mid-length text node among headings/spans.
        try:
            captions = page.query_selector_all('h1, h2, span')
            for caption in captions:
                text = caption.inner_text()
                if len(text) > 20 and len(text) < 500:
                    if not data["description"]:
                        data["description"] = text
                        break
        except Exception as e:
            print(f"⚠️ Could not extract caption: {e}")

        # Author: profile-link heading or header span.
        try:
            author_elem = page.query_selector('a[href*="/"] h1, a[href*="/"] h2, header span')
            if author_elem:
                data["author"] = author_elem.inner_text().strip()
        except Exception:
            pass

        # Engagement metrics (best effort; stored under "views").
        try:
            likes_elem = page.query_selector('span:has-text("likes"), span:has-text("views")')
            if likes_elem:
                data["views"] = likes_elem.inner_text().strip()
        except Exception:
            pass

        # Any visible text as content, filtered to meaningful lines.
        try:
            body_text = page.inner_text('body')

            lines = body_text.split('\n')
            meaningful_lines = [
                line.strip() for line in lines
                if len(line.strip()) > 30 and len(line.strip()) < 300
            ]

            # Cap at 10 lines / 5000 chars to keep notes manageable.
            data["content"] = "\n\n".join(meaningful_lines[:10])[:5000]
        except Exception as e:
            print(f"⚠️ Could not extract page text: {e}")

        # Key points: first sentences of the caption, if any.
        if data["description"]:
            sentences = data["description"].split('.')[:3]
            data["key_points"] = [s.strip() + '.' for s in sentences if len(s.strip()) > 20]

        # URL-based tags; guard against duplicating the seeded "reel" tag.
        parsed = urlparse(self.url)
        if '/reel/' in parsed.path and "reel" not in data["tags"]:
            data["tags"].append("reel")
        if '/video/' in parsed.path and "video" not in data["tags"]:
            data["tags"].append("video")

        return data

    def _fallback_extract(self) -> Dict[str, Any]:
        """Build a best-effort record from the URL when automation fails."""
        print("⚠️ Using fallback extraction method...")

        data = {
            "title": "Instagram Content",
            "description": "[Could not extract - Instagram requires login]",
            "author": "Unknown",
            "content": "",
            "key_points": [
                "Instagram content extraction requires browser automation",
                "Consider using Instagram's official API or downloading the video manually",
            ],
            "tags": ["instagram", "fallback"],
        }

        # At least recover the reel/post ID from the URL path
        # (e.g. /reel/<id>/, /p/<id>/, /tv/<id>/).
        try:
            parsed = urlparse(self.url)
            path_parts = parsed.path.split('/')
            for i, part in enumerate(path_parts):
                if part in ['reel', 'p', 'tv'] and i + 1 < len(path_parts):
                    reel_id = path_parts[i + 1]
                    data["key_points"].append(f"Reel ID: {reel_id}")
                    break
        except Exception:
            pass

        return data
203
extractors/youtube_extractor.py
Normal file
203
extractors/youtube_extractor.py
Normal file
@@ -0,0 +1,203 @@
|
||||
"""
|
||||
YouTube Video Extractor
|
||||
|
||||
Extracts:
|
||||
- Title, description, author
|
||||
- Transcript/captions
|
||||
- Duration, views, publish date
|
||||
- Tags/categories
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import Optional, Dict, Any
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
try:
|
||||
from youtube_transcript_api import YouTubeTranscriptApi
|
||||
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound
|
||||
except ImportError:
|
||||
YouTubeTranscriptApi = None
|
||||
|
||||
try:
|
||||
from pytubefix import YouTube # More reliable than pytube
|
||||
except ImportError:
|
||||
try:
|
||||
from pytube import YouTube
|
||||
except ImportError:
|
||||
YouTube = None
|
||||
|
||||
|
||||
class YouTubeExtractor:
    """Extract content from YouTube videos.

    Metadata (title, author, duration, views, date, tags) comes from
    pytube/pytubefix; the transcript comes from youtube-transcript-api.
    Every getter degrades to a placeholder instead of raising, so
    ``extract()`` always returns a complete dict.
    """

    def __init__(self, url: str):
        """Parse the video ID from *url*.

        Raises:
            ValueError: if no 11-character video ID can be found in the URL.
        """
        self.url = url
        self.video_id = self._extract_video_id(url)
        self.youtube = None  # lazily created pytube YouTube object

    def _extract_video_id(self, url: str) -> str:
        """Extract the 11-character video ID from any common YouTube URL form.

        Supports watch, youtu.be, embed, /v/, shorts and live URLs, plus a
        generic ``?v=`` query-parameter fallback.
        """
        patterns = [
            r'(?:youtube\.com\/watch\?v=|youtu\.be\/)([a-zA-Z0-9_-]{11})',
            r'youtube\.com\/embed\/([a-zA-Z0-9_-]{11})',
            r'youtube\.com\/v\/([a-zA-Z0-9_-]{11})',
            r'youtube\.com\/shorts\/([a-zA-Z0-9_-]{11})',
            r'youtube\.com\/live\/([a-zA-Z0-9_-]{11})',
        ]

        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)

        # Generic fallback: any YouTube URL carrying a ?v= query parameter.
        query = parse_qs(urlparse(url).query)
        candidates = query.get('v', [])
        if candidates and re.fullmatch(r'[a-zA-Z0-9_-]{11}', candidates[0]):
            return candidates[0]

        raise ValueError(f"Could not extract YouTube video ID from: {url}")

    def _init_youtube(self):
        """Lazily initialize the pytube YouTube object (idempotent)."""
        if YouTube is None:
            raise ImportError("pytube or pytubefix not installed. Run: pip install pytubefix")

        if self.youtube is None:
            self.youtube = YouTube(self.url)

    def extract(self) -> Dict[str, Any]:
        """Extract all content from the YouTube video as a plain dict."""
        self._init_youtube()

        return {
            "title": self._get_title(),
            "description": self._get_description(),
            "author": self._get_author(),
            "duration": self._get_duration(),
            "publish_date": self._get_publish_date(),
            "views": self._get_views(),
            "content": self._get_transcript(),
            "key_points": self._generate_key_points(),
            "tags": self._get_tags(),
        }

    def _get_title(self) -> str:
        """Get video title, or a placeholder containing the video ID."""
        try:
            self._init_youtube()
            return self.youtube.title
        except Exception:
            return f"Video {self.video_id}"

    def _get_description(self) -> str:
        """Get video description ('' on failure)."""
        try:
            self._init_youtube()
            return self.youtube.description or ""
        except Exception:
            return ""

    def _get_author(self) -> str:
        """Get video author/channel name ("Unknown" on failure)."""
        try:
            self._init_youtube()
            return self.youtube.author
        except Exception:
            return "Unknown"

    def _get_duration(self) -> str:
        """Get video duration as H:MM:SS or M:SS ("Unknown" on failure)."""
        try:
            self._init_youtube()
            seconds = self.youtube.length
            minutes, secs = divmod(seconds, 60)
            hours, minutes = divmod(minutes, 60)

            if hours > 0:
                return f"{hours}:{minutes:02d}:{secs:02d}"
            else:
                return f"{minutes}:{secs:02d}"
        except Exception:
            return "Unknown"

    def _get_publish_date(self) -> str:
        """Get video publish date as YYYY-MM-DD ("Unknown" on failure)."""
        try:
            self._init_youtube()
            # publish_date may be absent or None depending on pytube version.
            if hasattr(self.youtube, 'publish_date') and self.youtube.publish_date:
                return self.youtube.publish_date.strftime("%Y-%m-%d")
        except Exception:
            pass
        return "Unknown"

    def _get_views(self) -> str:
        """Get view count formatted as 1.2M / 3.4K / raw ("Unknown" on failure)."""
        try:
            self._init_youtube()
            views = self.youtube.views
            # >= so that exactly 1,000,000 renders as "1.0M", not "1000.0K"
            # (same reasoning at the 1,000 boundary).
            if views >= 1_000_000:
                return f"{views / 1_000_000:.1f}M"
            elif views >= 1_000:
                return f"{views / 1_000:.1f}K"
            else:
                return str(views)
        except Exception:
            return "Unknown"

    def _get_transcript(self) -> str:
        """Get video transcript/captions; bracketed placeholder on failure."""
        if YouTubeTranscriptApi is None:
            return "[Transcript not available - youtube-transcript-api not installed]"

        try:
            # youtube-transcript-api >= 1.0 uses an instance-based API.
            api = YouTubeTranscriptApi()
            transcript_list = api.list(self.video_id)

            # Prefer an English transcript when one exists.
            transcript = None
            for t in transcript_list:
                if t.language_code == 'en':
                    transcript = t
                    break

            # Otherwise take the first available language.
            if transcript is None:
                transcript = next(iter(transcript_list), None)

            if transcript is None:
                return "[No transcript available]"

            transcript_data = transcript.fetch()

            # New API returns a FetchedTranscript exposing .snippets; the old
            # API returned a list of dicts — support both.
            if hasattr(transcript_data, 'snippets'):
                full_text = " ".join([snippet.text for snippet in transcript_data.snippets])
            else:
                full_text = " ".join([entry['text'] for entry in transcript_data])

            full_text = full_text.replace("\n", " ").strip()

            return full_text[:10000]  # cap very long transcripts
        except Exception as e:
            return f"[Transcript not available: {str(e)}]"

    def _generate_key_points(self) -> list:
        """Generate up to 5 key points from the transcript (simple heuristic)."""
        transcript = self._get_transcript()

        # A leading "[" means a placeholder/error message, not real text.
        if not transcript or transcript.startswith("["):
            return []

        sentences = transcript.split('.')[:5]
        key_points = [s.strip() + '.' for s in sentences if len(s.strip()) > 20]

        return key_points[:5]

    def _get_tags(self) -> list:
        """Get up to 10 video tags; generic defaults on failure."""
        try:
            self._init_youtube()
            if hasattr(self.youtube, 'keywords'):
                return self.youtube.keywords[:10] if self.youtube.keywords else []
        except Exception:
            pass
        return ["youtube", "video"]
Reference in New Issue
Block a user