| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606 |
- """需求元素/短语导出到 MySQL 的逻辑(全量,不做 ODPS 过滤)。
- MySQL hot_content_demand_exports 写入本模块产出的全量行:
- - 元素:点列表关联的全部高贡献词(含未匹配需求)
- - 短语:灵感点/目的点/关键点全部短语(含未匹配需求)
- ODPS 写入前的过滤(微信指数门槛、matched_demand 等)在 demand_hive_export 中处理。
- """
- from __future__ import annotations
- import argparse
- import json
- from typing import Any
- from app.hot_content.config import load_flow_config
- from app.hot_content.repository import HotContentRepository
# Kept equal to the default value of WXINDEX_SCORE_THRESHOLD.
WXINDEX_EXPORT_THRESHOLD: float = 1_000_000.0

# Point sources recognised by the export; the tuple order is the order used
# when a word is expanded into one row per category.
POINT_CATEGORIES: tuple[str, ...] = (
    "灵感点",
    "目的点",
    "关键点",
)

# Values stored in the ``item_type`` field of an export row.
ITEM_TYPE_ELEMENT: str = "元素"
ITEM_TYPE_PHRASE: str = "短语"
def get_latest_wxindex_score(trend_json: dict[str, Any] | None) -> float | None:
    """Return the latest WeChat-index total score stored in *trend_json*.

    Args:
        trend_json: Decoded trend payload. Anything that is not a dict
            (including ``None``) is treated as "no data".

    Returns:
        ``trend_json["wxindex"]["latest_total_score"]`` converted to
        ``float``, or ``None`` when the payload, its ``wxindex`` section or a
        convertible score is missing.
    """
    # Guard the payload itself (as get_wxindex_keyword does) so a None
    # payload yields None instead of AttributeError on ``.get``.
    if not isinstance(trend_json, dict):
        return None
    wxindex = trend_json.get("wxindex")
    if not isinstance(wxindex, dict):
        return None
    try:
        # float(None) raises TypeError, so a missing score also ends up here.
        return float(wxindex.get("latest_total_score"))
    except (TypeError, ValueError):
        return None
def get_wxindex_keyword(trend_json: dict[str, Any] | None) -> str:
    """Return the single keyword recorded for the WeChat index.

    ``wxindex.keyword`` is used when it is non-blank after stripping;
    otherwise the top-level ``llm_selected_word`` is returned (stripped).
    A payload that is not a dict yields ``""``.
    """
    if not isinstance(trend_json, dict):
        return ""
    section = trend_json.get("wxindex")
    primary = ""
    if isinstance(section, dict):
        primary = str(section.get("keyword") or "").strip()
    if primary:
        return primary
    fallback = trend_json.get("llm_selected_word")
    return str(fallback or "").strip()
def get_wxindex_keywords(trend_json: dict[str, Any] | None) -> list[str]:
    """Return every WeChat-index keyword of *trend_json*, de-duplicated.

    Sources are tried in order and the first one that yields at least one
    non-blank word wins:

    1. ``wxindex.keywords`` (a list, or a single truthy value),
    2. top-level ``llm_selected_words`` (same shapes),
    3. the single keyword from ``get_wxindex_keyword``.

    Args:
        trend_json: Decoded trend payload; a non-dict yields ``[]``.

    Returns:
        Stripped, non-blank words in first-seen order without duplicates.
    """
    if not isinstance(trend_json, dict):
        return []

    def _clean(raw: Any) -> list[str]:
        """Normalise a list-or-scalar value into unique non-blank strings."""
        if isinstance(raw, list):
            items = raw
        elif raw:
            items = [raw]
        else:
            return []
        words = (str(item or "").strip() for item in items)
        # dict.fromkeys drops duplicates while keeping first-seen order.
        return list(dict.fromkeys(word for word in words if word))

    # Fall back on the *cleaned* result: a list holding only blank/None
    # entries must not stop the next source from being consulted.
    wxindex = trend_json.get("wxindex")
    if isinstance(wxindex, dict):
        cleaned = _clean(wxindex.get("keywords"))
        if cleaned:
            return cleaned

    cleaned = _clean(trend_json.get("llm_selected_words"))
    if cleaned:
        return cleaned

    keyword = get_wxindex_keyword(trend_json)
    return [keyword] if keyword else []
