"""Export demand elements/phrases to MySQL (full export, no ODPS filtering).

The MySQL table ``hot_content_demand_exports`` receives every row produced by
this module:

- Elements (``元素``): all high-contribution words referenced by the point
  list, including words that matched no demand.
- Phrases (``短语``): all inspiration/purpose/key-point phrases
  (``灵感点``/``目的点``/``关键点``), including ones that matched no demand.

Filtering applied before the ODPS write (WeChat-index threshold,
``matched_demand``, ...) is handled in ``demand_hive_export``.
"""

from __future__ import annotations

import argparse
import json
import math
from typing import Any

from app.hot_content.config import load_flow_config
from app.hot_content.repository import HotContentRepository

# Kept equal to the default value of WXINDEX_SCORE_THRESHOLD.
WXINDEX_EXPORT_THRESHOLD = 1_000_000.0
# Point categories, in the order rows are emitted for a word.
POINT_CATEGORIES = ("灵感点", "目的点", "关键点")
ITEM_TYPE_ELEMENT = "元素"
ITEM_TYPE_PHRASE = "短语"


def _to_finite_float(value: Any) -> float | None:
    """Return *value* as a finite float, or ``None`` if missing/invalid.

    NaN and +/-inf are treated as missing. They never compare meaningfully
    against score thresholds and are not valid values for MySQL numeric
    columns.
    """
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: ints too large for a float (json.loads produces
        # arbitrary-precision ints).
        return None
    return number if math.isfinite(number) else None


def get_latest_wxindex_score(trend_json: dict[str, Any]) -> float | None:
    """Return ``trend_json["wxindex"]["latest_total_score"]`` as a finite float.

    Returns ``None`` in any of these cases:

    - the ``wxindex`` section is missing or not a dict;
    - the score is absent or cannot be parsed;
    - the score is not finite.
    """
    wxindex = trend_json.get("wxindex")
    if not isinstance(wxindex, dict):
        return None
    return _to_finite_float(wxindex.get("latest_total_score"))


def get_wxindex_keyword(trend_json: dict[str, Any] | None) -> str:
    """Return the primary WeChat-index keyword of a record.

    Prefers ``wxindex.keyword`` and falls back to ``llm_selected_word``.
    Returns ``""`` when neither is set or *trend_json* is not a dict.
    """
    if not isinstance(trend_json, dict):
        return ""
    wxindex = trend_json.get("wxindex")
    if isinstance(wxindex, dict):
        keyword = str(wxindex.get("keyword") or "").strip()
        if keyword:
            return keyword
    return str(trend_json.get("llm_selected_word") or "").strip()


def _as_word_list(value: Any) -> list[Any]:
    """Normalize a list-or-scalar keyword field into a list (empty if unset)."""
    if isinstance(value, list):
        return value
    return [value] if value else []


def _dedupe_words(raw_words: list[Any]) -> list[str]:
    """Strip each word, drop blanks, and keep only the first occurrence."""
    deduped: list[str] = []
    seen: set[str] = set()
    for item in raw_words:
        word = str(item or "").strip()
        if word and word not in seen:
            seen.add(word)
            deduped.append(word)
    return deduped


def get_wxindex_keywords(trend_json: dict[str, Any] | None) -> list[str]:
    """Return all WeChat-index keywords of a record, deduplicated in order.

    Sources are tried in priority order. The first one that yields at least
    one non-blank word wins:

    1. ``wxindex.keywords`` (list or single value)
    2. ``llm_selected_words`` (list or single value)
    3. the single keyword from :func:`get_wxindex_keyword`

    A source whose entries are all blank falls through to the next one
    instead of producing an empty result.
    """
    if not isinstance(trend_json, dict):
        return []
    candidate_sources: list[list[Any]] = []
    wxindex = trend_json.get("wxindex")
    if isinstance(wxindex, dict):
        candidate_sources.append(_as_word_list(wxindex.get("keywords")))
    candidate_sources.append(_as_word_list(trend_json.get("llm_selected_words")))
    for raw_words in candidate_sources:
        words = _dedupe_words(raw_words)
        if words:
            return words
    keyword = get_wxindex_keyword(trend_json)
    return [keyword] if keyword else []


def format_wxindex_keywords(trend_json: dict[str, Any] | None) -> str:
    """Return :func:`get_wxindex_keywords` joined with commas."""
    return ",".join(get_wxindex_keywords(trend_json))


def get_wxindex_trend(trend_json: dict[str, Any]) -> str:
    """Return the stripped ``wxindex.trend`` text, or ``""`` if unavailable."""
    wxindex = trend_json.get("wxindex")
    if not isinstance(wxindex, dict):
        return ""
    return str(wxindex.get("trend") or "").strip()


def _to_contribution_score(value: Any) -> float | None:
    """Parse a ``贡献度`` value; ``None`` when missing, invalid, or non-finite."""
    return _to_finite_float(value)


def extract_matched_demand_name_list(word_row: dict[str, Any]) -> list[str]:
    """Return the distinct, non-blank ``demand_name`` values of a word row.

    Names come from ``word_row["匹配需求列表"]`` and keep first-seen order.
    Non-dict entries and a non-list field are ignored.
    """
    match_rows = word_row.get("匹配需求列表") or []
    if not isinstance(match_rows, list):
        return []
    names: list[str] = []
    seen: set[str] = set()
    for match in match_rows:
        if not isinstance(match, dict):
            continue
        demand_name = str(match.get("demand_name") or "").strip()
        if not demand_name or demand_name in seen:
            continue
        seen.add(demand_name)
        names.append(demand_name)
    return names


def extract_matched_demand_names(word_row: dict[str, Any]) -> str:
    """Return the matched demand names of a word row, joined by spaces."""
    return " ".join(extract_matched_demand_name_list(word_row))


def build_word_lookup(words_rows: list[Any]) -> dict[str, dict[str, Any]]:
    """Index dict rows by their stripped ``词`` value.

    Rows with a blank word are skipped; a later duplicate word wins.
    """
    lookup: dict[str, dict[str, Any]] = {}
    for word_row in words_rows:
        if not isinstance(word_row, dict):
            continue
        word_text = str(word_row.get("词") or "").strip()
        if word_text:
            lookup[word_text] = word_row
    return lookup


def _has_matched_demand_text(value: Any) -> bool:
    """True when *value* is a non-blank matched-demand string."""
    return bool(str(value or "").strip())


def build_merged_word_lookup(
    match_result: dict[str, Any],
    contribution_points: dict[str, Any] | None,
) -> dict[str, dict[str, Any]]:
    """Merge the contribution word list with the match information.

    The contribution high-contribution word list (``高贡献词列表``) is the
    universe of words. Match information (``匹配需求列表``) is taken only from
    ``match_result``. Words present only in ``match_result``'s list are added
    as-is.
    """
    match_lookup = build_word_lookup(match_result.get("高贡献词列表") or [])
    contribution_source = (
        contribution_points if isinstance(contribution_points, dict) else match_result
    )
    words_rows = contribution_source.get("高贡献词列表") or []
    if not isinstance(words_rows, list):
        words_rows = []

    lookup: dict[str, dict[str, Any]] = {}
    for word_row in words_rows:
        if not isinstance(word_row, dict):
            continue
        word_text = str(word_row.get("词") or "").strip()
        if not word_text:
            continue
        match_row = match_lookup.get(word_text)
        # Drop any match info carried by the contribution row; only
        # match_result is authoritative for matches.
        merged = {k: v for k, v in word_row.items() if k != "匹配需求列表"}
        if isinstance(match_row, dict):
            merged["匹配需求列表"] = list(match_row.get("匹配需求列表") or [])
        else:
            merged["匹配需求列表"] = []
        lookup[word_text] = merged

    for word_text, match_row in match_lookup.items():
        if word_text not in lookup:
            lookup[word_text] = match_row
    return lookup


def enrich_word_lookup_from_points(
    word_lookup: dict[str, dict[str, Any]],
    *,
    points: list[Any],
    match_result: dict[str, Any],
) -> None:
    """Add point-list words that the high-contribution word list is missing.

    Mutates *word_lookup* in place. Each missing word (``匹配词列表[*].词``) is
    resolved via :func:`_resolve_word_row`.
    """
    if not isinstance(points, list):
        return
    for point_item in points:
        if not isinstance(point_item, dict):
            continue
        match_words = point_item.get("匹配词列表") or []
        if not isinstance(match_words, list):
            continue
        for hit in match_words:
            if not isinstance(hit, dict):
                continue
            word_text = str(hit.get("词") or "").strip()
            if not word_text or word_text in word_lookup:
                continue
            word_lookup[word_text] = _resolve_word_row(
                word_text,
                word_lookup=word_lookup,
                match_result=match_result,
            )


def build_word_to_categories(points: list[Any]) -> dict[str, set[str]]:
    """Map each point-list word to the point categories (``来源``) it appears in.

    Only categories listed in ``POINT_CATEGORIES`` are recorded.
    """
    word_categories: dict[str, set[str]] = {}
    if not isinstance(points, list):
        return word_categories
    for point_item in points:
        if not isinstance(point_item, dict):
            continue
        category = str(point_item.get("来源") or "").strip()
        if category not in POINT_CATEGORIES:
            continue
        match_words = point_item.get("匹配词列表") or []
        if not isinstance(match_words, list):
            continue
        for hit in match_words:
            if not isinstance(hit, dict):
                continue
            word_text = str(hit.get("词") or "").strip()
            if not word_text:
                continue
            word_categories.setdefault(word_text, set()).add(category)
    return word_categories


def ordered_point_categories(categories: set[str]) -> list[str]:
    """Return *categories* ordered as in ``POINT_CATEGORIES`` (unknowns dropped)."""
    return [category for category in POINT_CATEGORIES if category in categories]


def extract_point_matched_demand_names(
    point_item: dict[str, Any],
    word_lookup: dict[str, dict[str, Any]],
) -> str:
    """Return the demands matched by any word of a point, joined by spaces.

    Names are deduplicated and keep first-seen order across the point's words.
    """
    match_words = point_item.get("匹配词列表") or []
    if not isinstance(match_words, list):
        return ""
    names: list[str] = []
    seen: set[str] = set()
    for hit in match_words:
        if not isinstance(hit, dict):
            continue
        word_text = str(hit.get("词") or "").strip()
        word_row = word_lookup.get(word_text)
        if not word_row:
            continue
        for demand_name in extract_matched_demand_name_list(word_row):
            if demand_name in seen:
                continue
            seen.add(demand_name)
            names.append(demand_name)
    return " ".join(names)


def _build_word_export_row(
    word_text: str,
    word_row: dict[str, Any],
    category: str,
) -> dict[str, Any]:
    """Build one element export row for *word_text* under *category*.

    ``contribution_score`` is only filled when the word matched a demand.
    """
    matched_demand = extract_matched_demand_names(word_row)
    return {
        "item_type": ITEM_TYPE_ELEMENT,
        "item_text": word_text,
        "point_category": category,
        "matched_demand": matched_demand,
        "contribution_score": (
            _to_contribution_score(word_row.get("贡献度"))
            if _has_matched_demand_text(matched_demand)
            else None
        ),
    }


def _resolve_word_row(
    word_text: str,
    *,
    word_lookup: dict[str, dict[str, Any]],
    match_result: dict[str, Any],
) -> dict[str, Any]:
    """Find the row describing *word_text*.

    The lookup order is:

    1. *word_lookup*;
    2. ``match_result["匹配到需求的词列表"]``;
    3. ``match_result["高贡献词列表"]``;
    4. otherwise a minimal ``{"词": word_text}`` row.
    """
    word_row = word_lookup.get(word_text)
    if isinstance(word_row, dict):
        return word_row
    for row in match_result.get("匹配到需求的词列表") or []:
        if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
            return row
    for row in match_result.get("高贡献词列表") or []:
        if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
            return row
    return {"词": word_text}


def append_wxindex_keyword_rows(
    export_rows: list[dict[str, Any]],
    *,
    trend_json: dict[str, Any] | None,
    match_result: dict[str, Any],
    word_lookup: dict[str, dict[str, Any]],
    word_to_categories: dict[str, set[str]],
) -> None:
    """Append element rows for WeChat-index keywords not yet exported.

    Mutates *export_rows* in place. A keyword that appears under point
    categories gets one row per category. Otherwise it gets a single row
    with an empty ``point_category``.
    """
    keywords = get_wxindex_keywords(trend_json)
    if not keywords:
        return
    existing_element_texts = {
        str(row.get("item_text") or "").strip()
        for row in export_rows
        if row.get("item_type") == ITEM_TYPE_ELEMENT
    }
    for keyword in keywords:
        if keyword in existing_element_texts:
            continue
        word_row = _resolve_word_row(
            keyword, word_lookup=word_lookup, match_result=match_result
        )
        categories = ordered_point_categories(word_to_categories.get(keyword, set()))
        for category in categories or [""]:
            export_rows.append(_build_word_export_row(keyword, word_row, category))
        existing_element_texts.add(keyword)


def _dedupe_export_rows(export_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Keep the first row per ``(item_type, item_text, point_category)``."""
    deduped_rows: list[dict[str, Any]] = []
    seen: set[tuple[str, str, str]] = set()
    for row in export_rows:
        key = (
            row["item_type"],
            row["item_text"],
            str(row.get("point_category") or ""),
        )
        if key in seen:
            continue
        seen.add(key)
        deduped_rows.append(row)
    return deduped_rows


def build_demand_export_rows(
    match_result: dict[str, Any],
    *,
    contribution_points: dict[str, Any] | None = None,
    trend_json: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """Build the full (unfiltered) element and phrase export rows of a record.

    Rows are emitted in this order:

    - element rows (one per word and point category, words sorted);
    - phrase rows (one per point);
    - WeChat-index keyword rows.

    The result is then deduplicated by (type, text, category).
    """
    contribution_source = (
        contribution_points if isinstance(contribution_points, dict) else match_result
    )
    points = contribution_source.get("点列表") or []
    if not isinstance(points, list):
        points = []

    word_lookup = build_merged_word_lookup(match_result, contribution_points)
    enrich_word_lookup_from_points(
        word_lookup,
        points=points,
        match_result=match_result,
    )
    word_to_categories = build_word_to_categories(points)

    export_rows: list[dict[str, Any]] = []

    # Element rows. Only words that appear under a known point category yield
    # rows, so iterating the category map alone is sufficient.
    for word_text in sorted(word_to_categories):
        word_row = word_lookup.get(word_text) or _resolve_word_row(
            word_text,
            word_lookup=word_lookup,
            match_result=match_result,
        )
        for category in ordered_point_categories(word_to_categories[word_text]):
            export_rows.append(_build_word_export_row(word_text, word_row, category))

    # Phrase rows: one per point with a non-blank text and a known category.
    for point_item in points:
        if not isinstance(point_item, dict):
            continue
        point_text = str(point_item.get("点") or "").strip()
        category = str(point_item.get("来源") or "").strip()
        if not point_text or category not in POINT_CATEGORIES:
            continue
        export_rows.append(
            {
                "item_type": ITEM_TYPE_PHRASE,
                "item_text": point_text,
                "point_category": category,
                "matched_demand": extract_point_matched_demand_names(
                    point_item, word_lookup
                ),
                "contribution_score": None,
            }
        )

    append_wxindex_keyword_rows(
        export_rows,
        trend_json=trend_json,
        match_result=match_result,
        word_lookup=word_lookup,
        word_to_categories=word_to_categories,
    )
    return _dedupe_export_rows(export_rows)


def attach_wxindex_metadata(
    export_rows: list[dict[str, Any]],
    trend_json: dict[str, Any] | None,
    *,
    wxindex_threshold: float = WXINDEX_EXPORT_THRESHOLD,
    event_sense_json: dict[str, Any] | None = None,
    senior_fit_json: dict[str, Any] | None = None,
    event_threshold: float = 0.0,
    senior_threshold: float = 0.0,
) -> list[dict[str, Any]]:
    """Return copies of *export_rows* with WeChat-index metadata and ``is_as_demand``.

    The WeChat-index score and trend are attached only when the row matched
    a demand and the record has a latest score. Other rows get ``0.0`` and
    ``""``. Rows without a matched demand also get ``contribution_score``
    reset to ``None``.
    """
    # Imported lazily; presumably to avoid an import cycle with
    # demand_hive_export — confirm.
    from app.hot_content.demand_hive_export import is_export_row_as_demand

    has_trend_json = isinstance(trend_json, dict)
    latest_score = get_latest_wxindex_score(trend_json) if has_trend_json else None
    trend = get_wxindex_trend(trend_json) if has_trend_json else ""
    wxindex_keyword = get_wxindex_keyword(trend_json)
    all_hot_keywords = format_wxindex_keywords(trend_json)
    has_record_wxindex = latest_score is not None
    record_wxindex_score = float(latest_score or 0)

    # is_as_demand follows the same rule as the ODPS export, so the title
    # threshold check must see wxindex_latest_score exactly as it will be
    # stored.
    gate_rows: list[dict[str, Any]] = [
        {
            **row,
            "wxindex_latest_score": (
                record_wxindex_score
                if has_record_wxindex
                and _has_matched_demand_text(row.get("matched_demand"))
                else 0.0
            ),
        }
        for row in export_rows
    ]

    rows: list[dict[str, Any]] = []
    for row in export_rows:
        matched_demand = str(row.get("matched_demand") or "").strip()
        has_matched_demand = _has_matched_demand_text(matched_demand)
        carries_wxindex = has_record_wxindex and has_matched_demand

        normalized_row = dict(row)
        if not has_matched_demand:
            normalized_row["contribution_score"] = None

        rows.append(
            {
                **normalized_row,
                "matched_demand": matched_demand,
                "wxindex_keyword": wxindex_keyword,
                "all_hot_keywords": all_hot_keywords,
                "wxindex_latest_score": (
                    record_wxindex_score if carries_wxindex else 0.0
                ),
                "wxindex_trend": trend if carries_wxindex else "",
                "is_as_demand": is_export_row_as_demand(
                    normalized_row,
                    gate_rows,
                    wxindex_threshold=wxindex_threshold,
                    event_sense_json=event_sense_json,
                    senior_fit_json=senior_fit_json,
                    event_threshold=event_threshold,
                    senior_threshold=senior_threshold,
                ),
            }
        )
    return rows


def build_export_rows_from_record(
    record: dict[str, Any],
    *,
    wxindex_threshold: float,
    event_sense_json: dict[str, Any] | None = None,
    senior_fit_json: dict[str, Any] | None = None,
    event_threshold: float = 0.0,
    senior_threshold: float = 0.0,
) -> list[dict[str, Any]]:
    """Build fully annotated export rows for one record with decoded JSON columns.

    Returns ``[]`` when ``contribution_demand_match_json`` is not a dict.
    Otherwise it builds the rows, attaches WeChat-index metadata, and then
    attaches quality scores.
    """
    from app.hot_content.demand_quality import attach_quality_scores_to_export_rows

    match_json = record.get("contribution_demand_match_json")
    if not isinstance(match_json, dict):
        return []
    contribution_points = record.get("contribution_points_json")
    trend_json = record.get("wxindex_trend_json")
    trend_dict = trend_json if isinstance(trend_json, dict) else None

    export_rows = attach_wxindex_metadata(
        build_demand_export_rows(
            match_json,
            contribution_points=(
                contribution_points if isinstance(contribution_points, dict) else None
            ),
            trend_json=trend_dict,
        ),
        trend_dict,
        wxindex_threshold=wxindex_threshold,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
        event_threshold=event_threshold,
        senior_threshold=senior_threshold,
    )
    return attach_quality_scores_to_export_rows(
        export_rows,
        event_sense_json=event_sense_json,
        senior_fit_json=senior_fit_json,
    )


def _json_loads(value: Any) -> Any:
    """Decode a JSON column value fetched from the database.

    Values are handled as follows:

    - ``None`` and blank/whitespace-only text yield ``None``. This treats
      them as "not set", mirroring the ``TRIM(...) <> ''`` check used when
      selecting candidates.
    - dicts and lists are returned as-is.
    - bytes are decoded as UTF-8, then parsed like text.
    - any other value is returned unchanged.

    Raises:
        json.JSONDecodeError: if non-blank text is not valid JSON.
        UnicodeDecodeError: if bytes are not valid UTF-8.
    """
    if value is None:
        return None
    if isinstance(value, (dict, list)):
        return value
    if isinstance(value, (bytes, bytearray)):
        value = value.decode("utf-8")
    if isinstance(value, str):
        if not value.strip():
            return None
        return json.loads(value)
    return value


def fetch_export_candidate_records(cursor: Any, limit: int) -> list[dict[str, Any]]:
    """Select records with a non-blank match JSON, ordered by id.

    ``limit <= 0`` means no limit. The limit is passed as a bound parameter.
    """
    limit_sql = "" if limit <= 0 else "LIMIT %s"
    params: tuple[Any, ...] = () if limit <= 0 else (limit,)
    cursor.execute(
        f"""
        SELECT id, source, title, article_title,
               contribution_points_json, contribution_demand_match_json,
               wxindex_trend_json, demand_event_sense_json, demand_senior_fit_json
        FROM hot_content_records
        WHERE contribution_demand_match_json IS NOT NULL
          AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
        ORDER BY id ASC
        {limit_sql}
        """,
        params,
    )
    return list(cursor.fetchall())


def _replace_record_export_rows(
    repository: HotContentRepository,
    row: dict[str, Any],
    record_id: int,
    export_rows: list[dict[str, Any]],
) -> None:
    """Replace the stored export rows of one record (empty list clears them)."""
    repository.replace_demand_export_rows(
        record_id=record_id,
        source=str(row.get("source") or ""),
        hot_title=str(row.get("title") or ""),
        article_title=str(row.get("article_title") or ""),
        rows=export_rows,
    )


def export_existing_records(
    repository: HotContentRepository,
    records: list[dict[str, Any]],
    *,
    dry_run: bool,
    verbose: bool,
    wxindex_threshold: float = WXINDEX_EXPORT_THRESHOLD,
    event_threshold: float = 0.0,
    senior_threshold: float = 0.0,
) -> dict[str, int]:
    """Rebuild and (unless *dry_run*) store export rows for each record.

    Returns counters:

    - ``scanned``: records processed.
    - ``exported_records`` / ``exported_rows``: records and rows built. These
      are counted in dry-run mode too.
    - ``no_export_rows``: records that produced no rows. Their stored rows
      are cleared when not in dry-run mode.
    - ``invalid_json``: records with undecodable JSON columns (skipped).
    - ``skipped``: records whose match JSON is not an object.
    """
    summary = {
        "scanned": 0,
        "exported_records": 0,
        "exported_rows": 0,
        "no_export_rows": 0,
        "invalid_json": 0,
        "skipped": 0,
    }
    for row in records:
        summary["scanned"] += 1
        record_id = int(row["id"])
        try:
            match_json = _json_loads(row.get("contribution_demand_match_json"))
            contribution_points = _json_loads(row.get("contribution_points_json"))
            trend_json = _json_loads(row.get("wxindex_trend_json"))
            event_sense_json = _json_loads(row.get("demand_event_sense_json"))
            senior_fit_json = _json_loads(row.get("demand_senior_fit_json"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            # A single malformed row must not abort the whole batch.
            summary["invalid_json"] += 1
            if verbose:
                print(f"id={record_id}: JSON 解析失败,已跳过")
            continue

        if not isinstance(match_json, dict):
            summary["skipped"] += 1
            continue

        normalized_record = {
            "contribution_demand_match_json": match_json,
            "contribution_points_json": contribution_points,
            "wxindex_trend_json": trend_json,
            "demand_event_sense_json": (
                event_sense_json if isinstance(event_sense_json, dict) else {}
            ),
            "demand_senior_fit_json": (
                senior_fit_json if isinstance(senior_fit_json, dict) else {}
            ),
        }
        export_rows = build_export_rows_from_record(
            normalized_record,
            wxindex_threshold=wxindex_threshold,
            event_sense_json=normalized_record["demand_event_sense_json"],
            senior_fit_json=normalized_record["demand_senior_fit_json"],
            event_threshold=event_threshold,
            senior_threshold=senior_threshold,
        )

        if not export_rows:
            summary["no_export_rows"] += 1
            if not dry_run:
                # Clear rows left over from a previous export of this record.
                _replace_record_export_rows(repository, row, record_id, [])
            continue

        if verbose or dry_run:
            matched_rows = sum(
                1
                for item in export_rows
                if str(item.get("matched_demand") or "").strip()
            )
            print(
                f"id={record_id} rows={len(export_rows)} matched_rows={matched_rows} "
                f"title={str(row.get('title') or '')[:40]}"
            )
        if not dry_run:
            _replace_record_export_rows(repository, row, record_id, export_rows)
        summary["exported_records"] += 1
        summary["exported_rows"] += len(export_rows)
    return summary


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the export script."""
    parser = argparse.ArgumentParser(
        description=(
            "扫描已有 contribution_demand_match_json 记录,"
            "全量导出元素/短语到 hot_content_demand_exports(含未匹配需求的项);"
            "元素/短语按灵感点/目的点/关键点展开为多行,"
            "并补充获取微信指数的词、微信指数及趋势。"
        ),
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="最多处理多少条记录,默认 0 表示不限制。",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="只统计/打印,不写入数据库。",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="打印每条成功导出的记录。",
    )
    return parser.parse_args(argv)


def main(argv: list[str] | None = None) -> dict[str, int]:
    """Run the export over candidate records and print/return the summary.

    The repository connection is always closed, even on failure.
    """
    args = parse_args(argv)
    config = load_flow_config()
    repository = HotContentRepository(config.mysql)
    try:
        with repository.conn.cursor() as cursor:
            records = fetch_export_candidate_records(cursor, args.limit)
            summary = export_existing_records(
                repository,
                records,
                dry_run=args.dry_run,
                verbose=args.verbose,
                wxindex_threshold=config.wxindex_score_threshold,
                event_threshold=config.demand_event_sense_threshold,
                senior_threshold=config.demand_senior_fit_threshold,
            )
    finally:
        repository.close()
    action = "预览完成" if args.dry_run else "导出完成"
    print(f"{action}:{json.dumps(summary, ensure_ascii=False)}")
    return summary


if __name__ == "__main__":
    main()