From 66e1c9e0e0213835fc500e6fabacbfa3f8c9de0f Mon Sep 17 00:00:00 2001
From: Jan Bader
Date: Sat, 4 Apr 2026 21:16:28 +0200
Subject: [PATCH] Improve extraction

Pull the caption/description and author from og:/twitter: meta tags,
ld+json payloads and the embedded window._sharedData blob before
falling back to scraping visible body text.

Note: the local variable holding page.content() in the shared-data
block is named page_source, not html. A local named `html` would
shadow the imported `html` module for the whole method (Python binds
locals per function at compile time), so the earlier html.unescape()
call would raise UnboundLocalError — masked by the broad `except`.

---
 extractors/instagram_extractor.py | 202 +++++++++++++++++++++++++-----
 1 file changed, 169 insertions(+), 33 deletions(-)

diff --git a/extractors/instagram_extractor.py b/extractors/instagram_extractor.py
index f017482..883559a 100644
--- a/extractors/instagram_extractor.py
+++ b/extractors/instagram_extractor.py
@@ -11,6 +11,8 @@ Extracts:
 Note: Instagram requires browser automation. Uses Playwright.
 """
 
+import html
+import json
 import re
 import time
 from typing import Dict, Any
@@ -122,7 +124,138 @@ class InstagramExtractor:
             ]
             return any(blocker in lowered for blocker in blockers)
 
-        # Try to get caption/description
+        # Try to get caption/description from meta and embedded JSON first
+        try:
+            meta_desc = page.query_selector('meta[property="og:description"], meta[name="description"]')
+            if meta_desc:
+                text = (meta_desc.get_attribute("content") or "").strip()
+                if text and not _looks_like_ui_prompt(text) and not _looks_like_language_list(text):
+                    data["description"] = text
+
+            meta_title = page.query_selector('meta[property="og:title"], meta[name="twitter:title"]')
+            if meta_title and data["title"] == "Instagram Reel":
+                title_text = (meta_title.get_attribute("content") or "").strip()
+                if title_text:
+                    data["title"] = title_text
+
+            if not data["description"]:
+                html_source = page.content()
+                patterns = [
+                    r'<meta[^>]+property="og:description"[^>]+content="([^"]+)"',
+                    r'<meta[^>]+name="description"[^>]+content="([^"]+)"',
+                    r'<meta[^>]+name="twitter:description"[^>]+content="([^"]+)"',
+                ]
+                for pattern in patterns:
+                    match = re.search(pattern, html_source, re.IGNORECASE)
+                    if match:
+                        text = html.unescape(match.group(1)).strip()
+                        if text and not _looks_like_ui_prompt(text) and not _looks_like_language_list(text):
+                            data["description"] = text
+                            break
+
+            scripts = page.query_selector_all('script[type="application/ld+json"]')
+            for script in scripts:
+                raw = script.inner_text().strip()
+                if not raw:
+                    continue
+                try:
+                    payload = json.loads(raw)
+                except Exception:
+                    continue
+
+                def extract_from_obj(obj: Dict[str, Any]):
+                    if not isinstance(obj, dict):
+                        return
+                    desc = obj.get("description")
+                    if desc and not data["description"]:
+                        if not _looks_like_ui_prompt(desc) and not _looks_like_language_list(desc):
+                            data["description"] = desc.strip()
+                    author = obj.get("author")
+                    if author and data["author"] == "Unknown":
+                        if isinstance(author, dict):
+                            name = author.get("name")
+                            if name:
+                                data["author"] = name.strip()
+                        elif isinstance(author, list):
+                            for item in author:
+                                if isinstance(item, dict) and item.get("name"):
+                                    data["author"] = item["name"].strip()
+                                    break
+                        elif isinstance(author, str):
+                            data["author"] = author.strip()
+
+                if isinstance(payload, list):
+                    for obj in payload:
+                        extract_from_obj(obj)
+                else:
+                    extract_from_obj(payload)
+
+                if data["description"] and data["author"] != "Unknown":
+                    break
+        except Exception as e:
+            print(f"⚠️ Could not extract meta/ld+json: {e}")
+
+        # Try to get caption/description from embedded shared data
+        try:
+            page_source = page.content()
+            payloads = []
+            shared_match = re.search(r'window\._sharedData\s*=\s*({.*?});', page_source, re.DOTALL)
+            if shared_match:
+                payloads.append(shared_match.group(1))
+            for match in re.finditer(r'__additionalDataLoaded\([^,]+,\s*({.*?})\);', page_source, re.DOTALL):
+                payloads.append(match.group(1))
+
+            def extract_from_media(media: Dict[str, Any]):
+                if not isinstance(media, dict):
+                    return
+                if data["author"] == "Unknown":
+                    owner = media.get("owner") or {}
+                    if isinstance(owner, dict):
+                        name = owner.get("username") or owner.get("full_name")
+                        if name:
+                            data["author"] = name.strip()
+
+                caption_text = None
+                edge = media.get("edge_media_to_caption")
+                if isinstance(edge, dict):
+                    edges = edge.get("edges") or []
+                    if edges:
+                        node = edges[0].get("node", {})
+                        if isinstance(node, dict):
+                            caption_text = node.get("text")
+
+                if not caption_text and isinstance(media.get("caption"), dict):
+                    caption_text = media["caption"].get("text")
+
+                if caption_text and not data["description"]:
+                    if not _looks_like_ui_prompt(caption_text) and not _looks_like_language_list(caption_text):
+                        data["description"] = caption_text.strip()
+
+            def walk(obj: Any):
+                if isinstance(obj, dict):
+                    graphql = obj.get("graphql")
+                    if isinstance(graphql, dict):
+                        extract_from_media(graphql.get("shortcode_media") or graphql.get("media"))
+                    if isinstance(obj.get("shortcode_media"), dict):
+                        extract_from_media(obj.get("shortcode_media"))
+                    for v in obj.values():
+                        walk(v)
+                elif isinstance(obj, list):
+                    for item in obj:
+                        walk(item)
+
+            for raw in payloads:
+                try:
+                    parsed = json.loads(raw)
+                except Exception:
+                    continue
+                walk(parsed)
+                if data["description"] and data["author"] != "Unknown":
+                    break
+        except Exception as e:
+            print(f"⚠️ Could not extract shared data: {e}")
+
+        # Try to get caption/description from visible text
         try:
             # Look for caption text
             captions = page.query_selector_all('h1, h2, span')
@@ -158,44 +291,47 @@ class InstagramExtractor:
 
         # Extract any visible text as content
         try:
-            # Get all text content
-            body_text = page.inner_text('body')
+            if data["description"] and not _looks_like_ui_prompt(data["description"]):
+                data["content"] = data["description"].strip()
+            else:
+                # Get all text content
+                body_text = page.inner_text('body')
 
-            # Filter for meaningful content
-            lines = body_text.split('\n')
-            cleaned_lines = []
-            buffer = []
+                # Filter for meaningful content
+                lines = body_text.split('\n')
+                cleaned_lines = []
+                buffer = []
 
-            def flush_buffer():
-                if buffer:
-                    block = "\n".join(buffer)
-                    if not _looks_like_language_list(block):
-                        cleaned_lines.extend(
-                            [line for line in buffer if not _looks_like_ui_prompt(line)]
-                        )
-                    buffer.clear()
+                def flush_buffer():
+                    if buffer:
+                        block = "\n".join(buffer)
+                        if not _looks_like_language_list(block):
+                            cleaned_lines.extend(
+                                [line for line in buffer if not _looks_like_ui_prompt(line)]
+                            )
+                        buffer.clear()
 
-            for line in lines:
-                stripped = line.strip()
-                if not stripped:
-                    flush_buffer()
-                    continue
-                if _looks_like_ui_prompt(stripped):
-                    continue
-                if len(stripped) <= 24:
-                    buffer.append(stripped)
-                else:
-                    flush_buffer()
-                    cleaned_lines.append(stripped)
+                for line in lines:
+                    stripped = line.strip()
+                    if not stripped:
+                        flush_buffer()
+                        continue
+                    if _looks_like_ui_prompt(stripped):
+                        continue
+                    if len(stripped) <= 24:
+                        buffer.append(stripped)
+                    else:
+                        flush_buffer()
+                        cleaned_lines.append(stripped)
 
-            flush_buffer()
+                flush_buffer()
 
-            meaningful_lines = [
-                line for line in cleaned_lines
-                if len(line) > 30 and len(line) < 300
-            ]
+                meaningful_lines = [
+                    line for line in cleaned_lines
+                    if len(line) > 30 and len(line) < 300
+                ]
 
-            data["content"] = "\n\n".join(meaningful_lines[:10])[:5000]
+                data["content"] = "\n\n".join(meaningful_lines[:10])[:5000]
         except Exception as e:
             print(f"⚠️ Could not extract page text: {e}")