def format_wxindex_keywords(trend_json: dict[str, Any] | None) -> str:
    """Return all WeChat-index keywords joined by ``","`` (``""`` when none)."""
    keywords = get_wxindex_keywords(trend_json)
    return ",".join(keywords)
def get_wxindex_trend(trend_json: dict[str, Any] | None) -> str:
    """Return the stripped ``wxindex.trend`` text of *trend_json*.

    Args:
        trend_json: Decoded trend payload. Anything that is not a dict
            (including ``None``) is treated as "no data".

    Returns:
        The trend text, or ``""`` when the payload, its ``wxindex`` section
        or the trend value is missing or falsy.
    """
    # Guard the payload itself (as get_wxindex_keyword does) so a None
    # payload yields "" instead of AttributeError on ``.get``.
    if not isinstance(trend_json, dict):
        return ""
    wxindex = trend_json.get("wxindex")
    if not isinstance(wxindex, dict):
        return ""
    return str(wxindex.get("trend") or "").strip()
- def _to_contribution_score(value: Any) -> float | None:
- try:
- if value is None:
- return None
- return float(value)
- except (TypeError, ValueError):
- return None
def extract_matched_demand_name_list(word_row: dict[str, Any]) -> list[str]:
    """Return the unique demand names listed under the row's ``"匹配需求列表"``.

    Entries that are not dicts, or whose ``demand_name`` is blank, are
    ignored. Names are stripped and returned in first-seen order. A missing
    or non-list value yields ``[]``.
    """
    matches = word_row.get("匹配需求列表") or []
    if not isinstance(matches, list):
        return []
    cleaned = (
        str(entry.get("demand_name") or "").strip()
        for entry in matches
        if isinstance(entry, dict)
    )
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(name for name in cleaned if name))
def extract_matched_demand_names(word_row: dict[str, Any]) -> str:
    """Return the row's unique matched demand names joined by single spaces."""
    names = extract_matched_demand_name_list(word_row)
    return " ".join(names)
def build_word_lookup(words_rows: list[Any]) -> dict[str, dict[str, Any]]:
    """Index word rows by their stripped ``"词"`` text.

    Args:
        words_rows: Candidate rows. Items that are not dicts, or whose
            ``"词"`` is blank, are skipped. A value that is not a list or
            tuple yields an empty lookup.

    Returns:
        Mapping of word text to its row; when several rows share the same
        text the last one wins.
    """
    # The value comes from decoded JSON, so it may be any type; iterating a
    # number would raise TypeError and abort the caller.
    if not isinstance(words_rows, (list, tuple)):
        return {}
    lookup: dict[str, dict[str, Any]] = {}
    for word_row in words_rows:
        if not isinstance(word_row, dict):
            continue
        word_text = str(word_row.get("词") or "").strip()
        if word_text:
            lookup[word_text] = word_row
    return lookup
- def _has_matched_demand_text(value: Any) -> bool:
- return bool(str(value or "").strip())
def build_merged_word_lookup(
    match_result: dict[str, Any],
    contribution_points: dict[str, Any] | None,
) -> dict[str, dict[str, Any]]:
    """Merge contribution word rows with the match info from *match_result*.

    The ``"高贡献词列表"`` of *contribution_points* (or of *match_result* when
    *contribution_points* is not a dict) defines the set of words. For each
    of them ``"匹配需求列表"`` is taken only from the same word in
    *match_result*, and is ``[]`` when that word is absent there. Words that
    exist only in *match_result* are added unchanged.
    """
    matched_by_word = build_word_lookup(match_result.get("高贡献词列表") or [])
    if isinstance(contribution_points, dict):
        source = contribution_points
    else:
        source = match_result
    candidate_rows = source.get("高贡献词列表") or []
    if not isinstance(candidate_rows, list):
        candidate_rows = []

    merged_lookup: dict[str, dict[str, Any]] = {}
    for row in candidate_rows:
        if not isinstance(row, dict):
            continue
        text = str(row.get("词") or "").strip()
        if not text:
            continue
        # Copy everything except the source's own match list ...
        merged_row = {key: value for key, value in row.items() if key != "匹配需求列表"}
        # ... and re-attach the list from match_result (copied), if any.
        matched_row = matched_by_word.get(text)
        if isinstance(matched_row, dict):
            merged_row["匹配需求列表"] = list(matched_row.get("匹配需求列表") or [])
        else:
            merged_row["匹配需求列表"] = []
        merged_lookup[text] = merged_row

    for text, matched_row in matched_by_word.items():
        merged_lookup.setdefault(text, matched_row)
    return merged_lookup
