Files
backblaze-invoices-downloader/extractors/youtube_extractor.py
naki c997e764b5 feat: Initial commit - Content Extractor for YouTube, Instagram, and blogs
- YouTube extraction with transcript support
- Instagram reel extraction via browser automation
- Blog/article web scraping
- Auto-save to Obsidian vaults
- Smart key point generation
- Configurable via .env file
- Quick extract shell script

Tech stack: Python, requests, beautifulsoup4, playwright, youtube-transcript-api
2026-03-05 13:02:58 +05:30

204 lines
6.5 KiB
Python

"""
YouTube Video Extractor
Extracts:
- Title, description, author
- Transcript/captions
- Duration, views, publish date
- Tags/categories
"""
import re
from typing import Optional, Dict, Any
from urllib.parse import urlparse, parse_qs
try:
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound
except ImportError:
YouTubeTranscriptApi = None
try:
from pytubefix import YouTube # More reliable than pytube
except ImportError:
try:
from pytube import YouTube
except ImportError:
YouTube = None
class YouTubeExtractor:
    """Extract metadata and transcript text from a single YouTube video.

    Metadata (title, author, duration, ...) is read from a ``YouTube`` object
    provided by pytubefix/pytube; the transcript is read through
    ``YouTubeTranscriptApi``. Both libraries are optional at import time: the
    module-level names are ``None`` when the package is missing, and the
    individual getters degrade to placeholder values instead of raising.

    Attributes:
        url: The URL exactly as passed by the caller.
        video_id: The 11-character video ID parsed from ``url``.
        youtube: Lazily created ``YouTube`` object (``None`` until first use).
    """

    # Upper bound on the number of transcript characters returned.
    MAX_TRANSCRIPT_CHARS = 10000
    # At most this many key points are produced from a transcript.
    MAX_KEY_POINTS = 5
    # A sentence must be strictly longer than this to count as a key point.
    MIN_KEY_POINT_CHARS = 20
    # At most this many tags are returned.
    MAX_TAGS = 10

    # A YouTube video ID: exactly 11 URL-safe base64 characters.
    _VIDEO_ID_RE = re.compile(r'[a-zA-Z0-9_-]{11}')
    # First path segment of URL shapes that carry the ID in the second segment.
    _ID_PATH_PREFIXES = ("embed", "v", "shorts", "live")
    # Substring patterns kept as a fallback so any URL-like string that was
    # recognised by plain regex search keeps working.
    _LEGACY_ID_PATTERNS = (
        r'(?:youtube\.com\/watch\?v=|youtu\.be\/)([a-zA-Z0-9_-]{11})',
        r'youtube\.com\/embed\/([a-zA-Z0-9_-]{11})',
        r'youtube\.com\/v\/([a-zA-Z0-9_-]{11})',
    )
    # Prefixes of the placeholder strings produced when no transcript could be
    # obtained. Checked explicitly because real captions frequently start with
    # a bracketed cue such as "[Music]".
    _TRANSCRIPT_PLACEHOLDER_PREFIXES = (
        "[Transcript not available",
        "[No transcript available]",
    )

    def __init__(self, url: str):
        """Store the URL and parse its video ID.

        Args:
            url: A YouTube video URL (watch, youtu.be, embed, /v/, shorts or
                live form).

        Raises:
            ValueError: If no video ID can be found in ``url``.
        """
        self.url = url
        self.video_id = self._extract_video_id(url)
        self.youtube = None
        # Transcript text (or placeholder) cached after the first fetch so that
        # extract() does not hit the network twice.
        self._transcript: Optional[str] = None

    def _extract_video_id(self, url: str) -> str:
        """Extract video ID from YouTube URL.

        The URL is parsed structurally first, so the ``v`` query parameter is
        found regardless of its position and ``/shorts/`` and ``/live/`` paths
        are understood. If that fails, the legacy substring patterns are tried.

        Args:
            url: The URL to inspect; the scheme may be omitted.

        Returns:
            The 11-character video ID.

        Raises:
            ValueError: If no video ID can be found.
        """
        candidate = url.strip()
        try:
            # urlparse only fills in the host when a scheme separator exists.
            parsed = urlparse(candidate if "://" in candidate else f"https://{candidate}")
            host = (parsed.hostname or "").lower()
        except ValueError:
            # Malformed netloc (e.g. a broken IPv6 literal); rely on the fallback.
            parsed, host = None, ""

        video_id = None
        if parsed is not None:
            segments = [part for part in parsed.path.split("/") if part]
            if host == "youtu.be":
                video_id = segments[0] if segments else None
            elif (
                host in ("youtube.com", "youtube-nocookie.com")
                or host.endswith((".youtube.com", ".youtube-nocookie.com"))
            ):
                if segments[:1] == ["watch"]:
                    video_id = parse_qs(parsed.query).get("v", [None])[0]
                elif len(segments) >= 2 and segments[0] in self._ID_PATH_PREFIXES:
                    video_id = segments[1]

        if video_id and self._VIDEO_ID_RE.fullmatch(video_id):
            return video_id

        for pattern in self._LEGACY_ID_PATTERNS:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        raise ValueError(f"Could not extract YouTube video ID from: {url}")

    def _init_youtube(self):
        """Initialize YouTube object.

        Does nothing when ``self.youtube`` is already set, so the object is
        created at most once per extractor.

        Raises:
            ImportError: If neither pytubefix nor pytube could be imported.
        """
        if self.youtube is not None:
            return
        if YouTube is None:
            raise ImportError("pytube or pytubefix not installed. Run: pip install pytubefix")
        self.youtube = YouTube(self.url)

    def extract(self) -> Dict[str, Any]:
        """Extract all content from YouTube video.

        Returns:
            A dict with the keys ``title``, ``description``, ``author``,
            ``duration``, ``publish_date``, ``views``, ``content`` (transcript
            text or a bracketed placeholder), ``key_points`` and ``tags``.

        Raises:
            ImportError: If neither pytubefix nor pytube is installed.
        """
        self._init_youtube()
        content = {
            "title": self._get_title(),
            "description": self._get_description(),
            "author": self._get_author(),
            "duration": self._get_duration(),
            "publish_date": self._get_publish_date(),
            "views": self._get_views(),
            "content": self._get_transcript(),
            # Reuses the transcript cached by the line above.
            "key_points": self._generate_key_points(),
            "tags": self._get_tags(),
        }
        return content

    def _get_title(self) -> str:
        """Get video title, or ``"Video <id>"`` when it cannot be read."""
        try:
            self._init_youtube()
            return self.youtube.title
        except Exception:
            # Best effort: any backend failure falls back to a generic title.
            return f"Video {self.video_id}"

    def _get_description(self) -> str:
        """Get video description, or an empty string when unavailable."""
        try:
            self._init_youtube()
            return self.youtube.description or ""
        except Exception:
            return ""

    def _get_author(self) -> str:
        """Get video author/channel name, or ``"Unknown"`` on failure."""
        try:
            self._init_youtube()
            return self.youtube.author
        except Exception:
            return "Unknown"

    def _get_duration(self) -> str:
        """Get video duration in readable format.

        Returns:
            ``"H:MM:SS"`` for videos of an hour or more, ``"M:SS"`` otherwise,
            or ``"Unknown"`` when the length cannot be read.
        """
        try:
            self._init_youtube()
            minutes, secs = divmod(self.youtube.length, 60)
            hours, minutes = divmod(minutes, 60)
            if hours > 0:
                return f"{hours}:{minutes:02d}:{secs:02d}"
            return f"{minutes}:{secs:02d}"
        except Exception:
            return "Unknown"

    def _get_publish_date(self) -> str:
        """Get video publish date as ``YYYY-MM-DD``, or ``"Unknown"``."""
        try:
            self._init_youtube()
            publish_date = getattr(self.youtube, 'publish_date', None)
            if publish_date:
                return publish_date.strftime("%Y-%m-%d")
        except Exception:
            # Best effort: fall through to the placeholder below.
            pass
        return "Unknown"

    def _get_views(self) -> str:
        """Get view count in abbreviated form.

        Returns:
            e.g. ``"2.5M"`` from one million up, ``"1.2K"`` from one thousand
            up, the plain number below that, or ``"Unknown"`` on failure.
        """
        try:
            self._init_youtube()
            views = self.youtube.views
            # Inclusive bounds so exactly 1,000,000 reads "1.0M", not "1000.0K".
            if views >= 1_000_000:
                return f"{views / 1_000_000:.1f}M"
            if views >= 1_000:
                return f"{views / 1_000:.1f}K"
            return str(views)
        except Exception:
            return "Unknown"

    def _get_transcript(self) -> str:
        """Get video transcript/captions.

        The result of the first call, including a failure placeholder, is
        cached on the instance and returned by every later call.

        Returns:
            The transcript text truncated to ``MAX_TRANSCRIPT_CHARS``, or a
            bracketed placeholder describing why no transcript is available.
        """
        if self._transcript is None:
            self._transcript = self._fetch_transcript()
        return self._transcript

    def _fetch_transcript(self) -> str:
        """Download the transcript, preferring English, without caching."""
        if YouTubeTranscriptApi is None:
            return "[Transcript not available - youtube-transcript-api not installed]"
        try:
            # New API requires creating an instance
            api = YouTubeTranscriptApi()
            available = list(api.list(self.video_id))
            if not available:
                return "[No transcript available]"

            # Prefer English; otherwise fall back to the first one listed.
            transcript = next(
                (t for t in available if t.language_code == 'en'),
                available[0],
            )
            transcript_data = transcript.fetch()

            # New API returns FetchedTranscript with snippets
            if hasattr(transcript_data, 'snippets'):
                pieces = [snippet.text for snippet in transcript_data.snippets]
            else:
                # Fallback for older API format (list of dicts)
                pieces = [entry['text'] for entry in transcript_data]

            full_text = " ".join(pieces).replace("\n", " ").strip()
            return full_text[:self.MAX_TRANSCRIPT_CHARS]
        except Exception as e:
            # Best effort: surface the reason in the returned text.
            return f"[Transcript not available: {str(e)}]"

    def _generate_key_points(self) -> list:
        """Generate key points from transcript (simple extraction).

        Splits the transcript on ``.`` and keeps the first ``MAX_KEY_POINTS``
        fragments longer than ``MIN_KEY_POINT_CHARS`` characters, each with a
        trailing period restored.

        Returns:
            A list of sentences; empty when there is no usable transcript.
        """
        transcript = self._get_transcript()
        if not transcript or transcript.startswith(self._TRANSCRIPT_PLACEHOLDER_PREFIXES):
            return []

        key_points = []
        for fragment in transcript.split('.'):
            sentence = fragment.strip()
            if len(sentence) > self.MIN_KEY_POINT_CHARS:
                key_points.append(sentence + '.')
                if len(key_points) == self.MAX_KEY_POINTS:
                    break
        return key_points

    def _get_tags(self) -> list:
        """Get video tags.

        Returns:
            Up to ``MAX_TAGS`` keywords from the video (an empty list when it
            has none), or ``["youtube", "video"]`` when keywords cannot be read.
        """
        try:
            self._init_youtube()
            if hasattr(self.youtube, 'keywords'):
                keywords = self.youtube.keywords
                return keywords[:self.MAX_TAGS] if keywords else []
        except Exception:
            # Best effort: fall through to the generic tags below.
            pass
        return ["youtube", "video"]