# demand_export.py
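"""Export demand elements/phrases to MySQL (full export, no ODPS filtering).

The MySQL table hot_content_demand_exports receives every row produced by
this module:
- Elements: all high-contribution words associated with the point list
  (including words without a matched demand).
- Phrases: all phrases of the 灵感点 (inspiration) / 目的点 (purpose) /
  关键点 (key) points (including phrases without a matched demand).

Filtering applied before writing to ODPS (WeChat-index threshold,
matched_demand, etc.) is handled in demand_hive_export.
"""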
from __future__ import annotations

import argparse
import json
from typing import Any

from app.hot_content.config import load_flow_config
from app.hot_content.repository import HotContentRepository
  13. WXINDEX_EXPORT_THRESHOLD = 1_000_000.0 # 与 WXINDEX_SCORE_THRESHOLD 默认值一致
  14. POINT_CATEGORIES = ("灵感点", "目的点", "关键点")
  15. ITEM_TYPE_ELEMENT = "元素"
  16. ITEM_TYPE_PHRASE = "短语"
  17. def get_latest_wxindex_score(trend_json: dict[str, Any]) -> float | None:
  18. wxindex = trend_json.get("wxindex")
  19. if not isinstance(wxindex, dict):
  20. return None
  21. try:
  22. return float(wxindex.get("latest_total_score"))
  23. except (TypeError, ValueError):
  24. return None
  25. def get_wxindex_keyword(trend_json: dict[str, Any] | None) -> str:
  26. if not isinstance(trend_json, dict):
  27. return ""
  28. wxindex = trend_json.get("wxindex")
  29. if isinstance(wxindex, dict):
  30. keyword = str(wxindex.get("keyword") or "").strip()
  31. if keyword:
  32. return keyword
  33. return str(trend_json.get("llm_selected_word") or "").strip()
  34. def get_wxindex_trend(trend_json: dict[str, Any]) -> str:
  35. wxindex = trend_json.get("wxindex")
  36. if not isinstance(wxindex, dict):
  37. return ""
  38. return str(wxindex.get("trend") or "").strip()
  39. def _to_contribution_score(value: Any) -> float | None:
  40. try:
  41. if value is None:
  42. return None
  43. return float(value)
  44. except (TypeError, ValueError):
  45. return None
  46. def extract_matched_demand_name_list(word_row: dict[str, Any]) -> list[str]:
  47. match_rows = word_row.get("匹配需求列表") or []
  48. if not isinstance(match_rows, list):
  49. return []
  50. names: list[str] = []
  51. seen: set[str] = set()
  52. for match in match_rows:
  53. if not isinstance(match, dict):
  54. continue
  55. demand_name = str(match.get("demand_name") or "").strip()
  56. if not demand_name or demand_name in seen:
  57. continue
  58. seen.add(demand_name)
  59. names.append(demand_name)
  60. return names
  61. def extract_matched_demand_names(word_row: dict[str, Any]) -> str:
  62. return " ".join(extract_matched_demand_name_list(word_row))
  63. def build_word_lookup(words_rows: list[Any]) -> dict[str, dict[str, Any]]:
  64. lookup: dict[str, dict[str, Any]] = {}
  65. for word_row in words_rows:
  66. if not isinstance(word_row, dict):
  67. continue
  68. word_text = str(word_row.get("词") or "").strip()
  69. if word_text:
  70. lookup[word_text] = word_row
  71. return lookup
  72. def _has_matched_demand_text(value: Any) -> bool:
  73. return bool(str(value or "").strip())
  74. def build_merged_word_lookup(
  75. match_result: dict[str, Any],
  76. contribution_points: dict[str, Any] | None,
  77. ) -> dict[str, dict[str, Any]]:
  78. """以 contribution 高贡献词为全集,匹配信息仅以 match_result 为准。"""
  79. match_lookup = build_word_lookup(match_result.get("高贡献词列表") or [])
  80. contribution_source = (
  81. contribution_points if isinstance(contribution_points, dict) else match_result
  82. )
  83. words_rows = contribution_source.get("高贡献词列表") or []
  84. if not isinstance(words_rows, list):
  85. words_rows = []
  86. lookup: dict[str, dict[str, Any]] = {}
  87. for word_row in words_rows:
  88. if not isinstance(word_row, dict):
  89. continue
  90. word_text = str(word_row.get("词") or "").strip()
  91. if not word_text:
  92. continue
  93. match_row = match_lookup.get(word_text)
  94. merged = {k: v for k, v in word_row.items() if k != "匹配需求列表"}
  95. if isinstance(match_row, dict):
  96. merged["匹配需求列表"] = list(match_row.get("匹配需求列表") or [])
  97. else:
  98. merged["匹配需求列表"] = []
  99. lookup[word_text] = merged
  100. for word_text, match_row in match_lookup.items():
  101. if word_text not in lookup:
  102. lookup[word_text] = match_row
  103. return lookup
  104. def enrich_word_lookup_from_points(
  105. word_lookup: dict[str, dict[str, Any]],
  106. *,
  107. points: list[Any],
  108. match_result: dict[str, Any],
  109. ) -> None:
  110. """把点列表里出现、但高贡献词列表未收录的词补进 lookup。"""
  111. if not isinstance(points, list):
  112. return
  113. for point_item in points:
  114. if not isinstance(point_item, dict):
  115. continue
  116. match_words = point_item.get("匹配词列表") or []
  117. if not isinstance(match_words, list):
  118. continue
  119. for hit in match_words:
  120. if not isinstance(hit, dict):
  121. continue
  122. word_text = str(hit.get("词") or "").strip()
  123. if not word_text or word_text in word_lookup:
  124. continue
  125. word_lookup[word_text] = _resolve_word_row(
  126. word_text,
  127. word_lookup=word_lookup,
  128. match_result=match_result,
  129. )
  130. def build_word_to_categories(points: list[Any]) -> dict[str, set[str]]:
  131. word_categories: dict[str, set[str]] = {}
  132. if not isinstance(points, list):
  133. return word_categories
  134. for point_item in points:
  135. if not isinstance(point_item, dict):
  136. continue
  137. category = str(point_item.get("来源") or "").strip()
  138. if category not in POINT_CATEGORIES:
  139. continue
  140. match_words = point_item.get("匹配词列表") or []
  141. if not isinstance(match_words, list):
  142. continue
  143. for hit in match_words:
  144. if not isinstance(hit, dict):
  145. continue
  146. word_text = str(hit.get("词") or "").strip()
  147. if not word_text:
  148. continue
  149. word_categories.setdefault(word_text, set()).add(category)
  150. return word_categories
  151. def ordered_point_categories(categories: set[str]) -> list[str]:
  152. return [category for category in POINT_CATEGORIES if category in categories]
  153. def extract_point_matched_demand_names(
  154. point_item: dict[str, Any],
  155. word_lookup: dict[str, dict[str, Any]],
  156. ) -> str:
  157. match_words = point_item.get("匹配词列表") or []
  158. if not isinstance(match_words, list):
  159. return ""
  160. names: list[str] = []
  161. seen: set[str] = set()
  162. for hit in match_words:
  163. if not isinstance(hit, dict):
  164. continue
  165. word_text = str(hit.get("词") or "").strip()
  166. word_row = word_lookup.get(word_text)
  167. if not word_row:
  168. continue
  169. for demand_name in extract_matched_demand_name_list(word_row):
  170. if demand_name in seen:
  171. continue
  172. seen.add(demand_name)
  173. names.append(demand_name)
  174. return " ".join(names)
  175. def _build_word_export_row(
  176. word_text: str,
  177. word_row: dict[str, Any],
  178. category: str,
  179. ) -> dict[str, Any]:
  180. matched_demand = extract_matched_demand_names(word_row)
  181. return {
  182. "item_type": ITEM_TYPE_ELEMENT,
  183. "item_text": word_text,
  184. "point_category": category,
  185. "matched_demand": matched_demand,
  186. "contribution_score": (
  187. _to_contribution_score(word_row.get("贡献度"))
  188. if _has_matched_demand_text(matched_demand)
  189. else None
  190. ),
  191. }
  192. def _resolve_word_row(
  193. word_text: str,
  194. *,
  195. word_lookup: dict[str, dict[str, Any]],
  196. match_result: dict[str, Any],
  197. ) -> dict[str, Any]:
  198. word_row = word_lookup.get(word_text)
  199. if isinstance(word_row, dict):
  200. return word_row
  201. for row in match_result.get("匹配到需求的词列表") or []:
  202. if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
  203. return row
  204. for row in match_result.get("高贡献词列表") or []:
  205. if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
  206. return row
  207. return {"词": word_text}
  208. def append_wxindex_keyword_rows(
  209. export_rows: list[dict[str, Any]],
  210. *,
  211. trend_json: dict[str, Any] | None,
  212. match_result: dict[str, Any],
  213. word_lookup: dict[str, dict[str, Any]],
  214. word_to_categories: dict[str, set[str]],
  215. ) -> None:
  216. keyword = get_wxindex_keyword(trend_json)
  217. if not keyword:
  218. return
  219. has_keyword_row = any(
  220. row.get("item_type") == ITEM_TYPE_ELEMENT and str(row.get("item_text") or "").strip() == keyword
  221. for row in export_rows
  222. )
  223. if has_keyword_row:
  224. return
  225. word_row = _resolve_word_row(keyword, word_lookup=word_lookup, match_result=match_result)
  226. categories = ordered_point_categories(word_to_categories.get(keyword, set()))
  227. if categories:
  228. for category in categories:
  229. export_rows.append(_build_word_export_row(keyword, word_row, category))
  230. return
  231. export_rows.append(_build_word_export_row(keyword, word_row, ""))
  232. def build_demand_export_rows(
  233. match_result: dict[str, Any],
  234. *,
  235. contribution_points: dict[str, Any] | None = None,
  236. trend_json: dict[str, Any] | None = None,
  237. ) -> list[dict[str, Any]]:
  238. export_rows: list[dict[str, Any]] = []
  239. contribution_source = contribution_points if isinstance(contribution_points, dict) else match_result
  240. points = contribution_source.get("点列表") or []
  241. if not isinstance(points, list):
  242. points = []
  243. word_lookup = build_merged_word_lookup(match_result, contribution_points)
  244. enrich_word_lookup_from_points(
  245. word_lookup,
  246. points=points,
  247. match_result=match_result,
  248. )
  249. word_to_categories = build_word_to_categories(points)
  250. for word_text in sorted(set(word_lookup) | set(word_to_categories)):
  251. categories = ordered_point_categories(word_to_categories.get(word_text, set()))
  252. if not categories:
  253. continue
  254. word_row = word_lookup.get(word_text) or _resolve_word_row(
  255. word_text,
  256. word_lookup=word_lookup,
  257. match_result=match_result,
  258. )
  259. for category in categories:
  260. export_rows.append(_build_word_export_row(word_text, word_row, category))
  261. for point_item in points:
  262. if not isinstance(point_item, dict):
  263. continue
  264. point_text = str(point_item.get("点") or "").strip()
  265. category = str(point_item.get("来源") or "").strip()
  266. if not point_text or category not in POINT_CATEGORIES:
  267. continue
  268. export_rows.append(
  269. {
  270. "item_type": ITEM_TYPE_PHRASE,
  271. "item_text": point_text,
  272. "point_category": category,
  273. "matched_demand": extract_point_matched_demand_names(point_item, word_lookup),
  274. "contribution_score": None,
  275. }
  276. )
  277. append_wxindex_keyword_rows(
  278. export_rows,
  279. trend_json=trend_json,
  280. match_result=match_result,
  281. word_lookup=word_lookup,
  282. word_to_categories=word_to_categories,
  283. )
  284. deduped_rows: list[dict[str, Any]] = []
  285. seen: set[tuple[str, str, str]] = set()
  286. for row in export_rows:
  287. key = (
  288. row["item_type"],
  289. row["item_text"],
  290. str(row.get("point_category") or ""),
  291. )
  292. if key in seen:
  293. continue
  294. seen.add(key)
  295. deduped_rows.append(row)
  296. return deduped_rows
  297. def attach_wxindex_metadata(
  298. export_rows: list[dict[str, Any]],
  299. trend_json: dict[str, Any] | None,
  300. ) -> list[dict[str, Any]]:
  301. latest_score = (
  302. get_latest_wxindex_score(trend_json)
  303. if isinstance(trend_json, dict)
  304. else None
  305. )
  306. trend = get_wxindex_trend(trend_json) if isinstance(trend_json, dict) else ""
  307. wxindex_keyword = get_wxindex_keyword(trend_json)
  308. rows: list[dict[str, Any]] = []
  309. for row in export_rows:
  310. matched_demand = str(row.get("matched_demand") or "").strip()
  311. has_matched_demand = _has_matched_demand_text(matched_demand)
  312. has_record_wxindex = latest_score is not None
  313. if has_record_wxindex and has_matched_demand:
  314. wxindex_score = float(latest_score)
  315. wxindex_trend_value = trend
  316. else:
  317. wxindex_score = 0.0
  318. wxindex_trend_value = ""
  319. normalized_row = dict(row)
  320. if not has_matched_demand:
  321. normalized_row["contribution_score"] = None
  322. rows.append(
  323. {
  324. **normalized_row,
  325. "matched_demand": matched_demand,
  326. "wxindex_keyword": wxindex_keyword,
  327. "wxindex_latest_score": wxindex_score,
  328. "wxindex_trend": wxindex_trend_value,
  329. }
  330. )
  331. return rows
  332. def _json_loads(value: Any) -> Any:
  333. if value is None:
  334. return None
  335. if isinstance(value, (dict, list)):
  336. return value
  337. if isinstance(value, (bytes, bytearray)):
  338. value = value.decode("utf-8")
  339. if isinstance(value, str):
  340. return json.loads(value)
  341. return value
  342. def fetch_export_candidate_records(cursor: Any, limit: int) -> list[dict[str, Any]]:
  343. limit_sql = "" if limit <= 0 else "LIMIT %s"
  344. params: tuple[Any, ...] = () if limit <= 0 else (limit,)
  345. cursor.execute(
  346. f"""
  347. SELECT
  348. id,
  349. source,
  350. title,
  351. article_title,
  352. contribution_points_json,
  353. contribution_demand_match_json,
  354. wxindex_trend_json
  355. FROM hot_content_records
  356. WHERE contribution_demand_match_json IS NOT NULL
  357. AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
  358. ORDER BY id ASC
  359. {limit_sql}
  360. """,
  361. params,
  362. )
  363. return list(cursor.fetchall())
  364. def export_existing_records(
  365. repository: HotContentRepository,
  366. records: list[dict[str, Any]],
  367. *,
  368. dry_run: bool,
  369. verbose: bool,
  370. ) -> dict[str, int]:
  371. summary = {
  372. "scanned": 0,
  373. "exported_records": 0,
  374. "exported_rows": 0,
  375. "no_export_rows": 0,
  376. "invalid_json": 0,
  377. "skipped": 0,
  378. }
  379. for row in records:
  380. summary["scanned"] += 1
  381. record_id = int(row["id"])
  382. try:
  383. match_json = _json_loads(row.get("contribution_demand_match_json"))
  384. contribution_points = _json_loads(row.get("contribution_points_json"))
  385. trend_json = _json_loads(row.get("wxindex_trend_json"))
  386. except json.JSONDecodeError:
  387. summary["invalid_json"] += 1
  388. if verbose:
  389. print(f"id={record_id}: JSON 解析失败,已跳过")
  390. continue
  391. if not isinstance(match_json, dict):
  392. summary["skipped"] += 1
  393. continue
  394. export_rows = attach_wxindex_metadata(
  395. build_demand_export_rows(
  396. match_json,
  397. contribution_points=(
  398. contribution_points if isinstance(contribution_points, dict) else None
  399. ),
  400. trend_json=trend_json if isinstance(trend_json, dict) else None,
  401. ),
  402. trend_json if isinstance(trend_json, dict) else None,
  403. )
  404. if not export_rows:
  405. summary["no_export_rows"] += 1
  406. if not dry_run:
  407. repository.replace_demand_export_rows(
  408. record_id=record_id,
  409. source=str(row.get("source") or ""),
  410. hot_title=str(row.get("title") or ""),
  411. article_title=str(row.get("article_title") or ""),
  412. rows=[],
  413. )
  414. continue
  415. if verbose or dry_run:
  416. matched_rows = sum(
  417. 1 for item in export_rows if str(item.get("matched_demand") or "").strip()
  418. )
  419. print(
  420. f"id={record_id} rows={len(export_rows)} matched_rows={matched_rows} "
  421. f"title={str(row.get('title') or '')[:40]}"
  422. )
  423. if not dry_run:
  424. repository.replace_demand_export_rows(
  425. record_id=record_id,
  426. source=str(row.get("source") or ""),
  427. hot_title=str(row.get("title") or ""),
  428. article_title=str(row.get("article_title") or ""),
  429. rows=export_rows,
  430. )
  431. summary["exported_records"] += 1
  432. summary["exported_rows"] += len(export_rows)
  433. return summary
  434. def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
  435. parser = argparse.ArgumentParser(
  436. description=(
  437. "扫描已有 contribution_demand_match_json 记录,"
  438. "全量导出元素/短语到 hot_content_demand_exports(含未匹配需求的项);"
  439. "元素/短语按灵感点/目的点/关键点展开为多行,"
  440. "并补充获取微信指数的词、微信指数及趋势。"
  441. ),
  442. )
  443. parser.add_argument(
  444. "--limit",
  445. type=int,
  446. default=0,
  447. help="最多处理多少条记录,默认 0 表示不限制。",
  448. )
  449. parser.add_argument(
  450. "--dry-run",
  451. action="store_true",
  452. help="只统计/打印,不写入数据库。",
  453. )
  454. parser.add_argument(
  455. "--verbose",
  456. action="store_true",
  457. help="打印每条成功导出的记录。",
  458. )
  459. return parser.parse_args(argv)
  460. def main(argv: list[str] | None = None) -> dict[str, int]:
  461. args = parse_args(argv)
  462. config = load_flow_config()
  463. repository = HotContentRepository(config.mysql)
  464. try:
  465. with repository.conn.cursor() as cursor:
  466. records = fetch_export_candidate_records(cursor, args.limit)
  467. summary = export_existing_records(
  468. repository,
  469. records,
  470. dry_run=args.dry_run,
  471. verbose=args.verbose,
  472. )
  473. finally:
  474. repository.close()
  475. action = "预览完成" if args.dry_run else "导出完成"
  476. print(f"{action}:{json.dumps(summary, ensure_ascii=False)}")
  477. return summary
  478. if __name__ == "__main__":
  479. main()