def enrich_word_lookup_from_points(
    word_lookup: dict[str, dict[str, Any]],
    *,
    points: list[Any],
    match_result: dict[str, Any],
) -> None:
    """Add words found in the point list but missing from *word_lookup*.

    *word_lookup* is updated in place. Each missing word referenced by a
    point's ``"匹配词列表"`` gets the row returned by ``_resolve_word_row``.
    A non-list *points* leaves the lookup untouched.
    """
    if not isinstance(points, list):
        return
    dict_points = (point for point in points if isinstance(point, dict))
    for point in dict_points:
        hits = point.get("匹配词列表") or []
        if not isinstance(hits, list):
            continue
        for hit in hits:
            if not isinstance(hit, dict):
                continue
            text = str(hit.get("词") or "").strip()
            if text and text not in word_lookup:
                word_lookup[text] = _resolve_word_row(
                    text,
                    word_lookup=word_lookup,
                    match_result=match_result,
                )
def build_word_to_categories(points: list[Any]) -> dict[str, set[str]]:
    """Map each word to the point categories whose points reference it.

    Only points whose stripped ``"来源"`` is one of ``POINT_CATEGORIES`` are
    considered; words come from the ``"词"`` of each dict entry in the
    point's ``"匹配词列表"``. A non-list *points* yields an empty mapping.
    """
    categories_by_word: dict[str, set[str]] = {}
    if not isinstance(points, list):
        return categories_by_word
    for point in points:
        if not isinstance(point, dict):
            continue
        source = str(point.get("来源") or "").strip()
        hits = point.get("匹配词列表") or []
        if source not in POINT_CATEGORIES or not isinstance(hits, list):
            continue
        for hit in hits:
            text = str(hit.get("词") or "").strip() if isinstance(hit, dict) else ""
            if text:
                categories_by_word.setdefault(text, set()).add(source)
    return categories_by_word
def ordered_point_categories(categories: set[str]) -> list[str]:
    """Return the known point categories present in *categories*, in ``POINT_CATEGORIES`` order."""
    return list(filter(lambda known: known in categories, POINT_CATEGORIES))
def extract_point_matched_demand_names(
    point_item: dict[str, Any],
    word_lookup: dict[str, dict[str, Any]],
) -> str:
    """Return the demand names matched by any word of a point, space-joined.

    Each dict entry of the point's ``"匹配词列表"`` is looked up in
    *word_lookup* by its stripped ``"词"``; the matched demand names of the
    rows found are collected in first-seen order without duplicates.
    A non-list ``"匹配词列表"`` yields ``""``.
    """
    hits = point_item.get("匹配词列表") or []
    if not isinstance(hits, list):
        return ""
    # A dict used as an ordered set: keys keep their first-seen position.
    ordered_names: dict[str, None] = {}
    for hit in hits:
        if not isinstance(hit, dict):
            continue
        row = word_lookup.get(str(hit.get("词") or "").strip())
        if row:
            ordered_names.update(dict.fromkeys(extract_matched_demand_name_list(row)))
    return " ".join(ordered_names)
