diff --git a/extractors/instagram_extractor.py b/extractors/instagram_extractor.py
index f017482..883559a 100644
--- a/extractors/instagram_extractor.py
+++ b/extractors/instagram_extractor.py
@@ -11,6 +11,8 @@ Extracts:
Note: Instagram requires browser automation. Uses Playwright.
"""
+import html
+import json
import re
import time
from typing import Dict, Any
@@ -122,7 +124,138 @@ class InstagramExtractor:
]
return any(blocker in lowered for blocker in blockers)
- # Try to get caption/description
+ # Try to get caption/description from meta and embedded JSON first
+ try:
+ meta_desc = page.query_selector('meta[property="og:description"], meta[name="description"]')
+ if meta_desc:
+ text = (meta_desc.get_attribute("content") or "").strip()
+ if text and not _looks_like_ui_prompt(text) and not _looks_like_language_list(text):
+ data["description"] = text
+
+ meta_title = page.query_selector('meta[property="og:title"], meta[name="twitter:title"]')
+ if meta_title and data["title"] == "Instagram Reel":
+ title_text = (meta_title.get_attribute("content") or "").strip()
+ if title_text:
+ data["title"] = title_text
+
+ if not data["description"]:
+ html_source = page.content()
+ patterns = [
+ r'<meta[^>]+property="og:description"[^>]+content="([^"]+)"',
+ r'<meta[^>]+name="description"[^>]+content="([^"]+)"',
+ r'<meta[^>]+name="twitter:description"[^>]+content="([^"]+)"',
+ ]
+ for pattern in patterns:
+ match = re.search(pattern, html_source, re.IGNORECASE)
+ if match:
+ text = html.unescape(match.group(1)).strip()
+ if text and not _looks_like_ui_prompt(text) and not _looks_like_language_list(text):
+ data["description"] = text
+ break
+
+ scripts = page.query_selector_all('script[type="application/ld+json"]')
+ for script in scripts:
+ raw = script.inner_text().strip()
+ if not raw:
+ continue
+ try:
+ payload = json.loads(raw)
+ except Exception:
+ continue
+
+ def extract_from_obj(obj: Dict[str, Any]):
+ if not isinstance(obj, dict):
+ return
+ desc = obj.get("description")
+ if desc and not data["description"]:
+ if not _looks_like_ui_prompt(desc) and not _looks_like_language_list(desc):
+ data["description"] = desc.strip()
+ author = obj.get("author")
+ if author and data["author"] == "Unknown":
+ if isinstance(author, dict):
+ name = author.get("name")
+ if name:
+ data["author"] = name.strip()
+ elif isinstance(author, list):
+ for item in author:
+ if isinstance(item, dict) and item.get("name"):
+ data["author"] = item["name"].strip()
+ break
+ elif isinstance(author, str):
+ data["author"] = author.strip()
+
+ if isinstance(payload, list):
+ for obj in payload:
+ extract_from_obj(obj)
+ else:
+ extract_from_obj(payload)
+
+ if data["description"] and data["author"] != "Unknown":
+ break
+ except Exception as e:
+ print(f"⚠️ Could not extract meta/ld+json: {e}")
+
+ # Try to get caption/description from embedded shared data
+ try:
+ page_html = page.content()
+ payloads = []
+ shared_match = re.search(r'window\._sharedData\s*=\s*({.*?});', page_html, re.DOTALL)
+ if shared_match:
+ payloads.append(shared_match.group(1))
+ for match in re.finditer(r'__additionalDataLoaded\([^,]+,\s*({.*?})\);', page_html, re.DOTALL):
+ payloads.append(match.group(1))
+
+ def extract_from_media(media: Dict[str, Any]):
+ if not isinstance(media, dict):
+ return
+ if data["author"] == "Unknown":
+ owner = media.get("owner") or {}
+ if isinstance(owner, dict):
+ name = owner.get("username") or owner.get("full_name")
+ if name:
+ data["author"] = name.strip()
+
+ caption_text = None
+ edge = media.get("edge_media_to_caption")
+ if isinstance(edge, dict):
+ edges = edge.get("edges") or []
+ if edges:
+ node = edges[0].get("node", {})
+ if isinstance(node, dict):
+ caption_text = node.get("text")
+
+ if not caption_text and isinstance(media.get("caption"), dict):
+ caption_text = media["caption"].get("text")
+
+ if caption_text and not data["description"]:
+ if not _looks_like_ui_prompt(caption_text) and not _looks_like_language_list(caption_text):
+ data["description"] = caption_text.strip()
+
+ def walk(obj: Any):
+ if isinstance(obj, dict):
+ graphql = obj.get("graphql")
+ if isinstance(graphql, dict):
+ extract_from_media(graphql.get("shortcode_media") or graphql.get("media"))
+ if isinstance(obj.get("shortcode_media"), dict):
+ extract_from_media(obj.get("shortcode_media"))
+ for v in obj.values():
+ walk(v)
+ elif isinstance(obj, list):
+ for item in obj:
+ walk(item)
+
+ for raw in payloads:
+ try:
+ parsed = json.loads(raw)
+ except Exception:
+ continue
+ walk(parsed)
+ if data["description"] and data["author"] != "Unknown":
+ break
+ except Exception as e:
+ print(f"⚠️ Could not extract shared data: {e}")
+
+ # Try to get caption/description from visible text
try:
# Look for caption text
captions = page.query_selector_all('h1, h2, span')
@@ -158,44 +291,47 @@ class InstagramExtractor:
# Extract any visible text as content
try:
- # Get all text content
- body_text = page.inner_text('body')
+ if data["description"] and not _looks_like_ui_prompt(data["description"]):
+ data["content"] = data["description"].strip()
+ else:
+ # Get all text content
+ body_text = page.inner_text('body')
- # Filter for meaningful content
- lines = body_text.split('\n')
- cleaned_lines = []
- buffer = []
+ # Filter for meaningful content
+ lines = body_text.split('\n')
+ cleaned_lines = []
+ buffer = []
- def flush_buffer():
- if buffer:
- block = "\n".join(buffer)
- if not _looks_like_language_list(block):
- cleaned_lines.extend(
- [line for line in buffer if not _looks_like_ui_prompt(line)]
- )
- buffer.clear()
+ def flush_buffer():
+ if buffer:
+ block = "\n".join(buffer)
+ if not _looks_like_language_list(block):
+ cleaned_lines.extend(
+ [line for line in buffer if not _looks_like_ui_prompt(line)]
+ )
+ buffer.clear()
- for line in lines:
- stripped = line.strip()
- if not stripped:
- flush_buffer()
- continue
- if _looks_like_ui_prompt(stripped):
- continue
- if len(stripped) <= 24:
- buffer.append(stripped)
- else:
- flush_buffer()
- cleaned_lines.append(stripped)
+ for line in lines:
+ stripped = line.strip()
+ if not stripped:
+ flush_buffer()
+ continue
+ if _looks_like_ui_prompt(stripped):
+ continue
+ if len(stripped) <= 24:
+ buffer.append(stripped)
+ else:
+ flush_buffer()
+ cleaned_lines.append(stripped)
- flush_buffer()
+ flush_buffer()
- meaningful_lines = [
- line for line in cleaned_lines
- if len(line) > 30 and len(line) < 300
- ]
+ meaningful_lines = [
+ line for line in cleaned_lines
+ if len(line) > 30 and len(line) < 300
+ ]
- data["content"] = "\n\n".join(meaningful_lines[:10])[:5000]
+ data["content"] = "\n\n".join(meaningful_lines[:10])[:5000]
except Exception as e:
print(f"⚠️ Could not extract page text: {e}")