# demand_export.py
"""Export logic for demand elements/phrases once the WeChat index reaches its threshold."""
from __future__ import annotations

import argparse
import json
from typing import Any

from app.hot_content.config import load_flow_config
from app.hot_content.repository import HotContentRepository
  8. WXINDEX_EXPORT_THRESHOLD = 1_000_000.0 # 与 WXINDEX_SCORE_THRESHOLD 默认值一致
  9. POINT_CATEGORIES = ("灵感点", "目的点", "关键点")
  10. ITEM_TYPE_ELEMENT = "元素"
  11. ITEM_TYPE_PHRASE = "短语"
  12. def get_latest_wxindex_score(trend_json: dict[str, Any]) -> float | None:
  13. wxindex = trend_json.get("wxindex")
  14. if not isinstance(wxindex, dict):
  15. return None
  16. try:
  17. return float(wxindex.get("latest_total_score"))
  18. except (TypeError, ValueError):
  19. return None
  20. def get_wxindex_keyword(trend_json: dict[str, Any] | None) -> str:
  21. if not isinstance(trend_json, dict):
  22. return ""
  23. wxindex = trend_json.get("wxindex")
  24. if isinstance(wxindex, dict):
  25. keyword = str(wxindex.get("keyword") or "").strip()
  26. if keyword:
  27. return keyword
  28. return str(trend_json.get("llm_selected_word") or "").strip()
  29. def get_wxindex_trend(trend_json: dict[str, Any]) -> str:
  30. wxindex = trend_json.get("wxindex")
  31. if not isinstance(wxindex, dict):
  32. return ""
  33. return str(wxindex.get("trend") or "").strip()
  34. def _to_contribution_score(value: Any) -> float | None:
  35. try:
  36. if value is None:
  37. return None
  38. return float(value)
  39. except (TypeError, ValueError):
  40. return None
  41. def extract_matched_demand_name_list(word_row: dict[str, Any]) -> list[str]:
  42. match_rows = word_row.get("匹配需求列表") or []
  43. if not isinstance(match_rows, list):
  44. return []
  45. names: list[str] = []
  46. seen: set[str] = set()
  47. for match in match_rows:
  48. if not isinstance(match, dict):
  49. continue
  50. demand_name = str(match.get("demand_name") or "").strip()
  51. if not demand_name or demand_name in seen:
  52. continue
  53. seen.add(demand_name)
  54. names.append(demand_name)
  55. return names
  56. def extract_matched_demand_names(word_row: dict[str, Any]) -> str:
  57. return " ".join(extract_matched_demand_name_list(word_row))
  58. def build_word_lookup(words_rows: list[Any]) -> dict[str, dict[str, Any]]:
  59. lookup: dict[str, dict[str, Any]] = {}
  60. for word_row in words_rows:
  61. if not isinstance(word_row, dict):
  62. continue
  63. word_text = str(word_row.get("词") or "").strip()
  64. if word_text:
  65. lookup[word_text] = word_row
  66. return lookup
  67. def build_word_to_categories(points: list[Any]) -> dict[str, set[str]]:
  68. word_categories: dict[str, set[str]] = {}
  69. if not isinstance(points, list):
  70. return word_categories
  71. for point_item in points:
  72. if not isinstance(point_item, dict):
  73. continue
  74. category = str(point_item.get("来源") or "").strip()
  75. if category not in POINT_CATEGORIES:
  76. continue
  77. match_words = point_item.get("匹配词列表") or []
  78. if not isinstance(match_words, list):
  79. continue
  80. for hit in match_words:
  81. if not isinstance(hit, dict):
  82. continue
  83. word_text = str(hit.get("词") or "").strip()
  84. if not word_text:
  85. continue
  86. word_categories.setdefault(word_text, set()).add(category)
  87. return word_categories
  88. def ordered_point_categories(categories: set[str]) -> list[str]:
  89. return [category for category in POINT_CATEGORIES if category in categories]
  90. def extract_point_matched_demand_names(
  91. point_item: dict[str, Any],
  92. word_lookup: dict[str, dict[str, Any]],
  93. ) -> str:
  94. match_words = point_item.get("匹配词列表") or []
  95. if not isinstance(match_words, list):
  96. return ""
  97. names: list[str] = []
  98. seen: set[str] = set()
  99. for hit in match_words:
  100. if not isinstance(hit, dict):
  101. continue
  102. word_text = str(hit.get("词") or "").strip()
  103. word_row = word_lookup.get(word_text)
  104. if not word_row:
  105. continue
  106. for demand_name in extract_matched_demand_name_list(word_row):
  107. if demand_name in seen:
  108. continue
  109. seen.add(demand_name)
  110. names.append(demand_name)
  111. return " ".join(names)
  112. def _build_word_export_row(
  113. word_text: str,
  114. word_row: dict[str, Any],
  115. category: str,
  116. ) -> dict[str, Any]:
  117. return {
  118. "item_type": ITEM_TYPE_ELEMENT,
  119. "item_text": word_text,
  120. "point_category": category,
  121. "matched_demand": extract_matched_demand_names(word_row),
  122. "contribution_score": _to_contribution_score(word_row.get("贡献度")),
  123. }
  124. def _resolve_word_row(
  125. word_text: str,
  126. *,
  127. word_lookup: dict[str, dict[str, Any]],
  128. match_result: dict[str, Any],
  129. ) -> dict[str, Any]:
  130. word_row = word_lookup.get(word_text)
  131. if isinstance(word_row, dict):
  132. return word_row
  133. for row in match_result.get("匹配到需求的词列表") or []:
  134. if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
  135. return row
  136. for row in match_result.get("高贡献词列表") or []:
  137. if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
  138. return row
  139. return {"词": word_text}
  140. def append_wxindex_keyword_rows(
  141. export_rows: list[dict[str, Any]],
  142. *,
  143. trend_json: dict[str, Any] | None,
  144. match_result: dict[str, Any],
  145. word_lookup: dict[str, dict[str, Any]],
  146. word_to_categories: dict[str, set[str]],
  147. ) -> None:
  148. keyword = get_wxindex_keyword(trend_json)
  149. if not keyword:
  150. return
  151. has_keyword_row = any(
  152. row.get("item_type") == ITEM_TYPE_ELEMENT and str(row.get("item_text") or "").strip() == keyword
  153. for row in export_rows
  154. )
  155. if has_keyword_row:
  156. return
  157. word_row = _resolve_word_row(keyword, word_lookup=word_lookup, match_result=match_result)
  158. if not extract_matched_demand_names(word_row):
  159. return
  160. categories = ordered_point_categories(word_to_categories.get(keyword, set()))
  161. if categories:
  162. for category in categories:
  163. export_rows.append(_build_word_export_row(keyword, word_row, category))
  164. return
  165. export_rows.append(_build_word_export_row(keyword, word_row, ""))
  166. def build_demand_export_rows(
  167. match_result: dict[str, Any],
  168. *,
  169. contribution_points: dict[str, Any] | None = None,
  170. trend_json: dict[str, Any] | None = None,
  171. ) -> list[dict[str, Any]]:
  172. export_rows: list[dict[str, Any]] = []
  173. words_rows = match_result.get("高贡献词列表") or []
  174. if not isinstance(words_rows, list):
  175. words_rows = []
  176. contribution_source = contribution_points if isinstance(contribution_points, dict) else match_result
  177. points = match_result.get("点列表") or []
  178. if not isinstance(points, list) or not points:
  179. points = contribution_source.get("点列表") or []
  180. if not isinstance(points, list):
  181. points = []
  182. word_lookup = build_word_lookup(words_rows)
  183. word_to_categories = build_word_to_categories(points)
  184. for word_text, word_row in word_lookup.items():
  185. categories = ordered_point_categories(word_to_categories.get(word_text, set()))
  186. if not categories:
  187. continue
  188. if not extract_matched_demand_names(word_row):
  189. continue
  190. for category in categories:
  191. export_rows.append(_build_word_export_row(word_text, word_row, category))
  192. for point_item in points:
  193. if not isinstance(point_item, dict):
  194. continue
  195. point_text = str(point_item.get("点") or "").strip()
  196. category = str(point_item.get("来源") or "").strip()
  197. if not point_text or category not in POINT_CATEGORIES:
  198. continue
  199. matched_demand = extract_point_matched_demand_names(point_item, word_lookup)
  200. if not matched_demand:
  201. continue
  202. export_rows.append(
  203. {
  204. "item_type": ITEM_TYPE_PHRASE,
  205. "item_text": point_text,
  206. "point_category": category,
  207. "matched_demand": matched_demand,
  208. "contribution_score": None,
  209. }
  210. )
  211. append_wxindex_keyword_rows(
  212. export_rows,
  213. trend_json=trend_json,
  214. match_result=match_result,
  215. word_lookup=word_lookup,
  216. word_to_categories=word_to_categories,
  217. )
  218. deduped_rows: list[dict[str, Any]] = []
  219. seen: set[tuple[str, str, str]] = set()
  220. for row in export_rows:
  221. key = (
  222. row["item_type"],
  223. row["item_text"],
  224. str(row.get("point_category") or ""),
  225. )
  226. if key in seen:
  227. continue
  228. seen.add(key)
  229. deduped_rows.append(row)
  230. return deduped_rows
  231. def attach_wxindex_metadata(
  232. export_rows: list[dict[str, Any]],
  233. trend_json: dict[str, Any] | None,
  234. ) -> list[dict[str, Any]]:
  235. latest_score = (
  236. get_latest_wxindex_score(trend_json)
  237. if isinstance(trend_json, dict)
  238. else None
  239. )
  240. trend = get_wxindex_trend(trend_json) if isinstance(trend_json, dict) else ""
  241. wxindex_keyword = get_wxindex_keyword(trend_json)
  242. rows: list[dict[str, Any]] = []
  243. for row in export_rows:
  244. matched_demand = str(row.get("matched_demand") or "").strip()
  245. has_record_wxindex = latest_score is not None
  246. if has_record_wxindex and matched_demand:
  247. wxindex_score = float(latest_score)
  248. wxindex_trend_value = trend
  249. else:
  250. wxindex_score = 0.0
  251. wxindex_trend_value = ""
  252. rows.append(
  253. {
  254. **row,
  255. "wxindex_keyword": wxindex_keyword,
  256. "wxindex_latest_score": wxindex_score,
  257. "wxindex_trend": wxindex_trend_value,
  258. }
  259. )
  260. return rows
  261. def _json_loads(value: Any) -> Any:
  262. if value is None:
  263. return None
  264. if isinstance(value, (dict, list)):
  265. return value
  266. if isinstance(value, (bytes, bytearray)):
  267. value = value.decode("utf-8")
  268. if isinstance(value, str):
  269. return json.loads(value)
  270. return value
  271. def fetch_export_candidate_records(cursor: Any, limit: int) -> list[dict[str, Any]]:
  272. limit_sql = "" if limit <= 0 else "LIMIT %s"
  273. params: tuple[Any, ...] = () if limit <= 0 else (limit,)
  274. cursor.execute(
  275. f"""
  276. SELECT
  277. id,
  278. source,
  279. title,
  280. article_title,
  281. contribution_points_json,
  282. contribution_demand_match_json,
  283. wxindex_trend_json
  284. FROM hot_content_records
  285. WHERE contribution_demand_match_json IS NOT NULL
  286. AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
  287. ORDER BY id ASC
  288. {limit_sql}
  289. """,
  290. params,
  291. )
  292. return list(cursor.fetchall())
  293. def export_existing_records(
  294. repository: HotContentRepository,
  295. records: list[dict[str, Any]],
  296. *,
  297. dry_run: bool,
  298. verbose: bool,
  299. ) -> dict[str, int]:
  300. summary = {
  301. "scanned": 0,
  302. "exported_records": 0,
  303. "exported_rows": 0,
  304. "no_export_rows": 0,
  305. "invalid_json": 0,
  306. "skipped": 0,
  307. }
  308. for row in records:
  309. summary["scanned"] += 1
  310. record_id = int(row["id"])
  311. try:
  312. match_json = _json_loads(row.get("contribution_demand_match_json"))
  313. contribution_points = _json_loads(row.get("contribution_points_json"))
  314. trend_json = _json_loads(row.get("wxindex_trend_json"))
  315. except json.JSONDecodeError:
  316. summary["invalid_json"] += 1
  317. if verbose:
  318. print(f"id={record_id}: JSON 解析失败,已跳过")
  319. continue
  320. if not isinstance(match_json, dict):
  321. summary["skipped"] += 1
  322. continue
  323. latest_score = (
  324. get_latest_wxindex_score(trend_json)
  325. if isinstance(trend_json, dict)
  326. else None
  327. )
  328. export_rows = attach_wxindex_metadata(
  329. build_demand_export_rows(
  330. match_json,
  331. contribution_points=(
  332. contribution_points if isinstance(contribution_points, dict) else None
  333. ),
  334. trend_json=trend_json if isinstance(trend_json, dict) else None,
  335. ),
  336. trend_json if isinstance(trend_json, dict) else None,
  337. )
  338. if not export_rows:
  339. summary["no_export_rows"] += 1
  340. if not dry_run:
  341. repository.replace_demand_export_rows(
  342. record_id=record_id,
  343. source=str(row.get("source") or ""),
  344. hot_title=str(row.get("title") or ""),
  345. article_title=str(row.get("article_title") or ""),
  346. rows=[],
  347. )
  348. continue
  349. if verbose or dry_run:
  350. matched_rows = sum(
  351. 1 for item in export_rows if str(item.get("matched_demand") or "").strip()
  352. )
  353. print(
  354. f"id={record_id} rows={len(export_rows)} matched_rows={matched_rows} "
  355. f"title={str(row.get('title') or '')[:40]}"
  356. )
  357. if not dry_run:
  358. repository.replace_demand_export_rows(
  359. record_id=record_id,
  360. source=str(row.get("source") or ""),
  361. hot_title=str(row.get("title") or ""),
  362. article_title=str(row.get("article_title") or ""),
  363. rows=export_rows,
  364. )
  365. summary["exported_records"] += 1
  366. summary["exported_rows"] += len(export_rows)
  367. return summary
  368. def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
  369. parser = argparse.ArgumentParser(
  370. description=(
  371. "扫描已有 contribution_demand_match_json 记录,"
  372. "导出全部元素/短语到 hot_content_demand_exports;"
  373. "元素/短语按灵感点/目的点/关键点展开为多行,无点类型数据过滤;"
  374. "并补充获取微信指数的词、微信指数及趋势。"
  375. ),
  376. )
  377. parser.add_argument(
  378. "--limit",
  379. type=int,
  380. default=0,
  381. help="最多处理多少条记录,默认 0 表示不限制。",
  382. )
  383. parser.add_argument(
  384. "--dry-run",
  385. action="store_true",
  386. help="只统计/打印,不写入数据库。",
  387. )
  388. parser.add_argument(
  389. "--verbose",
  390. action="store_true",
  391. help="打印每条成功导出的记录。",
  392. )
  393. return parser.parse_args(argv)
  394. def main(argv: list[str] | None = None) -> dict[str, int]:
  395. args = parse_args(argv)
  396. config = load_flow_config()
  397. repository = HotContentRepository(config.mysql)
  398. try:
  399. with repository.conn.cursor() as cursor:
  400. records = fetch_export_candidate_records(cursor, args.limit)
  401. summary = export_existing_records(
  402. repository,
  403. records,
  404. dry_run=args.dry_run,
  405. verbose=args.verbose,
  406. )
  407. finally:
  408. repository.close()
  409. action = "预览完成" if args.dry_run else "导出完成"
  410. print(f"{action}:{json.dumps(summary, ensure_ascii=False)}")
  411. return summary
  412. if __name__ == "__main__":
  413. main()