def _build_word_export_row(
    word_text: str,
    word_row: dict[str, Any],
    category: str,
) -> dict[str, Any]:
    """Build one element export row for *word_text* in *category*.

    ``contribution_score`` is taken from the row's ``"贡献度"`` only when the
    word has matched demand text; otherwise it is ``None``.
    """
    demand_text = extract_matched_demand_names(word_row)
    score = None
    if _has_matched_demand_text(demand_text):
        score = _to_contribution_score(word_row.get("贡献度"))
    return {
        "item_type": ITEM_TYPE_ELEMENT,
        "item_text": word_text,
        "point_category": category,
        "matched_demand": demand_text,
        "contribution_score": score,
    }
- def _resolve_word_row(
- word_text: str,
- *,
- word_lookup: dict[str, dict[str, Any]],
- match_result: dict[str, Any],
- ) -> dict[str, Any]:
- word_row = word_lookup.get(word_text)
- if isinstance(word_row, dict):
- return word_row
- for row in match_result.get("匹配到需求的词列表") or []:
- if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
- return row
- for row in match_result.get("高贡献词列表") or []:
- if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
- return row
- return {"词": word_text}
def append_wxindex_keyword_rows(
    export_rows: list[dict[str, Any]],
    *,
    trend_json: dict[str, Any] | None,
    match_result: dict[str, Any],
    word_lookup: dict[str, dict[str, Any]],
    word_to_categories: dict[str, set[str]],
) -> None:
    """Append element rows for WeChat-index keywords not exported yet.

    *export_rows* is extended in place. A keyword that already appears as an
    element row is skipped. Otherwise it gets one row per point category it
    belongs to, or a single row with a blank category when it has none.
    """
    keywords = get_wxindex_keywords(trend_json)
    if not keywords:
        return
    seen_elements: set[str] = set()
    for row in export_rows:
        if row.get("item_type") == ITEM_TYPE_ELEMENT:
            seen_elements.add(str(row.get("item_text") or "").strip())

    for keyword in keywords:
        if keyword in seen_elements:
            continue
        seen_elements.add(keyword)
        row_data = _resolve_word_row(
            keyword,
            word_lookup=word_lookup,
            match_result=match_result,
        )
        # No known category -> still export once, with an empty category.
        categories = ordered_point_categories(word_to_categories.get(keyword, set())) or [""]
        export_rows.extend(
            _build_word_export_row(keyword, row_data, category) for category in categories
        )
