| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648 |
- """
- 从 raw_cases/case_*.json 中提取 source_url / 帖子链接,
- 解析 channel_content_id,再从 .cache/content_search 中查找对应的原始帖子数据。
- 主函数:extract_sources_to_json(raw_cases_dir)
- - 扫描该目录下所有 case_{platform}.json
- - 解析每个 "工序发现[].帖子链接"(新格式)或 "cases[].source_url"(旧格式)
- - 从项目根的 .cache/content_search/*.json 中匹配 channel_content_id
- - 把匹配到的完整 post 写入 {raw_cases_dir}/source.json
- - 同时下载图片到 {raw_cases_dir}/images/{case_id}/
- """
- import json
- import re
- from pathlib import Path
- from typing import Any, Dict, List, Optional, Tuple
- import asyncio
- import aiohttp
- from urllib.parse import urlparse, parse_qs, urlencode
- # ── URL → (platform, content_id) 解析 ────────────────────────────────
# Ordered (platform, compiled pattern) pairs. Patterns are tried top to bottom
# with ``search``; the first one that matches wins and its capture group 1 is
# used as the content id.
_URL_PATTERNS = [
    # Bilibili: https://www.bilibili.com/video/BV1xxx
    ("bili", re.compile(r"bilibili\.com/video/(BV[\w]+)")),
    # Xiaohongshu: https://www.xiaohongshu.com/explore/{id} or /discovery/item/{id}
    ("xhs", re.compile(r"xiaohongshu\.com/(?:explore|discovery/item)/([a-f0-9]+)")),
    # YouTube: https://www.youtube.com/watch?v={id} or https://youtu.be/{id}.
    # ``v`` does not have to be the first query parameter (e.g.
    # ``watch?feature=share&v={id}``); the lazy optional prefix keeps the
    # first ``v=`` when the parameter is already in first position.
    (
        "youtube",
        re.compile(r"(?:youtube\.com/watch\?(?:[^#\s]*?&)??v=|youtu\.be/)([\w-]+)"),
    ),
    # X/Twitter: https://x.com/{user}/status/{id} or twitter.com
    ("x", re.compile(r"(?:x\.com|twitter\.com)/[^/]+/status/(\d+)")),
    # Zhihu: https://zhuanlan.zhihu.com/p/{id} or zhihu.com/question/{qid}/answer/{aid}
    ("zhihu", re.compile(r"zhuanlan\.zhihu\.com/p/(\d+)")),
    ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")),
    # WeChat official account: everything after ``/s/`` or ``/s?`` is used as
    # the id (fallback, since there is no short stable id in the URL).
    ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")),
]


def parse_url(url: str) -> Optional[Tuple[str, str]]:
    """Resolve a post URL to ``(platform, content_id)``.

    Args:
        url: The post URL. Empty or non-string values are tolerated.

    Returns:
        The platform label and the captured id of the first matching entry in
        ``_URL_PATTERNS``, or ``None`` when the input is empty, not a string,
        or matches no known pattern.
    """
    if not url or not isinstance(url, str):
        return None
    for platform, pattern in _URL_PATTERNS:
        match = pattern.search(url)
        if match:
            return platform, match.group(1)
    return None
- # ── 从 case 文件中抽取所有条目(URL + evaluation) ────────────────────────────────
def extract_entries_from_case(case_data: Any) -> List[Dict[str, Any]]:
    """Collect the post entries (URL plus agent evaluation) from one case file.

    Two layouts are read, in this order, and their results are concatenated:

    * ``工序发现[]`` - link taken from ``帖子链接``, falling back to ``source_url``
    * ``cases[]``    - link taken from ``source_url``, falling back to ``帖子链接``

    Items that are not dicts or that carry no link are skipped. A section
    whose value is not a list (missing, ``None``, a number, ...) is ignored
    instead of raising.

    Args:
        case_data: Parsed JSON content of a case file.

    Returns:
        A list of ``{"url", "evaluation", "title", "case_id"}`` dicts;
        ``evaluation``, ``title`` and ``case_id`` are ``None`` when absent.
        Empty when ``case_data`` is not a dict.
    """
    entries: List[Dict[str, Any]] = []
    if not isinstance(case_data, dict):
        return entries

    # (section key, (preferred link key, fallback link key))
    layouts = (
        ("工序发现", ("帖子链接", "source_url")),
        ("cases", ("source_url", "帖子链接")),
    )
    for section, (primary_key, fallback_key) in layouts:
        items = case_data.get(section)
        if not isinstance(items, (list, tuple)):
            continue
        for item in items:
            if not isinstance(item, dict):
                continue
            link = item.get(primary_key) or item.get(fallback_key)
            if not link:
                continue
            entries.append({
                "url": link,
                "evaluation": item.get("evaluation"),
                "title": item.get("title"),
                "case_id": item.get("case_id"),
            })
    return entries
