Improve extraction
This commit is contained in:
@@ -11,6 +11,8 @@ Extracts:
|
||||
Note: Instagram requires browser automation. Uses Playwright.
|
||||
"""
|
||||
|
||||
import html
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from typing import Dict, Any
|
||||
@@ -122,7 +124,138 @@ class InstagramExtractor:
|
||||
]
|
||||
return any(blocker in lowered for blocker in blockers)
|
||||
|
||||
# Try to get caption/description
|
||||
# Try to get caption/description from meta and embedded JSON first
|
||||
try:
    # Step 1: read the description straight from the rendered meta tags.
    meta_desc = page.query_selector('meta[property="og:description"], meta[name="description"]')
    if meta_desc:
        text = (meta_desc.get_attribute("content") or "").strip()
        if text and not _looks_like_ui_prompt(text) and not _looks_like_language_list(text):
            data["description"] = text

    # Step 2: replace the title only while it still holds the placeholder value.
    meta_title = page.query_selector('meta[property="og:title"], meta[name="twitter:title"]')
    if meta_title and data["title"] == "Instagram Reel":
        title_text = (meta_title.get_attribute("content") or "").strip()
        if title_text:
            data["title"] = title_text

    # Step 3: if the DOM query produced nothing usable, scan the raw markup.
    if not data["description"]:
        html_source = page.content()
        patterns = [
            r'<meta[^>]+property="og:description"[^>]+content="([^"]+)"',
            r'<meta[^>]+name="description"[^>]+content="([^"]+)"',
            r'<meta[^>]+name="twitter:description"[^>]+content="([^"]+)"',
        ]
        for pattern in patterns:
            match = re.search(pattern, html_source, re.IGNORECASE)
            if match:
                # Attribute values in raw markup are entity-encoded; decode them.
                text = html.unescape(match.group(1)).strip()
                if text and not _looks_like_ui_prompt(text) and not _looks_like_language_list(text):
                    data["description"] = text
                    break

    def extract_from_obj(obj: Any):
        """Fill missing description/author in ``data`` from one JSON-LD object.

        Non-dict objects and non-string field values are ignored, so a single
        malformed entry cannot abort the whole extraction step. Values that
        are already set in ``data`` are never overwritten.
        """
        if not isinstance(obj, dict):
            return

        desc = obj.get("description")
        if isinstance(desc, str) and desc.strip() and not data["description"]:
            if not _looks_like_ui_prompt(desc) and not _looks_like_language_list(desc):
                data["description"] = desc.strip()

        author = obj.get("author")
        if author and data["author"] == "Unknown":
            if isinstance(author, dict):
                name = author.get("name")
                if isinstance(name, str) and name.strip():
                    data["author"] = name.strip()
            elif isinstance(author, list):
                # Use the first entry that carries a usable name.
                for item in author:
                    if not isinstance(item, dict):
                        continue
                    name = item.get("name")
                    if isinstance(name, str) and name.strip():
                        data["author"] = name.strip()
                        break
            elif isinstance(author, str) and author.strip():
                data["author"] = author.strip()

    # Step 4: structured data (JSON-LD) may carry both description and author.
    for script in page.query_selector_all('script[type="application/ld+json"]'):
        raw = script.inner_text().strip()
        if not raw:
            continue
        try:
            payload = json.loads(raw)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError; skip unparsable scripts.
            continue

        # A script may hold a single object or a list of objects.
        for obj in (payload if isinstance(payload, list) else [payload]):
            extract_from_obj(obj)

        # Stop as soon as both fields are known.
        if data["description"] and data["author"] != "Unknown":
            break
except Exception as e:
    # Best-effort step: report the failure and let later strategies run.
    print(f"⚠️ Could not extract meta/ld+json: {e}")
|
||||
|
||||
# Try to get caption/description from embedded shared data
|
||||
try:
    # Deliberately NOT named `html`: assigning to `html` anywhere in this
    # function would make it a function-local name and shadow the imported
    # `html` module, so any `html.unescape(...)` call in the same function
    # would fail with UnboundLocalError.
    page_source = page.content()

    # Collect the raw JSON blobs embedded in inline scripts.
    payloads = []
    shared_match = re.search(
        r'window\._sharedData\s*=\s*({.*?});</script>', page_source, re.DOTALL
    )
    if shared_match:
        payloads.append(shared_match.group(1))
    payloads.extend(
        match.group(1)
        for match in re.finditer(
            r'__additionalDataLoaded\([^,]+,\s*({.*?})\);', page_source, re.DOTALL
        )
    )

    def extract_from_media(media: Any):
        """Fill missing author/description in ``data`` from one media object.

        Anything that is not a dict is ignored. Values already present in
        ``data`` are never overwritten, and non-string names or captions are
        skipped instead of raising.
        """
        if not isinstance(media, dict):
            return

        if data["author"] == "Unknown":
            owner = media.get("owner") or {}
            if isinstance(owner, dict):
                name = owner.get("username") or owner.get("full_name")
                if isinstance(name, str) and name.strip():
                    data["author"] = name.strip()

        # Caption lookup: the edge list first, then the flat "caption" object.
        caption_text = None
        edge = media.get("edge_media_to_caption")
        if isinstance(edge, dict):
            edges = edge.get("edges") or []
            if isinstance(edges, list) and edges and isinstance(edges[0], dict):
                node = edges[0].get("node", {})
                if isinstance(node, dict):
                    caption_text = node.get("text")

        if not caption_text and isinstance(media.get("caption"), dict):
            caption_text = media["caption"].get("text")

        if isinstance(caption_text, str) and caption_text and not data["description"]:
            if not _looks_like_ui_prompt(caption_text) and not _looks_like_language_list(caption_text):
                data["description"] = caption_text.strip()

    def walk(obj: Any):
        """Recursively visit a parsed payload and hand media objects to the extractor."""
        if isinstance(obj, dict):
            graphql = obj.get("graphql")
            if isinstance(graphql, dict):
                extract_from_media(graphql.get("shortcode_media") or graphql.get("media"))
            if isinstance(obj.get("shortcode_media"), dict):
                extract_from_media(obj.get("shortcode_media"))
            for value in obj.values():
                walk(value)
        elif isinstance(obj, list):
            for item in obj:
                walk(item)

    for raw in payloads:
        try:
            parsed = json.loads(raw)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError; the lazy regex can also
            # capture a truncated object, which simply fails to parse.
            continue
        walk(parsed)
        # Stop as soon as both fields are known.
        if data["description"] and data["author"] != "Unknown":
            break
except Exception as e:
    # Best-effort step: report the failure and let later strategies run.
    print(f"⚠️ Could not extract shared data: {e}")
|
||||
|
||||
# Try to get caption/description from visible text
|
||||
try:
|
||||
# Look for caption text
|
||||
captions = page.query_selector_all('h1, h2, span')
|
||||
@@ -158,44 +291,47 @@ class InstagramExtractor:
|
||||
|
||||
# Extract any visible text as content
|
||||
try:
    if data["description"] and not _looks_like_ui_prompt(data["description"]):
        # A usable description was already found: use it as the content.
        data["content"] = data["description"].strip()
    else:
        # Fall back to scraping the visible text of the page body.
        body_text = page.inner_text('body')

        # Filter for meaningful content
        lines = body_text.split('\n')
        cleaned_lines = []
        buffer = []  # run of consecutive short lines awaiting a decision

        def flush_buffer():
            """Move buffered short lines to ``cleaned_lines`` unless the run looks like a language list."""
            if buffer:
                block = "\n".join(buffer)
                if not _looks_like_language_list(block):
                    cleaned_lines.extend(
                        [line for line in buffer if not _looks_like_ui_prompt(line)]
                    )
                buffer.clear()

        for line in lines:
            stripped = line.strip()
            if not stripped:
                # A blank line ends the current run of short lines.
                flush_buffer()
                continue
            if _looks_like_ui_prompt(stripped):
                continue
            if len(stripped) <= 24:
                buffer.append(stripped)
            else:
                flush_buffer()
                cleaned_lines.append(stripped)

        flush_buffer()

        # NOTE(review): buffered lines are at most 24 characters long, while
        # this filter keeps only lines longer than 30, so buffered lines never
        # reach the final content — confirm whether that is intended.
        meaningful_lines = [
            line for line in cleaned_lines
            if 30 < len(line) < 300
        ]

        # Keep at most ten paragraphs and cap the total size.
        data["content"] = "\n\n".join(meaningful_lines[:10])[:5000]
except Exception as e:
    # Best-effort step: report the failure instead of aborting extraction.
    print(f"⚠️ Could not extract page text: {e}")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user