demand_export.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
"""Export demand elements/phrases to MySQL (full set, no ODPS filtering).

MySQL ``hot_content_demand_exports`` receives every row produced by this module:
- Elements: all high-contribution words referenced by the point list
  (including words that matched no demand).
- Phrases: every phrase of the 灵感点 / 目的点 / 关键点 points
  (including phrases that matched no demand).

Filtering before the ODPS write (WeChat-index threshold, matched_demand, etc.)
is handled in ``demand_hive_export``.
"""
  7. from __future__ import annotations
  8. import argparse
  9. import json
  10. from typing import Any
  11. from app.hot_content.config import load_flow_config
  12. from app.hot_content.repository import HotContentRepository
  13. WXINDEX_EXPORT_THRESHOLD = 1_000_000.0 # 与 WXINDEX_SCORE_THRESHOLD 默认值一致
  14. POINT_CATEGORIES = ("灵感点", "目的点", "关键点")
  15. ITEM_TYPE_ELEMENT = "元素"
  16. ITEM_TYPE_PHRASE = "短语"
  17. def get_latest_wxindex_score(trend_json: dict[str, Any]) -> float | None:
  18. wxindex = trend_json.get("wxindex")
  19. if not isinstance(wxindex, dict):
  20. return None
  21. try:
  22. return float(wxindex.get("latest_total_score"))
  23. except (TypeError, ValueError):
  24. return None
  25. def get_wxindex_keyword(trend_json: dict[str, Any] | None) -> str:
  26. if not isinstance(trend_json, dict):
  27. return ""
  28. wxindex = trend_json.get("wxindex")
  29. if isinstance(wxindex, dict):
  30. keyword = str(wxindex.get("keyword") or "").strip()
  31. if keyword:
  32. return keyword
  33. return str(trend_json.get("llm_selected_word") or "").strip()
  34. def get_wxindex_keywords(trend_json: dict[str, Any] | None) -> list[str]:
  35. if not isinstance(trend_json, dict):
  36. return []
  37. raw_words: list[Any] = []
  38. wxindex = trend_json.get("wxindex")
  39. if isinstance(wxindex, dict):
  40. keywords = wxindex.get("keywords")
  41. if isinstance(keywords, list):
  42. raw_words = keywords
  43. elif keywords:
  44. raw_words = [keywords]
  45. if not raw_words:
  46. selected_words = trend_json.get("llm_selected_words")
  47. if isinstance(selected_words, list):
  48. raw_words = selected_words
  49. elif selected_words:
  50. raw_words = [selected_words]
  51. if not raw_words:
  52. keyword = get_wxindex_keyword(trend_json)
  53. return [keyword] if keyword else []
  54. deduped: list[str] = []
  55. seen: set[str] = set()
  56. for item in raw_words:
  57. word = str(item or "").strip()
  58. if word and word not in seen:
  59. seen.add(word)
  60. deduped.append(word)
  61. return deduped
  62. def format_wxindex_keywords(trend_json: dict[str, Any] | None) -> str:
  63. return ",".join(get_wxindex_keywords(trend_json))
  64. def get_wxindex_trend(trend_json: dict[str, Any]) -> str:
  65. wxindex = trend_json.get("wxindex")
  66. if not isinstance(wxindex, dict):
  67. return ""
  68. return str(wxindex.get("trend") or "").strip()
  69. def _to_contribution_score(value: Any) -> float | None:
  70. try:
  71. if value is None:
  72. return None
  73. return float(value)
  74. except (TypeError, ValueError):
  75. return None
  76. def extract_matched_demand_name_list(word_row: dict[str, Any]) -> list[str]:
  77. match_rows = word_row.get("匹配需求列表") or []
  78. if not isinstance(match_rows, list):
  79. return []
  80. names: list[str] = []
  81. seen: set[str] = set()
  82. for match in match_rows:
  83. if not isinstance(match, dict):
  84. continue
  85. demand_name = str(match.get("demand_name") or "").strip()
  86. if not demand_name or demand_name in seen:
  87. continue
  88. seen.add(demand_name)
  89. names.append(demand_name)
  90. return names
  91. def extract_matched_demand_names(word_row: dict[str, Any]) -> str:
  92. return " ".join(extract_matched_demand_name_list(word_row))
  93. def build_word_lookup(words_rows: list[Any]) -> dict[str, dict[str, Any]]:
  94. lookup: dict[str, dict[str, Any]] = {}
  95. for word_row in words_rows:
  96. if not isinstance(word_row, dict):
  97. continue
  98. word_text = str(word_row.get("词") or "").strip()
  99. if word_text:
  100. lookup[word_text] = word_row
  101. return lookup
  102. def _has_matched_demand_text(value: Any) -> bool:
  103. return bool(str(value or "").strip())
  104. def build_merged_word_lookup(
  105. match_result: dict[str, Any],
  106. contribution_points: dict[str, Any] | None,
  107. ) -> dict[str, dict[str, Any]]:
  108. """以 contribution 高贡献词为全集,匹配信息仅以 match_result 为准。"""
  109. match_lookup = build_word_lookup(match_result.get("高贡献词列表") or [])
  110. contribution_source = (
  111. contribution_points if isinstance(contribution_points, dict) else match_result
  112. )
  113. words_rows = contribution_source.get("高贡献词列表") or []
  114. if not isinstance(words_rows, list):
  115. words_rows = []
  116. lookup: dict[str, dict[str, Any]] = {}
  117. for word_row in words_rows:
  118. if not isinstance(word_row, dict):
  119. continue
  120. word_text = str(word_row.get("词") or "").strip()
  121. if not word_text:
  122. continue
  123. match_row = match_lookup.get(word_text)
  124. merged = {k: v for k, v in word_row.items() if k != "匹配需求列表"}
  125. if isinstance(match_row, dict):
  126. merged["匹配需求列表"] = list(match_row.get("匹配需求列表") or [])
  127. else:
  128. merged["匹配需求列表"] = []
  129. lookup[word_text] = merged
  130. for word_text, match_row in match_lookup.items():
  131. if word_text not in lookup:
  132. lookup[word_text] = match_row
  133. return lookup
  134. def enrich_word_lookup_from_points(
  135. word_lookup: dict[str, dict[str, Any]],
  136. *,
  137. points: list[Any],
  138. match_result: dict[str, Any],
  139. ) -> None:
  140. """把点列表里出现、但高贡献词列表未收录的词补进 lookup。"""
  141. if not isinstance(points, list):
  142. return
  143. for point_item in points:
  144. if not isinstance(point_item, dict):
  145. continue
  146. match_words = point_item.get("匹配词列表") or []
  147. if not isinstance(match_words, list):
  148. continue
  149. for hit in match_words:
  150. if not isinstance(hit, dict):
  151. continue
  152. word_text = str(hit.get("词") or "").strip()
  153. if not word_text or word_text in word_lookup:
  154. continue
  155. word_lookup[word_text] = _resolve_word_row(
  156. word_text,
  157. word_lookup=word_lookup,
  158. match_result=match_result,
  159. )
  160. def build_word_to_categories(points: list[Any]) -> dict[str, set[str]]:
  161. word_categories: dict[str, set[str]] = {}
  162. if not isinstance(points, list):
  163. return word_categories
  164. for point_item in points:
  165. if not isinstance(point_item, dict):
  166. continue
  167. category = str(point_item.get("来源") or "").strip()
  168. if category not in POINT_CATEGORIES:
  169. continue
  170. match_words = point_item.get("匹配词列表") or []
  171. if not isinstance(match_words, list):
  172. continue
  173. for hit in match_words:
  174. if not isinstance(hit, dict):
  175. continue
  176. word_text = str(hit.get("词") or "").strip()
  177. if not word_text:
  178. continue
  179. word_categories.setdefault(word_text, set()).add(category)
  180. return word_categories
  181. def ordered_point_categories(categories: set[str]) -> list[str]:
  182. return [category for category in POINT_CATEGORIES if category in categories]
  183. def extract_point_matched_demand_names(
  184. point_item: dict[str, Any],
  185. word_lookup: dict[str, dict[str, Any]],
  186. ) -> str:
  187. match_words = point_item.get("匹配词列表") or []
  188. if not isinstance(match_words, list):
  189. return ""
  190. names: list[str] = []
  191. seen: set[str] = set()
  192. for hit in match_words:
  193. if not isinstance(hit, dict):
  194. continue
  195. word_text = str(hit.get("词") or "").strip()
  196. word_row = word_lookup.get(word_text)
  197. if not word_row:
  198. continue
  199. for demand_name in extract_matched_demand_name_list(word_row):
  200. if demand_name in seen:
  201. continue
  202. seen.add(demand_name)
  203. names.append(demand_name)
  204. return " ".join(names)
  205. def _build_word_export_row(
  206. word_text: str,
  207. word_row: dict[str, Any],
  208. category: str,
  209. ) -> dict[str, Any]:
  210. matched_demand = extract_matched_demand_names(word_row)
  211. return {
  212. "item_type": ITEM_TYPE_ELEMENT,
  213. "item_text": word_text,
  214. "point_category": category,
  215. "matched_demand": matched_demand,
  216. "contribution_score": (
  217. _to_contribution_score(word_row.get("贡献度"))
  218. if _has_matched_demand_text(matched_demand)
  219. else None
  220. ),
  221. }
  222. def _resolve_word_row(
  223. word_text: str,
  224. *,
  225. word_lookup: dict[str, dict[str, Any]],
  226. match_result: dict[str, Any],
  227. ) -> dict[str, Any]:
  228. word_row = word_lookup.get(word_text)
  229. if isinstance(word_row, dict):
  230. return word_row
  231. for row in match_result.get("匹配到需求的词列表") or []:
  232. if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
  233. return row
  234. for row in match_result.get("高贡献词列表") or []:
  235. if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
  236. return row
  237. return {"词": word_text}
  238. def append_wxindex_keyword_rows(
  239. export_rows: list[dict[str, Any]],
  240. *,
  241. trend_json: dict[str, Any] | None,
  242. match_result: dict[str, Any],
  243. word_lookup: dict[str, dict[str, Any]],
  244. word_to_categories: dict[str, set[str]],
  245. ) -> None:
  246. keywords = get_wxindex_keywords(trend_json)
  247. if not keywords:
  248. return
  249. existing_element_texts = {
  250. str(row.get("item_text") or "").strip()
  251. for row in export_rows
  252. if row.get("item_type") == ITEM_TYPE_ELEMENT
  253. }
  254. for keyword in keywords:
  255. if keyword in existing_element_texts:
  256. continue
  257. word_row = _resolve_word_row(keyword, word_lookup=word_lookup, match_result=match_result)
  258. categories = ordered_point_categories(word_to_categories.get(keyword, set()))
  259. if categories:
  260. for category in categories:
  261. export_rows.append(_build_word_export_row(keyword, word_row, category))
  262. existing_element_texts.add(keyword)
  263. continue
  264. export_rows.append(_build_word_export_row(keyword, word_row, ""))
  265. existing_element_texts.add(keyword)
  266. def build_demand_export_rows(
  267. match_result: dict[str, Any],
  268. *,
  269. contribution_points: dict[str, Any] | None = None,
  270. trend_json: dict[str, Any] | None = None,
  271. ) -> list[dict[str, Any]]:
  272. export_rows: list[dict[str, Any]] = []
  273. contribution_source = contribution_points if isinstance(contribution_points, dict) else match_result
  274. points = contribution_source.get("点列表") or []
  275. if not isinstance(points, list):
  276. points = []
  277. word_lookup = build_merged_word_lookup(match_result, contribution_points)
  278. enrich_word_lookup_from_points(
  279. word_lookup,
  280. points=points,
  281. match_result=match_result,
  282. )
  283. word_to_categories = build_word_to_categories(points)
  284. for word_text in sorted(set(word_lookup) | set(word_to_categories)):
  285. categories = ordered_point_categories(word_to_categories.get(word_text, set()))
  286. if not categories:
  287. continue
  288. word_row = word_lookup.get(word_text) or _resolve_word_row(
  289. word_text,
  290. word_lookup=word_lookup,
  291. match_result=match_result,
  292. )
  293. for category in categories:
  294. export_rows.append(_build_word_export_row(word_text, word_row, category))
  295. for point_item in points:
  296. if not isinstance(point_item, dict):
  297. continue
  298. point_text = str(point_item.get("点") or "").strip()
  299. category = str(point_item.get("来源") or "").strip()
  300. if not point_text or category not in POINT_CATEGORIES:
  301. continue
  302. export_rows.append(
  303. {
  304. "item_type": ITEM_TYPE_PHRASE,
  305. "item_text": point_text,
  306. "point_category": category,
  307. "matched_demand": extract_point_matched_demand_names(point_item, word_lookup),
  308. "contribution_score": None,
  309. }
  310. )
  311. append_wxindex_keyword_rows(
  312. export_rows,
  313. trend_json=trend_json,
  314. match_result=match_result,
  315. word_lookup=word_lookup,
  316. word_to_categories=word_to_categories,
  317. )
  318. deduped_rows: list[dict[str, Any]] = []
  319. seen: set[tuple[str, str, str]] = set()
  320. for row in export_rows:
  321. key = (
  322. row["item_type"],
  323. row["item_text"],
  324. str(row.get("point_category") or ""),
  325. )
  326. if key in seen:
  327. continue
  328. seen.add(key)
  329. deduped_rows.append(row)
  330. return deduped_rows
  331. def attach_wxindex_metadata(
  332. export_rows: list[dict[str, Any]],
  333. trend_json: dict[str, Any] | None,
  334. ) -> list[dict[str, Any]]:
  335. latest_score = (
  336. get_latest_wxindex_score(trend_json)
  337. if isinstance(trend_json, dict)
  338. else None
  339. )
  340. trend = get_wxindex_trend(trend_json) if isinstance(trend_json, dict) else ""
  341. wxindex_keyword = get_wxindex_keyword(trend_json)
  342. all_hot_keywords = format_wxindex_keywords(trend_json)
  343. rows: list[dict[str, Any]] = []
  344. for row in export_rows:
  345. matched_demand = str(row.get("matched_demand") or "").strip()
  346. has_matched_demand = _has_matched_demand_text(matched_demand)
  347. has_record_wxindex = latest_score is not None
  348. if has_record_wxindex and has_matched_demand:
  349. wxindex_score = float(latest_score)
  350. wxindex_trend_value = trend
  351. else:
  352. wxindex_score = 0.0
  353. wxindex_trend_value = ""
  354. normalized_row = dict(row)
  355. if not has_matched_demand:
  356. normalized_row["contribution_score"] = None
  357. rows.append(
  358. {
  359. **normalized_row,
  360. "matched_demand": matched_demand,
  361. "wxindex_keyword": wxindex_keyword,
  362. "all_hot_keywords": all_hot_keywords,
  363. "wxindex_latest_score": wxindex_score,
  364. "wxindex_trend": wxindex_trend_value,
  365. }
  366. )
  367. return rows
  368. def _json_loads(value: Any) -> Any:
  369. if value is None:
  370. return None
  371. if isinstance(value, (dict, list)):
  372. return value
  373. if isinstance(value, (bytes, bytearray)):
  374. value = value.decode("utf-8")
  375. if isinstance(value, str):
  376. return json.loads(value)
  377. return value
  378. def fetch_export_candidate_records(cursor: Any, limit: int) -> list[dict[str, Any]]:
  379. limit_sql = "" if limit <= 0 else "LIMIT %s"
  380. params: tuple[Any, ...] = () if limit <= 0 else (limit,)
  381. cursor.execute(
  382. f"""
  383. SELECT
  384. id,
  385. source,
  386. title,
  387. article_title,
  388. contribution_points_json,
  389. contribution_demand_match_json,
  390. wxindex_trend_json
  391. FROM hot_content_records
  392. WHERE contribution_demand_match_json IS NOT NULL
  393. AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
  394. ORDER BY id ASC
  395. {limit_sql}
  396. """,
  397. params,
  398. )
  399. return list(cursor.fetchall())
  400. def export_existing_records(
  401. repository: HotContentRepository,
  402. records: list[dict[str, Any]],
  403. *,
  404. dry_run: bool,
  405. verbose: bool,
  406. ) -> dict[str, int]:
  407. summary = {
  408. "scanned": 0,
  409. "exported_records": 0,
  410. "exported_rows": 0,
  411. "no_export_rows": 0,
  412. "invalid_json": 0,
  413. "skipped": 0,
  414. }
  415. for row in records:
  416. summary["scanned"] += 1
  417. record_id = int(row["id"])
  418. try:
  419. match_json = _json_loads(row.get("contribution_demand_match_json"))
  420. contribution_points = _json_loads(row.get("contribution_points_json"))
  421. trend_json = _json_loads(row.get("wxindex_trend_json"))
  422. except json.JSONDecodeError:
  423. summary["invalid_json"] += 1
  424. if verbose:
  425. print(f"id={record_id}: JSON 解析失败,已跳过")
  426. continue
  427. if not isinstance(match_json, dict):
  428. summary["skipped"] += 1
  429. continue
  430. export_rows = attach_wxindex_metadata(
  431. build_demand_export_rows(
  432. match_json,
  433. contribution_points=(
  434. contribution_points if isinstance(contribution_points, dict) else None
  435. ),
  436. trend_json=trend_json if isinstance(trend_json, dict) else None,
  437. ),
  438. trend_json if isinstance(trend_json, dict) else None,
  439. )
  440. if not export_rows:
  441. summary["no_export_rows"] += 1
  442. if not dry_run:
  443. repository.replace_demand_export_rows(
  444. record_id=record_id,
  445. source=str(row.get("source") or ""),
  446. hot_title=str(row.get("title") or ""),
  447. article_title=str(row.get("article_title") or ""),
  448. rows=[],
  449. )
  450. continue
  451. if verbose or dry_run:
  452. matched_rows = sum(
  453. 1 for item in export_rows if str(item.get("matched_demand") or "").strip()
  454. )
  455. print(
  456. f"id={record_id} rows={len(export_rows)} matched_rows={matched_rows} "
  457. f"title={str(row.get('title') or '')[:40]}"
  458. )
  459. if not dry_run:
  460. repository.replace_demand_export_rows(
  461. record_id=record_id,
  462. source=str(row.get("source") or ""),
  463. hot_title=str(row.get("title") or ""),
  464. article_title=str(row.get("article_title") or ""),
  465. rows=export_rows,
  466. )
  467. summary["exported_records"] += 1
  468. summary["exported_rows"] += len(export_rows)
  469. return summary
  470. def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
  471. parser = argparse.ArgumentParser(
  472. description=(
  473. "扫描已有 contribution_demand_match_json 记录,"
  474. "全量导出元素/短语到 hot_content_demand_exports(含未匹配需求的项);"
  475. "元素/短语按灵感点/目的点/关键点展开为多行,"
  476. "并补充获取微信指数的词、微信指数及趋势。"
  477. ),
  478. )
  479. parser.add_argument(
  480. "--limit",
  481. type=int,
  482. default=0,
  483. help="最多处理多少条记录,默认 0 表示不限制。",
  484. )
  485. parser.add_argument(
  486. "--dry-run",
  487. action="store_true",
  488. help="只统计/打印,不写入数据库。",
  489. )
  490. parser.add_argument(
  491. "--verbose",
  492. action="store_true",
  493. help="打印每条成功导出的记录。",
  494. )
  495. return parser.parse_args(argv)
  496. def main(argv: list[str] | None = None) -> dict[str, int]:
  497. args = parse_args(argv)
  498. config = load_flow_config()
  499. repository = HotContentRepository(config.mysql)
  500. try:
  501. with repository.conn.cursor() as cursor:
  502. records = fetch_export_candidate_records(cursor, args.limit)
  503. summary = export_existing_records(
  504. repository,
  505. records,
  506. dry_run=args.dry_run,
  507. verbose=args.verbose,
  508. )
  509. finally:
  510. repository.close()
  511. action = "预览完成" if args.dry_run else "导出完成"
  512. print(f"{action}:{json.dumps(summary, ensure_ascii=False)}")
  513. return summary
# Run the export when executed as a script; main()'s returned summary is unused here.
if __name__ == "__main__":
    main()