demand_export.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. """微信指数达标后的需求元素/短语导出逻辑。"""
  2. from __future__ import annotations
  3. import argparse
  4. import json
  5. from typing import Any
  6. from app.hot_content.config import load_flow_config
  7. from app.hot_content.repository import HotContentRepository
  8. WXINDEX_EXPORT_THRESHOLD = 1_000_000.0 # 与 WXINDEX_SCORE_THRESHOLD 默认值一致
  9. POINT_CATEGORIES = ("灵感点", "目的点", "关键点")
  10. ITEM_TYPE_ELEMENT = "元素"
  11. ITEM_TYPE_PHRASE = "短语"
  12. def get_latest_wxindex_score(trend_json: dict[str, Any]) -> float | None:
  13. wxindex = trend_json.get("wxindex")
  14. if not isinstance(wxindex, dict):
  15. return None
  16. try:
  17. return float(wxindex.get("latest_total_score"))
  18. except (TypeError, ValueError):
  19. return None
  20. def get_wxindex_keyword(trend_json: dict[str, Any] | None) -> str:
  21. if not isinstance(trend_json, dict):
  22. return ""
  23. wxindex = trend_json.get("wxindex")
  24. if isinstance(wxindex, dict):
  25. keyword = str(wxindex.get("keyword") or "").strip()
  26. if keyword:
  27. return keyword
  28. return str(trend_json.get("llm_selected_word") or "").strip()
  29. def get_wxindex_trend(trend_json: dict[str, Any]) -> str:
  30. wxindex = trend_json.get("wxindex")
  31. if not isinstance(wxindex, dict):
  32. return ""
  33. return str(wxindex.get("trend") or "").strip()
  34. def _to_contribution_score(value: Any) -> float | None:
  35. try:
  36. if value is None:
  37. return None
  38. return float(value)
  39. except (TypeError, ValueError):
  40. return None
  41. def extract_matched_demand_name_list(word_row: dict[str, Any]) -> list[str]:
  42. match_rows = word_row.get("匹配需求列表") or []
  43. if not isinstance(match_rows, list):
  44. return []
  45. names: list[str] = []
  46. seen: set[str] = set()
  47. for match in match_rows:
  48. if not isinstance(match, dict):
  49. continue
  50. demand_name = str(match.get("demand_name") or "").strip()
  51. if not demand_name or demand_name in seen:
  52. continue
  53. seen.add(demand_name)
  54. names.append(demand_name)
  55. return names
  56. def extract_matched_demand_names(word_row: dict[str, Any]) -> str:
  57. return " ".join(extract_matched_demand_name_list(word_row))
  58. def build_word_lookup(words_rows: list[Any]) -> dict[str, dict[str, Any]]:
  59. lookup: dict[str, dict[str, Any]] = {}
  60. for word_row in words_rows:
  61. if not isinstance(word_row, dict):
  62. continue
  63. word_text = str(word_row.get("词") or "").strip()
  64. if word_text:
  65. lookup[word_text] = word_row
  66. return lookup
  67. def build_word_to_categories(points: list[Any]) -> dict[str, set[str]]:
  68. word_categories: dict[str, set[str]] = {}
  69. if not isinstance(points, list):
  70. return word_categories
  71. for point_item in points:
  72. if not isinstance(point_item, dict):
  73. continue
  74. category = str(point_item.get("来源") or "").strip()
  75. if category not in POINT_CATEGORIES:
  76. continue
  77. match_words = point_item.get("匹配词列表") or []
  78. if not isinstance(match_words, list):
  79. continue
  80. for hit in match_words:
  81. if not isinstance(hit, dict):
  82. continue
  83. word_text = str(hit.get("词") or "").strip()
  84. if not word_text:
  85. continue
  86. word_categories.setdefault(word_text, set()).add(category)
  87. return word_categories
  88. def ordered_point_categories(categories: set[str]) -> list[str]:
  89. return [category for category in POINT_CATEGORIES if category in categories]
  90. def extract_point_matched_demand_names(
  91. point_item: dict[str, Any],
  92. word_lookup: dict[str, dict[str, Any]],
  93. ) -> str:
  94. match_words = point_item.get("匹配词列表") or []
  95. if not isinstance(match_words, list):
  96. return ""
  97. names: list[str] = []
  98. seen: set[str] = set()
  99. for hit in match_words:
  100. if not isinstance(hit, dict):
  101. continue
  102. word_text = str(hit.get("词") or "").strip()
  103. word_row = word_lookup.get(word_text)
  104. if not word_row:
  105. continue
  106. for demand_name in extract_matched_demand_name_list(word_row):
  107. if demand_name in seen:
  108. continue
  109. seen.add(demand_name)
  110. names.append(demand_name)
  111. return " ".join(names)
  112. def _build_word_export_row(
  113. word_text: str,
  114. word_row: dict[str, Any],
  115. category: str,
  116. ) -> dict[str, Any]:
  117. return {
  118. "item_type": ITEM_TYPE_ELEMENT,
  119. "item_text": word_text,
  120. "point_category": category,
  121. "matched_demand": extract_matched_demand_names(word_row),
  122. "contribution_score": _to_contribution_score(word_row.get("贡献度")),
  123. }
  124. def _resolve_word_row(
  125. word_text: str,
  126. *,
  127. word_lookup: dict[str, dict[str, Any]],
  128. match_result: dict[str, Any],
  129. ) -> dict[str, Any]:
  130. word_row = word_lookup.get(word_text)
  131. if isinstance(word_row, dict):
  132. return word_row
  133. for row in match_result.get("匹配到需求的词列表") or []:
  134. if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
  135. return row
  136. for row in match_result.get("高贡献词列表") or []:
  137. if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
  138. return row
  139. return {"词": word_text}
  140. def append_wxindex_keyword_rows(
  141. export_rows: list[dict[str, Any]],
  142. *,
  143. trend_json: dict[str, Any] | None,
  144. match_result: dict[str, Any],
  145. word_lookup: dict[str, dict[str, Any]],
  146. word_to_categories: dict[str, set[str]],
  147. ) -> None:
  148. keyword = get_wxindex_keyword(trend_json)
  149. if not keyword:
  150. return
  151. has_keyword_row = any(
  152. row.get("item_type") == ITEM_TYPE_ELEMENT and str(row.get("item_text") or "").strip() == keyword
  153. for row in export_rows
  154. )
  155. if has_keyword_row:
  156. return
  157. word_row = _resolve_word_row(keyword, word_lookup=word_lookup, match_result=match_result)
  158. categories = ordered_point_categories(word_to_categories.get(keyword, set()))
  159. if categories:
  160. for category in categories:
  161. export_rows.append(_build_word_export_row(keyword, word_row, category))
  162. return
  163. export_rows.append(_build_word_export_row(keyword, word_row, ""))
  164. def build_demand_export_rows(
  165. match_result: dict[str, Any],
  166. *,
  167. contribution_points: dict[str, Any] | None = None,
  168. trend_json: dict[str, Any] | None = None,
  169. ) -> list[dict[str, Any]]:
  170. export_rows: list[dict[str, Any]] = []
  171. words_rows = match_result.get("高贡献词列表") or []
  172. if not isinstance(words_rows, list):
  173. words_rows = []
  174. contribution_source = contribution_points if isinstance(contribution_points, dict) else match_result
  175. points = contribution_source.get("点列表") or []
  176. if not isinstance(points, list):
  177. points = []
  178. word_lookup = build_word_lookup(words_rows)
  179. word_to_categories = build_word_to_categories(points)
  180. for word_text, word_row in word_lookup.items():
  181. categories = ordered_point_categories(word_to_categories.get(word_text, set()))
  182. if not categories:
  183. continue
  184. for category in categories:
  185. export_rows.append(_build_word_export_row(word_text, word_row, category))
  186. for point_item in points:
  187. if not isinstance(point_item, dict):
  188. continue
  189. point_text = str(point_item.get("点") or "").strip()
  190. category = str(point_item.get("来源") or "").strip()
  191. if not point_text or category not in POINT_CATEGORIES:
  192. continue
  193. export_rows.append(
  194. {
  195. "item_type": ITEM_TYPE_PHRASE,
  196. "item_text": point_text,
  197. "point_category": category,
  198. "matched_demand": extract_point_matched_demand_names(point_item, word_lookup),
  199. "contribution_score": None,
  200. }
  201. )
  202. append_wxindex_keyword_rows(
  203. export_rows,
  204. trend_json=trend_json,
  205. match_result=match_result,
  206. word_lookup=word_lookup,
  207. word_to_categories=word_to_categories,
  208. )
  209. deduped_rows: list[dict[str, Any]] = []
  210. seen: set[tuple[str, str, str]] = set()
  211. for row in export_rows:
  212. key = (
  213. row["item_type"],
  214. row["item_text"],
  215. str(row.get("point_category") or ""),
  216. )
  217. if key in seen:
  218. continue
  219. seen.add(key)
  220. deduped_rows.append(row)
  221. return deduped_rows
  222. def attach_wxindex_metadata(
  223. export_rows: list[dict[str, Any]],
  224. trend_json: dict[str, Any] | None,
  225. ) -> list[dict[str, Any]]:
  226. latest_score = (
  227. get_latest_wxindex_score(trend_json)
  228. if isinstance(trend_json, dict)
  229. else None
  230. )
  231. trend = get_wxindex_trend(trend_json) if isinstance(trend_json, dict) else ""
  232. wxindex_keyword = get_wxindex_keyword(trend_json)
  233. rows: list[dict[str, Any]] = []
  234. for row in export_rows:
  235. item_type = str(row.get("item_type") or "")
  236. item_text = str(row.get("item_text") or "").strip()
  237. matched_demand = str(row.get("matched_demand") or "").strip()
  238. has_record_wxindex = latest_score is not None
  239. is_wxindex_keyword = (
  240. item_type == ITEM_TYPE_ELEMENT and wxindex_keyword and item_text == wxindex_keyword
  241. )
  242. if has_record_wxindex and (
  243. matched_demand or item_type == ITEM_TYPE_PHRASE or is_wxindex_keyword
  244. ):
  245. wxindex_score = float(latest_score)
  246. wxindex_trend_value = trend
  247. else:
  248. wxindex_score = 0.0
  249. wxindex_trend_value = ""
  250. rows.append(
  251. {
  252. **row,
  253. "wxindex_keyword": wxindex_keyword,
  254. "wxindex_latest_score": wxindex_score,
  255. "wxindex_trend": wxindex_trend_value,
  256. }
  257. )
  258. return rows
  259. def _json_loads(value: Any) -> Any:
  260. if value is None:
  261. return None
  262. if isinstance(value, (dict, list)):
  263. return value
  264. if isinstance(value, (bytes, bytearray)):
  265. value = value.decode("utf-8")
  266. if isinstance(value, str):
  267. return json.loads(value)
  268. return value
  269. def fetch_export_candidate_records(cursor: Any, limit: int) -> list[dict[str, Any]]:
  270. limit_sql = "" if limit <= 0 else "LIMIT %s"
  271. params: tuple[Any, ...] = () if limit <= 0 else (limit,)
  272. cursor.execute(
  273. f"""
  274. SELECT
  275. id,
  276. source,
  277. title,
  278. article_title,
  279. contribution_points_json,
  280. contribution_demand_match_json,
  281. wxindex_trend_json
  282. FROM hot_content_records
  283. WHERE contribution_demand_match_json IS NOT NULL
  284. AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
  285. ORDER BY id ASC
  286. {limit_sql}
  287. """,
  288. params,
  289. )
  290. return list(cursor.fetchall())
  291. def export_existing_records(
  292. repository: HotContentRepository,
  293. records: list[dict[str, Any]],
  294. *,
  295. dry_run: bool,
  296. verbose: bool,
  297. ) -> dict[str, int]:
  298. summary = {
  299. "scanned": 0,
  300. "exported_records": 0,
  301. "exported_rows": 0,
  302. "no_export_rows": 0,
  303. "invalid_json": 0,
  304. "skipped": 0,
  305. }
  306. for row in records:
  307. summary["scanned"] += 1
  308. record_id = int(row["id"])
  309. try:
  310. match_json = _json_loads(row.get("contribution_demand_match_json"))
  311. contribution_points = _json_loads(row.get("contribution_points_json"))
  312. trend_json = _json_loads(row.get("wxindex_trend_json"))
  313. except json.JSONDecodeError:
  314. summary["invalid_json"] += 1
  315. if verbose:
  316. print(f"id={record_id}: JSON 解析失败,已跳过")
  317. continue
  318. if not isinstance(match_json, dict):
  319. summary["skipped"] += 1
  320. continue
  321. latest_score = (
  322. get_latest_wxindex_score(trend_json)
  323. if isinstance(trend_json, dict)
  324. else None
  325. )
  326. export_rows = attach_wxindex_metadata(
  327. build_demand_export_rows(
  328. match_json,
  329. contribution_points=(
  330. contribution_points if isinstance(contribution_points, dict) else None
  331. ),
  332. trend_json=trend_json if isinstance(trend_json, dict) else None,
  333. ),
  334. trend_json if isinstance(trend_json, dict) else None,
  335. )
  336. if not export_rows:
  337. summary["no_export_rows"] += 1
  338. continue
  339. if verbose or dry_run:
  340. matched_rows = sum(
  341. 1 for item in export_rows if str(item.get("matched_demand") or "").strip()
  342. )
  343. print(
  344. f"id={record_id} rows={len(export_rows)} matched_rows={matched_rows} "
  345. f"title={str(row.get('title') or '')[:40]}"
  346. )
  347. if not dry_run:
  348. repository.replace_demand_export_rows(
  349. record_id=record_id,
  350. source=str(row.get("source") or ""),
  351. hot_title=str(row.get("title") or ""),
  352. article_title=str(row.get("article_title") or ""),
  353. rows=export_rows,
  354. )
  355. summary["exported_records"] += 1
  356. summary["exported_rows"] += len(export_rows)
  357. return summary
  358. def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
  359. parser = argparse.ArgumentParser(
  360. description=(
  361. "扫描已有 contribution_demand_match_json 记录,"
  362. "导出全部元素/短语到 hot_content_demand_exports;"
  363. "元素/短语按灵感点/目的点/关键点展开为多行,无点类型数据过滤;"
  364. "并补充获取微信指数的词、微信指数及趋势。"
  365. ),
  366. )
  367. parser.add_argument(
  368. "--limit",
  369. type=int,
  370. default=0,
  371. help="最多处理多少条记录,默认 0 表示不限制。",
  372. )
  373. parser.add_argument(
  374. "--dry-run",
  375. action="store_true",
  376. help="只统计/打印,不写入数据库。",
  377. )
  378. parser.add_argument(
  379. "--verbose",
  380. action="store_true",
  381. help="打印每条成功导出的记录。",
  382. )
  383. return parser.parse_args(argv)
  384. def main(argv: list[str] | None = None) -> dict[str, int]:
  385. args = parse_args(argv)
  386. config = load_flow_config()
  387. repository = HotContentRepository(config.mysql)
  388. try:
  389. with repository.conn.cursor() as cursor:
  390. records = fetch_export_candidate_records(cursor, args.limit)
  391. summary = export_existing_records(
  392. repository,
  393. records,
  394. dry_run=args.dry_run,
  395. verbose=args.verbose,
  396. )
  397. finally:
  398. repository.close()
  399. action = "预览完成" if args.dry_run else "导出完成"
  400. print(f"{action}:{json.dumps(summary, ensure_ascii=False)}")
  401. return summary
if __name__ == "__main__":
    # Script entry point. main() is called with argv=None, so parse_args
    # falls back to argparse's default of reading sys.argv[1:].
    main()