def build_demand_export_rows(
    match_result: dict[str, Any],
    *,
    contribution_points: dict[str, Any] | None = None,
    trend_json: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """Build the full, unfiltered list of element and phrase export rows.

    Rows are produced in three steps and then de-duplicated on
    ``(item_type, item_text, point_category)``, keeping the first occurrence:

    1. element rows: every word that belongs to at least one point
       category, in sorted word order, one row per category;
    2. phrase rows: every point with non-blank ``"点"`` text and a known
       ``"来源"`` category;
    3. rows for WeChat-index keywords not yet exported as elements.

    The point list is read from *contribution_points* when it is a dict,
    otherwise from *match_result*.
    """
    source = contribution_points if isinstance(contribution_points, dict) else match_result
    points = source.get("点列表") or []
    if not isinstance(points, list):
        points = []

    word_lookup = build_merged_word_lookup(match_result, contribution_points)
    enrich_word_lookup_from_points(word_lookup, points=points, match_result=match_result)
    word_to_categories = build_word_to_categories(points)

    rows: list[dict[str, Any]] = []

    # Step 1: element rows.
    for text in sorted(word_lookup.keys() | word_to_categories.keys()):
        categories = ordered_point_categories(word_to_categories.get(text, set()))
        if not categories:
            continue
        row_data = word_lookup.get(text) or _resolve_word_row(
            text,
            word_lookup=word_lookup,
            match_result=match_result,
        )
        rows.extend(_build_word_export_row(text, row_data, category) for category in categories)

    # Step 2: phrase rows (phrases never carry a contribution score).
    for point in points:
        if not isinstance(point, dict):
            continue
        phrase = str(point.get("点") or "").strip()
        point_category = str(point.get("来源") or "").strip()
        if phrase and point_category in POINT_CATEGORIES:
            rows.append(
                {
                    "item_type": ITEM_TYPE_PHRASE,
                    "item_text": phrase,
                    "point_category": point_category,
                    "matched_demand": extract_point_matched_demand_names(point, word_lookup),
                    "contribution_score": None,
                }
            )

    # Step 3: WeChat-index keywords.
    append_wxindex_keyword_rows(
        rows,
        trend_json=trend_json,
        match_result=match_result,
        word_lookup=word_lookup,
        word_to_categories=word_to_categories,
    )

    # First occurrence of each key wins; dict order preserves row order.
    unique_rows: dict[tuple[str, str, str], dict[str, Any]] = {}
    for row in rows:
        key = (row["item_type"], row["item_text"], str(row.get("point_category") or ""))
        unique_rows.setdefault(key, row)
    return list(unique_rows.values())
def attach_wxindex_metadata(
    export_rows: list[dict[str, Any]],
    trend_json: dict[str, Any] | None,
) -> list[dict[str, Any]]:
    """Return copies of *export_rows* extended with WeChat-index fields.

    Every row receives ``wxindex_keyword`` and ``all_hot_keywords``. The
    record's latest score and trend are attached only to rows that have
    matched demand text (and only when a score exists); all other rows get
    ``0.0`` and ``""``. Rows without matched demand also have
    ``contribution_score`` reset to ``None``. The input rows are not modified.
    """
    if isinstance(trend_json, dict):
        latest_score = get_latest_wxindex_score(trend_json)
        trend = get_wxindex_trend(trend_json)
    else:
        latest_score, trend = None, ""
    wxindex_keyword = get_wxindex_keyword(trend_json)
    all_hot_keywords = format_wxindex_keywords(trend_json)

    enriched: list[dict[str, Any]] = []
    for row in export_rows:
        demand_text = str(row.get("matched_demand") or "").strip()
        is_matched = _has_matched_demand_text(demand_text)
        show_wxindex = is_matched and latest_score is not None

        out = dict(row)
        if not is_matched:
            out["contribution_score"] = None
        out.update(
            {
                "matched_demand": demand_text,
                "wxindex_keyword": wxindex_keyword,
                "all_hot_keywords": all_hot_keywords,
                "wxindex_latest_score": float(latest_score) if show_wxindex else 0.0,
                "wxindex_trend": trend if show_wxindex else "",
            }
        )
        enriched.append(out)
    return enriched
- def _json_loads(value: Any) -> Any:
- if value is None:
- return None
- if isinstance(value, (dict, list)):
- return value
- if isinstance(value, (bytes, bytearray)):
- value = value.decode("utf-8")
- if isinstance(value, str):
- return json.loads(value)
- return value
def fetch_export_candidate_records(cursor: Any, limit: int) -> list[dict[str, Any]]:
    """Fetch records that have a non-blank ``contribution_demand_match_json``.

    Rows are read from ``hot_content_records`` ordered by ascending ``id``.

    Args:
        cursor: Object exposing ``execute(sql, params)`` and ``fetchall()``.
        limit: Maximum number of rows; a value ``<= 0`` means no limit.

    Returns:
        The fetched rows as a list.
    """
    # The limit is bound as a query parameter; only the fixed "LIMIT %s"
    # clause (or nothing) is interpolated into the SQL text.
    if limit <= 0:
        limit_sql, params = "", ()
    else:
        limit_sql, params = "LIMIT %s", (limit,)
    query = f"""
        SELECT
            id,
            source,
            title,
            article_title,
            contribution_points_json,
            contribution_demand_match_json,
            wxindex_trend_json
        FROM hot_content_records
        WHERE contribution_demand_match_json IS NOT NULL
          AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
        ORDER BY id ASC
        {limit_sql}
        """
    cursor.execute(query, params)
    return list(cursor.fetchall())
def export_existing_records(
    repository: HotContentRepository,
    records: list[dict[str, Any]],
    *,
    dry_run: bool,
    verbose: bool,
) -> dict[str, int]:
    """Rebuild and store the demand export rows of every record.

    Args:
        repository: Receives ``replace_demand_export_rows`` once per
            processed record (never called when *dry_run* is true).
        records: Rows carrying ``id``, ``source``, ``title``,
            ``article_title`` and the three JSON columns.
        dry_run: Only count and print; do not write.
        verbose: Print a line per exported record and per decode failure.

    Returns:
        Counters: ``scanned``, ``exported_records``, ``exported_rows``,
        ``no_export_rows``, ``invalid_json`` and ``skipped``.
    """
    summary = {
        "scanned": 0,
        "exported_records": 0,
        "exported_rows": 0,
        "no_export_rows": 0,
        "invalid_json": 0,
        "skipped": 0,
    }
    for row in records:
        summary["scanned"] += 1
        record_id = int(row["id"])
        try:
            match_json = _json_loads(row.get("contribution_demand_match_json"))
            contribution_points = _json_loads(row.get("contribution_points_json"))
            trend_json = _json_loads(row.get("wxindex_trend_json"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Bytes that are not valid UTF-8 fail in decode() before JSON
            # parsing; treat them like malformed JSON instead of letting one
            # bad record abort the whole batch.
            summary["invalid_json"] += 1
            if verbose:
                print(f"id={record_id}: JSON 解析失败,已跳过")
            continue
        if not isinstance(match_json, dict):
            summary["skipped"] += 1
            continue

        points_arg = contribution_points if isinstance(contribution_points, dict) else None
        trend_arg = trend_json if isinstance(trend_json, dict) else None
        export_rows = attach_wxindex_metadata(
            build_demand_export_rows(
                match_json,
                contribution_points=points_arg,
                trend_json=trend_arg,
            ),
            trend_arg,
        )

        if export_rows and (verbose or dry_run):
            matched_rows = sum(
                1 for item in export_rows if str(item.get("matched_demand") or "").strip()
            )
            print(
                f"id={record_id} rows={len(export_rows)} matched_rows={matched_rows} "
                f"title={str(row.get('title') or '')[:40]}"
            )

        if not dry_run:
            # Also called with an empty list, as before.
            # NOTE(review): presumably that clears rows exported earlier for
            # this record; confirm against HotContentRepository.
            repository.replace_demand_export_rows(
                record_id=record_id,
                source=str(row.get("source") or ""),
                hot_title=str(row.get("title") or ""),
                article_title=str(row.get("article_title") or ""),
                rows=export_rows,
            )

        if export_rows:
            summary["exported_records"] += 1
            summary["exported_rows"] += len(export_rows)
        else:
            summary["no_export_rows"] += 1
    return summary
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the export script.

    Options: ``--limit`` (int, default 0 = unlimited), ``--dry-run`` and
    ``--verbose`` (both flags, default false). When *argv* is ``None``
    argparse reads ``sys.argv``.
    """
    description = "".join(
        (
            "扫描已有 contribution_demand_match_json 记录,",
            "全量导出元素/短语到 hot_content_demand_exports(含未匹配需求的项);",
            "元素/短语按灵感点/目的点/关键点展开为多行,",
            "并补充获取微信指数的词、微信指数及趋势。",
        )
    )
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="最多处理多少条记录,默认 0 表示不限制。",
    )
    flag_help = {
        "--dry-run": "只统计/打印,不写入数据库。",
        "--verbose": "打印每条成功导出的记录。",
    }
    for flag, help_text in flag_help.items():
        parser.add_argument(flag, action="store_true", help=help_text)
    return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> dict[str, int]:
    """Run the export: load candidate records, export them, print the summary.

    The repository is always closed, even when loading or exporting fails.

    Returns:
        The counters produced by ``export_existing_records``.
    """
    options = parse_args(argv)
    repository = HotContentRepository(load_flow_config().mysql)
    try:
        with repository.conn.cursor() as cursor:
            candidates = fetch_export_candidate_records(cursor, options.limit)
        summary = export_existing_records(
            repository,
            candidates,
            dry_run=options.dry_run,
            verbose=options.verbose,
        )
    finally:
        repository.close()

    if options.dry_run:
        label = "预览完成"
    else:
        label = "导出完成"
    print(f"{label}:{json.dumps(summary, ensure_ascii=False)}")
    return summary


# Run the export only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|