def extract_urls_from_case(case_data: Any) -> List[str]:
    """[Legacy] Return only the URLs found in a case file.

    Kept for possible external callers; code in this module works with
    ``extract_entries_from_case`` directly.
    """
    urls: List[str] = []
    for entry in extract_entries_from_case(case_data):
        urls.append(entry["url"])
    return urls
- # ── 过滤规则(统一入口) ────────────────────────────────
- # 默认阈值:body_text 最少字符数、agent 评分下限
# Minimum number of (stripped) body characters a post needs to be kept.
DEFAULT_MIN_BODY_LEN = 30
# Lowest agent score that is still accepted.
DEFAULT_MIN_SCORE = 70.0
# (year, month, day); posts published before this date count as outdated.
DEFAULT_CUTOFF_DATE = (2025, 10, 1)
- def _is_before_cutoff(source: Dict[str, Any], cutoff_ts: int) -> bool:
- """判断帖子是否早于截止时间戳(秒级)
- 如果 timestamp 为 0 或不存在,返回 False(保留)
- """
- post = source.get("post", {})
- if not isinstance(post, dict):
- return False
- ts = post.get("publish_timestamp")
- # 没有 timestamp 或为 0,保留
- if not ts or ts == 0:
- return False
- try:
- ts = int(ts)
- # 毫秒级转秒级
- if ts > 1000000000000:
- ts = ts / 1000
- return ts < cutoff_ts
- except (ValueError, TypeError):
- pass
- # 尝试解析字符串格式 "2025-05-02 19:25:30"
- try:
- from datetime import datetime
- dt = datetime.strptime(str(ts), "%Y-%m-%d %H:%M:%S")
- return dt.timestamp() < cutoff_ts
- except Exception:
- # 解析失败,保留
- return False
- def _check_filters(
- source: Dict[str, Any],
- cutoff_ts: int,
- min_body_len: int,
- min_score: float,
- ) -> Optional[str]:
- """
- 对一条 source 逐条检查过滤规则。
- 返回:
- None —— 条目合格
- 非 None 字符串 —— 不合格的原因(写入 filter_reason)
- """
- post = source.get("post", {}) or {}
- # 1. body_text 完整性
- body = post.get("body_text") or post.get("desc") or ""
- if not isinstance(body, str) or len(body.strip()) < min_body_len:
- return f"missing_body_text:len={len(body.strip()) if isinstance(body, str) else 0}"
- # 2. agent 评分
- evaluation = source.get("evaluation")
- if not isinstance(evaluation, dict):
- return "missing_evaluation"
- quality = evaluation.get("quality")
- if not isinstance(quality, dict):
- return "missing_evaluation"
- score = quality.get("overall_score")
- if not isinstance(score, (int, float)):
- return "invalid_score"
- if score < min_score:
- return f"low_score:{score}"
- # 3. 过时(复用原 _is_before_cutoff 对时间戳的解析)
- if _is_before_cutoff(source, cutoff_ts):
- return "outdated"
- return None
- # ── 从 cache 中构建 (platform, content_id) → post 索引 ────────────────────────────────
- def _normalize_url(url: str) -> Optional[str]:
- """规范化 URL:排序 query 参数,去掉尾斜杠"""
- try:
- parsed = urlparse(url)
- if parsed.query:
- # 排序 query 参数
- params = parse_qs(parsed.query, keep_blank_values=True)
- sorted_query = urlencode(sorted(params.items()), doseq=True)
- normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{sorted_query}"
- else:
- normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
- return normalized.rstrip('/')
- except:
- return None
def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]:
    """Build a lookup table from the search cache files to the cached posts.

    Every top-level key of the form ``search:{platform}`` in a cache file is
    read. Two entry layouts are understood:

    * new: ``{"history": [{"posts": [...]}, ...], "latest_index": n}``
    * old: ``{"keyword": ..., "posts": [...]}``

    Each post is indexed under up to three keys:

    * ``(platform, channel_content_id)`` - ``video_id`` is used when
      ``channel_content_id`` is empty, and is then also written back into the
      post as ``channel_content_id``
    * ``("url", link)`` - the raw ``link`` / ``url`` field
    * ``("norm_url", normalised link)`` - the same link after
      ``_normalize_url``

    Files that cannot be read or parsed, files whose top level is not an
    object, and malformed entries are skipped. When the same key occurs more
    than once, the later occurrence wins; files are visited in sorted name
    order (or in ``trace_ids`` order) so the result is reproducible.

    Args:
        cache_dir: Directory holding the ``*.json`` cache files.
        trace_ids: Optional trace ids. When given and non-empty, only
            ``{trace_id}.json`` files that exist are loaded; otherwise every
            ``*.json`` file in ``cache_dir`` is scanned.

    Returns:
        The index described above; empty when ``cache_dir`` does not exist.
    """
    index: Dict[Tuple[str, str], Dict[str, Any]] = {}
    cache_dir = Path(cache_dir)
    if not cache_dir.exists():
        return index

    if trace_ids:
        candidates = [cache_dir / f"{tid}.json" for tid in trace_ids if tid]
        cache_files = [path for path in candidates if path.exists()]
    else:
        cache_files = sorted(cache_dir.glob("*.json"))

    for cache_file in cache_files:
        try:
            with open(cache_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Unreadable or invalid JSON (JSONDecodeError and
            # UnicodeDecodeError are both ValueError subclasses).
            continue
        if not isinstance(data, dict):
            continue

        for key, entry in data.items():
            if not key.startswith("search:") or not isinstance(entry, dict):
                continue
            platform = key.split(":", 1)[1]

            if "history" in entry:
                history = entry.get("history")
                if not isinstance(history, list):
                    history = []
                post_lists = [h.get("posts") for h in history if isinstance(h, dict)]
            elif "posts" in entry:
                post_lists = [entry.get("posts")]
            else:
                continue

            for posts in post_lists:
                if not isinstance(posts, list):
                    continue
                for post in posts:
                    if not isinstance(post, dict):
                        continue

                    cid = post.get("channel_content_id")
                    # YouTube posts carry video_id instead of channel_content_id.
                    if not cid and post.get("video_id"):
                        cid = post.get("video_id")
                        post["channel_content_id"] = cid  # backfill the field
                    if cid:
                        index[(platform, str(cid))] = post

                    # Secondary keys by link, for platforms (e.g. WeChat
                    # official accounts) whose URL yields no stable id.
                    link = post.get("link") or post.get("url") or ""
                    if isinstance(link, str) and link:
                        index[("url", link)] = post
                        norm = _normalize_url(link)
                        if norm:
                            index[("norm_url", norm)] = post
    return index
- # ── 主入口 ────────────────────────────────
def _read_stored_sources(path: Path, label: str, grouped: bool = False) -> List[Dict[str, Any]]:
    """Load source entries written by an earlier run; return [] if absent or unreadable."""
    if not path.exists():
        return []
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        print(f"Warning: Failed to load existing {label}: {e}")
        return []
    if not isinstance(data, dict):
        print(f"Warning: Failed to load existing {label}: unexpected top-level {type(data).__name__}")
        return []

    collected: List[Any] = []
    if grouped:
        # filtered_cases.json stores its entries as by_reason[category]["sources"].
        by_reason = data.get("by_reason")
        if isinstance(by_reason, dict):
            for group in by_reason.values():
                items = group.get("sources") if isinstance(group, dict) else None
                if isinstance(items, list):
                    collected.extend(items)
    flat = data.get("sources")
    if isinstance(flat, list):
        collected.extend(flat)
    return [item for item in collected if isinstance(item, dict)]


def _source_key(entry: Dict[str, Any]) -> Tuple[Any, Any]:
    """Return the (platform, channel_content_id) identity used for de-duplication."""
    return (entry.get("platform"), entry.get("channel_content_id"))


def extract_sources_to_json(
    raw_cases_dir: Path,
    cache_dir: Optional[Path] = None,
    output_name: str = "source.json",
    trace_ids: Optional[List[str]] = None,
    min_body_len: int = DEFAULT_MIN_BODY_LEN,
    min_score: float = DEFAULT_MIN_SCORE,
    cutoff_date: Tuple[int, int, int] = DEFAULT_CUTOFF_DATE,
) -> Dict[str, Any]:
    """Resolve the posts referenced by ``case_*.json`` files and persist them.

    Steps:

    1. Build the cache index from ``cache_dir``.
    2. Load the entries already stored in ``{raw_cases_dir}/{output_name}``
       and in ``{raw_cases_dir}/filtered_cases.json``.
    3. Scan ``raw_cases_dir/case_*.json``, resolve every URL to a cached post
       (by ``(platform, content_id)``, then raw URL, then normalised URL) and
       skip posts that are already stored or already seen in this run. A case
       file that fails to parse is passed to the project's
       ``try_fix_and_parse`` helper and rewritten in place when repaired.
    4. Apply the filters to stored plus newly matched entries:
       * body text empty or shorter than ``min_body_len``
         -> ``missing_body_text``
       * evaluation missing, or ``evaluation.quality.overall_score`` not a
         number or below ``min_score``
         -> ``missing_evaluation`` / ``invalid_score`` / ``low_score``
       * ``publish_timestamp`` earlier than ``cutoff_date`` -> ``outdated``
    5. Rewrite timestamps in readable form, write the kept entries to
       ``output_name`` and, when anything was filtered in this run, merge the
       rejected entries into ``filtered_cases.json`` grouped by reason.
    6. Download images for every newly matched entry (whether or not it
       passed the filters) into ``{raw_cases_dir}/images/{case_id}/``.

    Args:
        raw_cases_dir: Directory containing the ``case_*.json`` files; output
            files are written here as well.
        cache_dir: Search cache directory. Defaults to
            ``.cache/content_search`` four directory levels above this file.
        output_name: File name of the kept-sources output.
        trace_ids: Optional trace ids restricting which cache files are read.
        min_body_len: Minimum body length, default 30.
        min_score: Minimum agent score, default 70.
        cutoff_date: ``(year, month, day)`` cutoff, default ``(2025, 10, 1)``;
            interpreted as naive local time.

    Returns:
        A statistics dict with ``total_matched``, ``total_existing``,
        ``total_unmatched``, ``filtered_total``, ``filtered_reasons``,
        ``filtered_details``, ``images_downloaded`` and ``output_file``.
    """
    from datetime import datetime as _dt

    raw_cases_dir = Path(raw_cases_dir)
    if cache_dir is None:
        # Project root: four .parent hops from this script file.
        project_root = Path(__file__).resolve().parent.parent.parent.parent
        cache_dir = project_root / ".cache" / "content_search"
    cache_dir = Path(cache_dir)

    # 1. Cache index.
    cache_index = build_cache_index(cache_dir, trace_ids=trace_ids)

    # 2. Previously stored results.
    output_file = raw_cases_dir / output_name
    existing_sources = _read_stored_sources(output_file, "source.json")
    existing_ids = {_source_key(src) for src in existing_sources}

    filtered_output_file = raw_cases_dir / "filtered_cases.json"
    existing_filtered_sources = _read_stored_sources(
        filtered_output_file, "filtered_cases.json", grouped=True
    )
    existing_filtered_ids = {_source_key(src) for src in existing_filtered_sources}

    # 3. Scan the case files.
    matched: List[Dict[str, Any]] = []
    unmatched: List[Dict[str, Any]] = []
    seen_keys: set = set(existing_ids)  # start from what is already stored

    for case_file in sorted(raw_cases_dir.glob("case_*.json")):
        # Skip our own output in case it was named like a case file.
        if case_file.name == output_name:
            continue
        try:
            with open(case_file, "r", encoding="utf-8") as f:
                case_data = json.load(f)
        except Exception as e:
            load_error = str(e)
            repaired = False
            # Try to auto-repair malformed JSON with the project helper.
            try:
                from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
                with open(case_file, "r", encoding="utf-8") as f:
                    raw_content = f.read()
                success, case_data, fix_desc = try_fix_and_parse(raw_content)
                if success:
                    # Persist the repaired content.
                    with open(case_file, "w", encoding="utf-8") as f:
                        json.dump(case_data, f, ensure_ascii=False, indent=2)
                    print(f" 🔧 [Auto-Fix] Fixed {case_file.name}: {fix_desc}")
                    repaired = True
            except Exception:
                repaired = False
            if not repaired:
                unmatched.append({"case_file": case_file.name, "error": load_error})
                continue

        for entry in extract_entries_from_case(case_data):
            url = entry["url"]
            evaluation = entry.get("evaluation")

            parsed = parse_url(url)
            if not parsed:
                unmatched.append({
                    "case_file": case_file.name,
                    "url": url,
                    "reason": "url_parse_failed",
                })
                continue

            platform, cid = parsed
            key = (platform, cid)
            if key in seen_keys:
                continue
            seen_keys.add(key)

            # Lookup cascade: exact id -> raw URL -> normalised URL.
            post = cache_index.get(key)
            if not post:
                post = cache_index.get(("url", url))
            if not post:
                norm = _normalize_url(url)
                if norm:
                    post = cache_index.get(("norm_url", norm))

            if not post:
                unmatched.append({
                    "case_id": f"{platform}_{cid}",
                    "case_file": case_file.name,
                    "platform": platform,
                    "channel_content_id": cid,
                    "source_url": url,
                    "reason": "not_in_cache",
                })
                continue

            # Always derive case_id from the id stored in the cache so that it
            # agrees with the cached post.
            actual_cid = post.get("channel_content_id") or post.get("video_id") or cid
            actual_key = (platform, str(actual_cid))
            if actual_key != key:
                # Stored entries are keyed by the cached id, which can differ
                # from the id parsed out of the URL; check that key too so a
                # re-run does not append the same post again.
                if actual_key in seen_keys:
                    continue
                seen_keys.add(actual_key)

            matched.append({
                "case_id": f"{platform}_{actual_cid}",
                "case_file": case_file.name,
                "platform": platform,
                "channel_content_id": str(actual_cid),
                "source_url": url,
                "evaluation": evaluation,
                "post": post,
            })

    # 4. Filter stored + new entries: body text / agent score / freshness.
    cutoff_ts = int(_dt(*cutoff_date).timestamp())
    kept_sources: List[Dict[str, Any]] = []
    filtered_sources: List[Dict[str, Any]] = []
    reason_counts: Dict[str, int] = {}
    for src in existing_sources + matched:
        reason = _check_filters(src, cutoff_ts, min_body_len, min_score)
        if reason is None:
            kept_sources.append(src)
            continue
        rejected = dict(src)
        rejected["filter_reason"] = reason
        filtered_sources.append(rejected)
        # Count by category only (the part before the colon).
        category = reason.split(":", 1)[0]
        reason_counts[category] = reason_counts.get(category, 0) + 1

    all_sources = kept_sources
    filtered_count = len(filtered_sources)

    # 5. Human-readable timestamps.
    _convert_timestamps(all_sources)
    _convert_timestamps(filtered_sources)

    # 6. Write the kept sources.
    output = {
        "total": len(all_sources),
        "cache_dir": str(cache_dir),
        "sources": all_sources,
    }
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)

    # 7. Merge newly rejected entries into filtered_cases.json, grouped by reason.
    if filtered_sources:
        for fs in filtered_sources:
            fs_key = _source_key(fs)
            if fs_key not in existing_filtered_ids:
                existing_filtered_sources.append(fs)
                existing_filtered_ids.add(fs_key)

        by_reason: Dict[str, List[Dict[str, Any]]] = {}
        for fs in existing_filtered_sources:
            category = str(fs.get("filter_reason", "unknown")).split(":", 1)[0]
            by_reason.setdefault(category, []).append(fs)

        filtered_output = {
            "total": len(existing_filtered_sources),
            "by_reason": {
                category: {"count": len(items), "sources": items}
                for category, items in by_reason.items()
            },
        }
        with open(filtered_output_file, "w", encoding="utf-8") as f:
            json.dump(filtered_output, f, ensure_ascii=False, indent=2)

    # 8. Images for the newly matched entries -> raw_cases/images/{case_id}/
    images_downloaded = download_images_for_sources(matched, raw_cases_dir)

    # 9. Short summary of what was rejected (used as feedback by follow-up runs).
    filtered_details: List[Dict[str, Any]] = []
    for fs in filtered_sources:
        post = fs.get("post")
        if not isinstance(post, dict):
            post = {}
        title = post.get("title") or fs.get("source_url", "")
        filtered_details.append({
            "case_id": fs.get("case_id", ""),
            "platform": fs.get("platform", ""),
            "title": str(title)[:60] if title else "",
            "filter_reason": fs.get("filter_reason", ""),
        })

    return {
        "total_matched": len(matched),
        "total_existing": len(existing_sources),
        "total_unmatched": len(unmatched),
        "filtered_total": filtered_count,
        "filtered_reasons": reason_counts,
        "filtered_details": filtered_details,
        "images_downloaded": images_downloaded,
        "output_file": str(output_file),
    }
- # ── 图片下载 ────────────────────────────────
- def _get_image_urls_from_post(post: Dict[str, Any]) -> List[str]:
- """从 post 中提取所有图片 URL"""
- urls = []
- images = post.get("images", [])
- if isinstance(images, list):
- for img in images:
- if isinstance(img, str) and img.startswith("http"):
- urls.append(img)
- elif isinstance(img, dict) and "url" in img:
- urls.append(img["url"])
- image_url_list = post.get("image_url_list", [])
- if isinstance(image_url_list, list):
- for img_obj in image_url_list:
- if isinstance(img_obj, dict) and "image_url" in img_obj:
- urls.append(img_obj["image_url"])
- return urls
- def _format_timestamp(ts: Any) -> Optional[str]:
- """将时间戳(秒/毫秒)转换为可读格式"""
- from datetime import datetime
- if ts is None or ts == 0 or ts == "":
- return None
- try:
- ts = int(ts)
- # 毫秒级时间戳
- if ts > 1000000000000:
- ts = ts / 1000
- dt = datetime.fromtimestamp(ts)
- return dt.strftime("%Y-%m-%d %H:%M:%S")
- except Exception:
- return None
- def _convert_timestamps(sources: List[Dict[str, Any]]) -> None:
- """将 source 列表中 post 的时间戳字段替换为可读格式"""
- timestamp_fields = ["publish_timestamp", "modify_timestamp", "update_timestamp"]
- for src in sources:
- post = src.get("post", {})
- if not isinstance(post, dict):
- continue
- for field in timestamp_fields:
- if field in post:
- readable = _format_timestamp(post.get(field))
- if readable:
- post[field] = readable
def download_images_for_sources(
    sources: List[Dict[str, Any]],
    raw_cases_dir: Path,
    timeout: float = 30.0,
) -> int:
    """Download the images of the given sources to ``raw_cases/images/{case_id}/``.

    Images are saved as ``{index:02d}{ext}`` where ``ext`` is ``.png`` or
    ``.webp`` when that text occurs in the URL and ``.jpg`` otherwise. A file
    that already exists is not fetched again but still counts. Each download
    is written to a ``.part`` file first and renamed on success, so an
    interrupted transfer never leaves a truncated image that a later run
    would mistake for a finished one. Failures are skipped without
    interrupting the run.

    Requests use a private opener with a browser-like ``User-Agent``; the
    process-wide urllib opener is not modified.

    Args:
        sources: Source entries providing ``case_id`` and ``post``.
        raw_cases_dir: Base directory; images go below its ``images`` folder.
        timeout: Per-request timeout in seconds.

    Returns:
        Number of images present after the run (newly downloaded plus
        already existing).
    """
    import shutil
    import urllib.request

    images_base = Path(raw_cases_dir) / "images"
    total_downloaded = 0

    # Some hosts (X/Twitter among them) reject requests without a User-Agent.
    opener = urllib.request.build_opener()
    opener.addheaders = [("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")]

    for src in sources:
        case_id = src.get("case_id", "")
        post = src.get("post", {})
        image_urls = _get_image_urls_from_post(post)
        if not image_urls:
            continue

        case_dir = images_base / case_id
        case_dir.mkdir(parents=True, exist_ok=True)

        for idx, url in enumerate(image_urls):
            # Pick the extension from the URL text; default to .jpg.
            lowered = url.lower()
            if ".png" in lowered:
                ext = ".png"
            elif ".webp" in lowered:
                ext = ".webp"
            else:
                ext = ".jpg"

            save_path = case_dir / f"{idx:02d}{ext}"
            if save_path.exists():
                total_downloaded += 1
                continue

            tmp_path = save_path.with_name(save_path.name + ".part")
            try:
                with opener.open(url, timeout=timeout) as response, open(tmp_path, "wb") as out:
                    shutil.copyfileobj(response, out)
                tmp_path.replace(save_path)
                total_downloaded += 1
            except Exception:
                # Best effort: a failed image must not abort the whole run,
                # but do not leave the partial file behind.
                try:
                    if tmp_path.exists():
                        tmp_path.unlink()
                except OSError:
                    pass
    return total_downloaded
if __name__ == "__main__":
    # Command line usage: python extract_sources.py <raw_cases_dir> [cache_dir]
    import sys

    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python extract_sources.py <raw_cases_dir> [cache_dir]")
        sys.exit(1)

    raw_cases_dir = Path(cli_args[0])
    # The cache directory is optional; None selects the project default.
    cache_dir = None
    if len(cli_args) > 1:
        cache_dir = Path(cli_args[1])

    result = extract_sources_to_json(raw_cases_dir, cache_dir=cache_dir)
    print(f"[OK] Matched: {result['total_matched']}, Unmatched: {result['total_unmatched']}")
    print(f" Output: {raw_cases_dir / 'source.json'}")
|