"""Contribution-point demand matching and WeChat-index trend post-processing service."""

from __future__ import annotations

import json
import re
import time
from datetime import datetime, timedelta
from typing import Any

from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
from app.hot_content.client import JsonApiClient
from app.hot_content.config import load_flow_config
from app.hot_content.demand_cache_service import DemandCacheService
from app.hot_content.exceptions import HotContentFlowError
from app.hot_content.repository import HotContentRepository
from app.hot_content.status import PostprocessStatus
from app.hot_content.timezone import SHANGHAI_TZ
from app.hot_content.types import FlowConfig
from app.hot_content.demand_export import (
    attach_wxindex_metadata,
    build_demand_export_rows,
    build_export_rows_from_record,
)
from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
from app.hot_content.demand_quality import run_demand_quality_pipeline
from app.hot_content.wxindex_trend import calc_wxindex_trend
from app.hot_content.wxindex_words import (
    ensure_word_full_scores,
    slice_scores_lookback,
    sync_words_from_trend_json,
)


class WxindexSelectionSkipped(Exception):
    """Raised when no usable WeChat-index keyword exists, so later lookups are skipped."""


def _extract_json_object(text: str) -> dict[str, Any]:
    """Parse a JSON object out of raw LLM output.

    A leading/trailing Markdown code fence is stripped first.  If the whole
    text does not decode to a dict, the span from the first ``{`` to the last
    ``}`` is tried instead.

    Raises:
        HotContentFlowError: if no JSON object can be recovered.
    """
    raw = text.strip()
    if raw.startswith("```"):
        raw = re.sub(r"^```(?:json)?\s*", "", raw)
        raw = re.sub(r"\s*```$", "", raw)
    try:
        parsed = json.loads(raw)
        if isinstance(parsed, dict):
            return parsed
    except json.JSONDecodeError:
        # Fall through to the brace-delimited search below.
        pass
    match = re.search(r"\{[\s\S]*\}", raw)
    if not match:
        raise HotContentFlowError("llm output is not json object")
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError as exc:
        raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
    if not isinstance(parsed, dict):
        raise HotContentFlowError("llm output is not json object")
    return parsed


def _get_recent_range(lookback_days: int) -> tuple[str, str]:
    """Return ``(start_ymd, end_ymd)`` as ``YYYYMMDD`` strings, ending yesterday (Shanghai time).

    The start is ``max(lookback_days, 1)`` days before the end date.
    """
    today = datetime.now(SHANGHAI_TZ).date()
    end_date = today - timedelta(days=1)
    start_date = end_date - timedelta(days=max(lookback_days, 1))
    return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")


def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract ``{"ymd", "total_score"}`` points from a wxindex response, sorted by ``ymd``.

    Rows are read from ``wx_resp["data"]["data"]``; each row's score comes from
    ``row["channel_score"]["total_score"]``.  Rows without a date or without a
    numeric score are dropped.  Any unexpected (non-dict / non-list) nesting
    yields an empty result or a skipped row instead of raising.
    """
    payload = wx_resp.get("data")
    rows = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(rows, list):
        return []
    series: list[dict[str, Any]] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        ymd = str(row.get("ymd") or "").strip()
        channel_score = row.get("channel_score")
        total_score = (
            channel_score.get("total_score")
            if isinstance(channel_score, dict)
            else None
        )
        try:
            score_num = float(total_score) if total_score is not None else None
        except (TypeError, ValueError):
            score_num = None
        if ymd and score_num is not None:
            series.append({"ymd": ymd, "total_score": score_num})
    series.sort(key=lambda x: x["ymd"])
    return series


def _normalize_demand_key(value: str) -> str:
    """Return ``value`` with all whitespace removed."""
    return "".join(value.split())


def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
    """Map each demand name, and its whitespace-free form, to the canonical name.

    The first name registered under a key wins (``setdefault``).
    """
    lookup: dict[str, str] = {}
    for item in demand_name_set:
        demand_name = str(item).strip()
        if not demand_name:
            continue
        lookup.setdefault(demand_name, demand_name)
        compact_key = _normalize_demand_key(demand_name)
        if compact_key:
            lookup.setdefault(compact_key, demand_name)
    return lookup


def _resolve_demand_name(
    demand_name: str,
    demand_lookup: dict[str, str],
) -> str | None:
    """Return the canonical demand name for ``demand_name``, or ``None`` if unknown.

    Tries the stripped value first, then its whitespace-free form.
    """
    value = demand_name.strip()
    if not value:
        return None
    return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))


def _collect_matched_demand_names(matched_word_rows: list[Any]) -> list[str]:
    """Collect unique ``demand_name`` values from matched word rows, in first-seen order."""
    demand_names: list[str] = []
    seen: set[str] = set()
    for row in matched_word_rows:
        if not isinstance(row, dict):
            continue
        match_rows = row.get("匹配需求列表") or []
        if not isinstance(match_rows, list):
            continue
        for match in match_rows:
            if not isinstance(match, dict):
                continue
            demand_name = str(match.get("demand_name") or "").strip()
            if demand_name and demand_name not in seen:
                seen.add(demand_name)
                demand_names.append(demand_name)
    return demand_names


class ContributionPostprocessService:
    """Match contribution words to demands, build wxindex trends, judge quality, export."""

    def __init__(
        self,
        config: FlowConfig,
        repository: HotContentRepository,
        api_client: JsonApiClient,
    ):
        self.config = config
        self.repository = repository
        self.api_client = api_client

    def run(self) -> dict[str, Any]:
        """Process one batch of post-process candidates and return a run summary.

        For each candidate record: (re)match contribution words against the
        current demand cache, sanitize the match, build or reuse the wxindex
        trend, run the demand quality judgment, and export demand rows.
        Per-record errors are recorded on the record and counted; they do not
        abort the batch.
        """
        records = self.repository.list_postprocess_candidates(
            limit=max(self.config.postprocess_batch_size, 1)
        )
        if not records:
            return self._finalize_run_result(
                {
                    "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                    "status": "success",
                    "candidate_count": 0,
                    "matched_count": 0,
                    "wxindex_count": 0,
                    "skipped_count": 0,
                    "failed_count": 0,
                }
            )

        cache = DemandCacheService(
            self.config,
            self.repository,
        ).get_or_create_current_hour_cache()
        demand_cache_run_id = int(cache["id"])
        demand_name_set = cache["demand_name_set"]
        if not demand_name_set:
            return self._finalize_run_result(
                {
                    "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                    "status": "skipped",
                    "reason": "empty_demand_cache",
                    "demand_cache_run_id": demand_cache_run_id,
                    "cache_source": cache.get("source"),
                    "processed_count": 0,
                }
            )

        matched_count = 0
        wxindex_count = 0
        quality_count = 0
        exported_count = 0
        export_failed_count = 0
        skipped_count = 0
        failed_count = 0

        for record in records:
            record_id = int(record["id"])
            quality_saved = False
            match_result: dict[str, Any] | None = None
            try:
                # Re-run the LLM match when there is no stored match or it was
                # produced against a different demand cache run.
                match_result = record.get("contribution_demand_match_json")
                if (
                    not isinstance(match_result, dict)
                    or int(record.get("demand_cache_run_id") or 0) != demand_cache_run_id
                ):
                    match_result = self.match_record(
                        record=record,
                        demand_name_set=demand_name_set,
                        demand_cache_run_id=demand_cache_run_id,
                    )
                    self.repository.save_contribution_demand_match(
                        record_id=record_id,
                        demand_cache_run_id=demand_cache_run_id,
                        match_json=match_result,
                    )
                    matched_count += 1

                # Persist again only if sanitizing actually changed the match.
                sanitized_match_result = self.sanitize_match_result(
                    match_result,
                    demand_name_set=demand_name_set,
                    demand_cache_run_id=demand_cache_run_id,
                )
                if sanitized_match_result != match_result:
                    match_result = sanitized_match_result
                    self.repository.save_contribution_demand_match(
                        record_id=record_id,
                        demand_cache_run_id=demand_cache_run_id,
                        match_json=match_result,
                    )

                postprocess_status = int(record.get("postprocess_status") or 0)
                existing_trend = record.get("wxindex_trend_json")
                if (
                    postprocess_status == PostprocessStatus.WXINDEX_DONE
                    and isinstance(existing_trend, dict)
                ):
                    trend_result = existing_trend
                else:
                    trend_result = self.build_wxindex_trend(record, match_result)

                if trend_result is None:
                    # Nothing matched a demand: mark skipped, clear quality
                    # data, and still refresh the export rows.
                    self.repository.update_postprocess_status(
                        record_id=record_id,
                        status=PostprocessStatus.SKIPPED,
                        error_message="no matched demand words",
                    )
                    self._save_empty_demand_quality(record_id=record_id)
                    export_rows_count, export_error = self._export_demand_terms_if_needed(
                        record=record,
                        match_result=match_result,
                        trend_result=None,
                    )
                    if export_error:
                        export_failed_count += 1
                    else:
                        exported_count += export_rows_count
                    skipped_count += 1
                    continue

                if postprocess_status != PostprocessStatus.WXINDEX_DONE:
                    self.repository.save_wxindex_trend(
                        record_id=record_id,
                        trend_json=trend_result,
                    )
                    self.sync_wxindex_words(
                        record_id=record_id,
                        trend_result=trend_result,
                        event_created_at=record.get("created_at"),
                    )
                    wxindex_count += 1

                event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
                    record=record,
                    match_result=match_result,
                    trend_result=trend_result,
                )
                self.repository.save_demand_quality(
                    record_id=record_id,
                    event_sense_json=event_sense_json,
                    senior_fit_json=senior_fit_json,
                )
                quality_saved = True
                quality_count += 1

                export_rows_count, export_error = self._export_demand_terms_if_needed(
                    record=record,
                    match_result=match_result,
                    trend_result=trend_result,
                    event_sense_json=event_sense_json,
                    senior_fit_json=senior_fit_json,
                )
                if export_error:
                    export_failed_count += 1
                else:
                    exported_count += export_rows_count
            except WxindexSelectionSkipped as exc:
                self.repository.update_postprocess_status(
                    record_id=record_id,
                    status=PostprocessStatus.SKIPPED,
                    error_message=str(exc),
                )
                if isinstance(match_result, dict):
                    self._save_empty_demand_quality(record_id=record_id)
                    export_rows_count, export_error = self._export_demand_terms_if_needed(
                        record=record,
                        match_result=match_result,
                        trend_result=None,
                    )
                    if export_error:
                        export_failed_count += 1
                    else:
                        exported_count += export_rows_count
                skipped_count += 1
            except Exception as exc:
                # Per-record boundary: once quality data has been saved the
                # record status is left alone; the failure is still counted.
                if not quality_saved:
                    self.repository.update_postprocess_status(
                        record_id=record_id,
                        status=PostprocessStatus.FAILED,
                        error_message=str(exc),
                    )
                failed_count += 1

        return self._finalize_run_result(
            {
                "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                "status": "success",
                "demand_cache_run_id": demand_cache_run_id,
                "cache_source": cache.get("source"),
                "cache_hour": str(cache.get("cache_hour") or ""),
                "demand_name_count": len(demand_name_set),
                "candidate_count": len(records),
                "matched_count": matched_count,
                "wxindex_count": wxindex_count,
                "quality_count": quality_count,
                "exported_count": exported_count,
                "export_failed_count": export_failed_count,
                "skipped_count": skipped_count,
                "failed_count": failed_count,
            }
        )

    def _export_demand_terms_if_needed(
        self,
        *,
        record: dict[str, Any],
        match_result: dict[str, Any],
        trend_result: dict[str, Any] | None,
        event_sense_json: dict[str, Any] | None = None,
        senior_fit_json: dict[str, Any] | None = None,
    ) -> tuple[int, str | None]:
        """Export demand terms; on failure return the error text and leave postprocess_status alone.

        Returns:
            ``(exported_row_count, None)`` on success, ``(0, error_message)``
            on failure.  The error message is never empty, so callers can
            test it for truthiness.
        """
        try:
            exported_rows = self.export_demand_terms_if_needed(
                record=record,
                match_result=match_result,
                trend_result=trend_result,
                event_sense_json=event_sense_json,
                senior_fit_json=senior_fit_json,
            )
        except Exception as exc:
            # str(exc) is "" for exceptions raised without arguments; fall
            # back to the class name so the failure is not read as success.
            return 0, str(exc) or type(exc).__name__
        return exported_rows, None

    def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
        """Run the Hive sync, attach its outcome, and downgrade status on partial failure.

        A ``"success"`` status becomes ``"partial_failure"`` when the Hive
        sync raised or when ``export_failed_count`` is positive.  Other
        statuses are left unchanged.
        """
        try:
            result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
        except Exception as exc:
            # Keep the marker non-empty even for message-less exceptions.
            result["hive_sync_error"] = str(exc) or type(exc).__name__
        export_failed = int(result.get("export_failed_count") or 0) > 0
        if (result.get("hive_sync_error") or export_failed) and result.get(
            "status"
        ) == "success":
            result["status"] = "partial_failure"
        return result

    def sync_wxindex_words(
        self,
        *,
        record_id: int,
        trend_result: dict[str, Any],
        event_created_at: datetime | None = None,
        verbose: bool = False,
    ) -> dict[str, int]:
        """Delegate to ``sync_words_from_trend_json`` for this record's trend JSON."""
        return sync_words_from_trend_json(
            self.repository,
            self.api_client,
            self.config.wxindex_api_url,
            trend_json=trend_result,
            record_id=record_id,
            event_created_at=event_created_at,
            verbose=verbose,
            update_meta_if_exists=True,
        )

    def _save_empty_demand_quality(self, *, record_id: int) -> None:
        """Store empty quality JSON for the record, passing ``update_status=False``."""
        self.repository.save_demand_quality(
            record_id=record_id,
            event_sense_json={},
            senior_fit_json={},
            update_status=False,
        )

    def run_demand_quality_judgment(
        self,
        *,
        record: dict[str, Any],
        match_result: dict[str, Any],
        trend_result: dict[str, Any],
    ) -> tuple[dict[str, Any], dict[str, Any]]:
        """Build export rows with wxindex metadata and run the demand quality pipeline.

        Returns:
            Whatever ``run_demand_quality_pipeline`` returns; callers unpack
            it as ``(event_sense_json, senior_fit_json)``.
        """
        channel_content_id = str(
            match_result.get("channelContentId") or record.get("unique_key") or ""
        ).strip()
        contribution_points = record.get("contribution_points_json")
        base_export_rows = attach_wxindex_metadata(
            build_demand_export_rows(
                match_result,
                contribution_points=(
                    contribution_points
                    if isinstance(contribution_points, dict)
                    else None
                ),
                trend_json=trend_result,
            ),
            trend_result,
            wxindex_threshold=self.config.wxindex_score_threshold,
        )
        return run_demand_quality_pipeline(
            channel_content_id=channel_content_id,
            export_rows=base_export_rows,
            wxindex_threshold=self.config.wxindex_score_threshold,
            event_threshold=self.config.demand_event_sense_threshold,
            senior_threshold=self.config.demand_senior_fit_threshold,
            model=self.config.demand_quality_llm_model,
            max_attempts=self.config.demand_quality_llm_max_attempts,
            retry_sleep_seconds=self.config.demand_quality_llm_retry_sleep_seconds,
            max_tokens=self.config.demand_quality_llm_max_tokens,
        )

    def export_demand_terms_if_needed(
        self,
        *,
        record: dict[str, Any],
        match_result: dict[str, Any],
        trend_result: dict[str, Any] | None,
        event_sense_json: dict[str, Any] | None = None,
        senior_fit_json: dict[str, Any] | None = None,
    ) -> int:
        """Rebuild and replace the record's demand export rows; return the row count.

        Non-dict trend/quality inputs are normalized (``None`` for the trend,
        ``{}`` for the quality JSON) before the rows are built.
        """
        contribution_points = record.get("contribution_points_json")
        normalized_record = {
            "contribution_demand_match_json": match_result,
            "contribution_points_json": (
                contribution_points if isinstance(contribution_points, dict) else None
            ),
            "wxindex_trend_json": trend_result if isinstance(trend_result, dict) else None,
            "demand_event_sense_json": (
                event_sense_json if isinstance(event_sense_json, dict) else {}
            ),
            "demand_senior_fit_json": (
                senior_fit_json if isinstance(senior_fit_json, dict) else {}
            ),
        }
        export_rows = build_export_rows_from_record(
            normalized_record,
            wxindex_threshold=self.config.wxindex_score_threshold,
            event_sense_json=normalized_record["demand_event_sense_json"],
            senior_fit_json=normalized_record["demand_senior_fit_json"],
            event_threshold=self.config.demand_event_sense_threshold,
            senior_threshold=self.config.demand_senior_fit_threshold,
        )
        self.repository.replace_demand_export_rows(
            record_id=int(record["id"]),
            source=str(record.get("source") or ""),
            hot_title=str(record.get("title") or ""),
            article_title=str(record.get("article_title") or ""),
            rows=export_rows,
        )
        return len(export_rows)

    def match_record(
        self,
        *,
        record: dict[str, Any],
        demand_name_set: list[str],
        demand_cache_run_id: int,
    ) -> dict[str, Any]:
        """Match a record's high-contribution words against the demand names via the LLM.

        LLM matches are kept only when the word is one of the record's words
        and the demand name resolves to a known demand.

        Raises:
            HotContentFlowError: if ``contribution_points_json`` is not a dict,
                or if the LLM call ultimately fails.
        """
        contribution_points = record.get("contribution_points_json")
        if not isinstance(contribution_points, dict):
            raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")

        channel_content_id = str(
            contribution_points.get("channelContentId") or record.get("unique_key") or ""
        ).strip()
        words_rows = contribution_points.get("高贡献词列表") or []
        if not isinstance(words_rows, list):
            words_rows = []
        word_list = [
            str(item.get("词") or "").strip()
            for item in words_rows
            if isinstance(item, dict) and str(item.get("词") or "").strip()
        ]
        # Set built once for O(1) membership checks in the loop below.
        known_words = set(word_list)

        llm_result = self.llm_match_single_article(
            channel_content_id=channel_content_id or str(record["unique_key"]),
            words=word_list,
            demand_name_set=demand_name_set,
        )

        demand_lookup = _build_demand_lookup(demand_name_set)
        matched_map: dict[str, list[dict[str, str]]] = {}
        for item in llm_result.get("matched") or []:
            if not isinstance(item, dict):
                continue
            word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
            demand_name = str(item.get("demand_name") or "").strip()
            reason = str(item.get("reason") or "").strip()
            if not word or word not in known_words:
                continue
            canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
            if canonical_demand_name is None:
                continue
            matched_map.setdefault(word, []).append(
                {
                    "demand_name": canonical_demand_name,
                    "reason": reason,
                }
            )

        output_words: list[dict[str, Any]] = []
        matched_word_rows: list[dict[str, Any]] = []
        for row in words_rows:
            if not isinstance(row, dict):
                continue
            word = str(row.get("词") or "").strip()
            item = {"词": word, "贡献度": row.get("贡献度")}
            if word in matched_map:
                item["匹配需求列表"] = matched_map[word]
                matched_word_rows.append(item)
            output_words.append(item)

        matched_points = self.filter_matched_points(
            contribution_points.get("点列表"),
            set(matched_map),
        )
        return {
            "channelContentId": channel_content_id,
            "demand_cache_run_id": demand_cache_run_id,
            "高贡献词列表": output_words,
            "匹配到需求的词列表": matched_word_rows,
            "点列表": matched_points,
        }

    def sanitize_match_result(
        self,
        match_result: dict[str, Any],
        *,
        demand_name_set: list[str],
        demand_cache_run_id: int,
    ) -> dict[str, Any]:
        """Rebuild a match result, keeping only matches whose demand name is still known.

        Demand names are rewritten to their canonical form, the matched-word
        list and point list are recomputed, and ``demand_cache_run_id`` is set
        to the given value.
        """
        demand_lookup = _build_demand_lookup(demand_name_set)
        words_rows = match_result.get("高贡献词列表") or []
        if not isinstance(words_rows, list):
            words_rows = []

        output_words: list[dict[str, Any]] = []
        matched_word_rows: list[dict[str, Any]] = []
        valid_words: set[str] = set()
        for row in words_rows:
            if not isinstance(row, dict):
                continue
            word = str(row.get("词") or "").strip()
            item = {"词": word, "贡献度": row.get("贡献度")}
            match_rows = row.get("匹配需求列表") or []
            if not isinstance(match_rows, list):
                match_rows = []
            valid_match_rows: list[dict[str, str]] = []
            for match in match_rows:
                if not isinstance(match, dict):
                    continue
                demand_name = str(match.get("demand_name") or "").strip()
                canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
                if canonical_demand_name is None:
                    continue
                valid_match_rows.append(
                    {
                        "demand_name": canonical_demand_name,
                        "reason": str(match.get("reason") or "").strip(),
                    }
                )
            if word and valid_match_rows:
                item["匹配需求列表"] = valid_match_rows
                matched_word_rows.append(item)
                valid_words.add(word)
            output_words.append(item)

        return {
            "channelContentId": str(match_result.get("channelContentId") or ""),
            "demand_cache_run_id": demand_cache_run_id,
            "高贡献词列表": output_words,
            "匹配到需求的词列表": matched_word_rows,
            "点列表": self.filter_matched_points(
                match_result.get("点列表"),
                valid_words,
            ),
        }

    def llm_match_single_article(
        self,
        *,
        channel_content_id: str,
        words: list[str],
        demand_name_set: list[str],
    ) -> dict[str, Any]:
        """Ask the LLM which of ``words`` match which demand names.

        Returns the parsed JSON object with ``source`` and ``matched``
        defaulted when absent.  With no words, returns an empty match without
        calling the LLM.  Retries on call/parse errors, sleeping between
        attempts.

        Raises:
            HotContentFlowError: if every attempt fails.
        """
        if not words:
            return {"source": channel_content_id, "matched": []}

        # NOTE(review): the prompt is a single-line string here; confirm
        # whether line breaks between its sections were intended.
        system_prompt = """ 你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。 # 任务 我会提供两组数据: 1. 热点词列表:一组待匹配的热点词语 2. 需求词库:一组已有的需求词语 请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。 # 匹配标准 满足以下任意一条,则视为匹配成功: - 热点词与需求词含义相同或高度相近(如同义词、近义词) - 热点词是需求词的下位概念(热点词所指的事物属于需求词所描述的范畴) - 热点词与需求词在用户意图上高度一致 以下情况,不得视为匹配: - 热点词仅与需求词中的某一个字/词相关,但未覆盖需求词的完整含义 - 热点词与需求词只有表面字符重叠,语义方向不同 - 热点词是需求词的上位概念(范围过宽,含义不够精确) - 两者只是同属某个大类,但具体含义差异明显 # 多词组成的需求词处理规则 若需求词由多个词语组成(如"XX类 YY问题"),热点词必须能够同时覆盖该需求词的所有关键语义成分,缺少任意一个关键成分则不视为匹配。 # 输出规则 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。 # 约束 热点词和需求词必须来自给定词语,不能创造给定词语之外的词。 """
        user_payload = {
            "source": channel_content_id,
            "words": words,
            "demand_name_set": demand_name_set,
            "output_schema": {
                "source": "string",
                "matched": [
                    {
                        "title": "string, must be selected from words",
                        "demand_name": "string, must be selected from demand_name_set",
                        "reason": "string",
                    }
                ],
            },
        }

        max_attempts = max(self.config.contribution_match_llm_max_attempts, 1)
        last_error: Exception | None = None
        for attempt in range(1, max_attempts + 1):
            try:
                resp = create_chat_completion(
                    [
                        {"role": "system", "content": system_prompt},
                        {
                            "role": "user",
                            "content": json.dumps(user_payload, ensure_ascii=False),
                        },
                    ],
                    model=self.config.contribution_match_llm_model or None,
                    temperature=0,
                    max_tokens=max(self.config.contribution_match_llm_max_tokens, 1),
                )
                parsed = _extract_json_object(str(resp.get("content") or ""))
                parsed.setdefault("source", channel_content_id)
                parsed.setdefault("matched", [])
                return parsed
            except (OpenRouterCallError, HotContentFlowError) as exc:
                last_error = exc
                # No sleep after the final attempt.
                if attempt < max_attempts:
                    time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))
        raise HotContentFlowError(
            f"llm match failed for channelContentId={channel_content_id}: {last_error}"
        ) from last_error

    @staticmethod
    def _build_word_demand_match_result(
        *,
        word: str,
        llm_result: dict[str, Any],
        demand_lookup: dict[str, str],
    ) -> dict[str, Any]:
        """Build the per-word match summary from a batch LLM result.

        Keeps LLM matches for exactly this word whose demand name resolves,
        de-duplicated on ``(demand_name, reason)``.  ``matched_demand`` is the
        space-joined list of unique demand names in first-seen order.
        """
        target_word = str(word or "").strip()
        match_list: list[dict[str, str]] = []
        seen: set[tuple[str, str]] = set()
        for item in llm_result.get("matched") or []:
            if not isinstance(item, dict):
                continue
            matched_word = str(
                item.get("title") or item.get("word") or item.get("词") or ""
            ).strip()
            demand_name = str(item.get("demand_name") or "").strip()
            reason = str(item.get("reason") or "").strip()
            if matched_word != target_word:
                continue
            canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
            if canonical_demand_name is None:
                continue
            dedupe_key = (canonical_demand_name, reason)
            if dedupe_key in seen:
                continue
            seen.add(dedupe_key)
            match_list.append(
                {
                    "demand_name": canonical_demand_name,
                    "reason": reason,
                }
            )

        matched_demand_names: list[str] = []
        matched_seen: set[str] = set()
        for item in match_list:
            demand_name = str(item.get("demand_name") or "").strip()
            if demand_name and demand_name not in matched_seen:
                matched_seen.add(demand_name)
                matched_demand_names.append(demand_name)

        return {
            "word": target_word,
            "matched": bool(match_list),
            "matched_demand": " ".join(matched_demand_names),
            "match_list": match_list,
        }

    def match_words_to_demand_pool(
        self,
        *,
        words: list[str],
        demand_name_set: list[str],
        source_id: str | None = None,
    ) -> dict[str, dict[str, Any]]:
        """Match a batch of hot words against the internal demand pool; return word -> match_result.

        Words are stripped and de-duplicated first; an empty batch returns
        ``{}`` without calling the LLM.
        """
        cleaned: list[str] = []
        seen: set[str] = set()
        for raw in words:
            word = str(raw or "").strip()
            if not word or word in seen:
                continue
            seen.add(word)
            cleaned.append(word)
        if not cleaned:
            return {}

        llm_result = self.llm_match_single_article(
            channel_content_id=source_id or f"wxindex_words:{','.join(cleaned[:3])}",
            words=cleaned,
            demand_name_set=demand_name_set,
        )
        demand_lookup = _build_demand_lookup(demand_name_set)
        return {
            word: self._build_word_demand_match_result(
                word=word,
                llm_result=llm_result,
                demand_lookup=demand_lookup,
            )
            for word in cleaned
        }

    def match_single_word_to_demand_pool(
        self,
        *,
        word: str,
        demand_name_set: list[str],
        source_id: str | None = None,
    ) -> dict[str, Any]:
        """Match one hot word against the internal demand pool (reuses the main-flow LLM rules)."""
        target_word = str(word or "").strip()
        if not target_word:
            return {
                "word": "",
                "matched": False,
                "matched_demand": "",
                "match_list": [],
            }
        batch_result = self.match_words_to_demand_pool(
            words=[target_word],
            demand_name_set=demand_name_set,
            source_id=source_id,
        )
        return batch_result.get(
            target_word,
            {
                "word": target_word,
                "matched": False,
                "matched_demand": "",
                "match_list": [],
            },
        )

    def build_wxindex_trend(
        self,
        record: dict[str, Any],
        match_result: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Pick wxindex keywords via the LLM, load their scores, and build the trend JSON.

        Returns ``None`` when the match result has no demand-matched words.
        The keyword with the highest latest score becomes the primary
        ``wxindex`` entry.

        Raises:
            WxindexSelectionSkipped: if the LLM selects no words, or none of
                the selected keywords has a score.
            HotContentFlowError: if the LLM keyword extraction fails.
        """
        matched_word_rows = match_result.get("匹配到需求的词列表") or []
        if not isinstance(matched_word_rows, list) or not matched_word_rows:
            return None
        contribution_words = [
            str(row.get("词") or "").strip()
            for row in matched_word_rows
            if isinstance(row, dict) and str(row.get("词") or "").strip()
        ]
        if not contribution_words:
            return None

        channel_content_id = str(
            match_result.get("channelContentId") or record.get("unique_key") or ""
        )
        article_title, body_text = self.extract_article_text(record)
        matched_demands = _collect_matched_demand_names(matched_word_rows)
        pick = self.llm_extract_wxindex_words(
            channel_content_id=channel_content_id,
            article_title=article_title,
            body_text=body_text,
            contribution_words=contribution_words,
            matched_demands=matched_demands,
        )
        selected_words = pick["selected_words"]
        threshold = float(self.config.wxindex_score_threshold)

        wxindex_searches: list[dict[str, Any]] = []
        event_created_at = record.get("created_at")
        for keyword in selected_words:
            full_scores, _action = ensure_word_full_scores(
                self.repository,
                self.api_client,
                self.config.wxindex_api_url,
                keyword=keyword,
                event_created_at=event_created_at,
                update_meta_if_exists=True,
            )
            series, start_ymd, end_ymd = slice_scores_lookback(
                full_scores,
                self.config.wxindex_lookback_days,
            )
            latest_score = series[-1]["total_score"] if series else None
            wxindex_searches.append(
                {
                    "keyword": keyword,
                    "start_ymd": start_ymd,
                    "end_ymd": end_ymd,
                    "total_score_7d": series,
                    "latest_total_score": latest_score,
                    "threshold": threshold,
                    # Despite the "gt" in the key, the comparison is inclusive.
                    "latest_gt_threshold": (
                        False if latest_score is None else latest_score >= threshold
                    ),
                    "trend": calc_wxindex_trend(series),
                }
            )

        searchable = [
            item for item in wxindex_searches if item.get("latest_total_score") is not None
        ]
        if not searchable:
            raise WxindexSelectionSkipped(
                f"no wxindex score for any keyword in {channel_content_id}: "
                f"{selected_words}"
            )

        best = max(searchable, key=lambda item: float(item["latest_total_score"]))
        selected_word = str(best["keyword"])
        latest_score = best["latest_total_score"]
        series = best["total_score_7d"]
        return {
            "channelContentId": channel_content_id,
            "article_title": article_title,
            "llm_selected_words": selected_words,
            "llm_selected_word": selected_word,
            "llm_reason": pick["reason"],
            "wxindex_searches": wxindex_searches,
            "wxindex": {
                "keyword": selected_word,
                "keywords": selected_words,
                "start_ymd": best["start_ymd"],
                "end_ymd": best["end_ymd"],
                "total_score_7d": series,
                "latest_total_score": latest_score,
                "threshold": threshold,
                "latest_gt_threshold": latest_score >= threshold,
                "trend": best["trend"],
            },
        }

    def llm_extract_wxindex_words(
        self,
        *,
        channel_content_id: str,
        article_title: str,
        body_text: str,
        contribution_words: list[str],
        matched_demands: list[str],
    ) -> dict[str, Any]:
        """Ask the LLM for concise wxindex search keywords for an article.

        Accepts ``selected_words`` as a list or a single string, and falls
        back to a legacy ``selected_word`` field.  Words are stripped and
        de-duplicated in order.

        Returns:
            ``{"selected_words": [...], "reason": "..."}``.

        Raises:
            WxindexSelectionSkipped: if the LLM answered but selected no words
                (not retried).
            HotContentFlowError: if every attempt fails on call/parse errors.
        """
        # NOTE(review): the prompt is a single-line string here; confirm
        # whether line breaks between its sections were intended.
        system_prompt = """ #角色 你是一个专业的语义分析专家,擅长从文章中提取简洁、精准的热搜检索词。 # 任务 我会提供文章标题、正文,以及两类备选词来源: 1. 高贡献词:文章贡献度较高的关键词 2. 已匹配需求:已与需求库匹配上的需求名 请结合标题、正文与上述备选词,提取用于「微信指数」热度检索的词。 需自行从标题中识别可检索的关键词;词应简洁(2-4 字)、概括、精准覆盖事件。 若文章涉及多个子事件,可分别提取多个词,每个词覆盖部分事件。 # 输出规则 1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。 """
        user_payload = {
            "source": channel_content_id,
            "article_title": article_title,
            "article_body_text": body_text,
            "contribution_words": contribution_words,
            "matched_demands": matched_demands,
            "output_schema": {
                "source": "string",
                "selected_words": [
                    "string, concise keyword for wxindex search, one or more"
                ],
                "reason": "string",
            },
            "constraints": [
                "selected_words 为数组,至少 1 个词,可多个",
                "每个词简洁(2-4 字),适合微信指数检索",
                "结合标题、高贡献词、已匹配需求提炼,可合并改写,不必逐字照搬",
                "多个词应分别覆盖不同事件或角度,避免语义重复",
                "reason 简洁说明,不超过60字",
                "仅输出 JSON 对象,不要 markdown 代码块",
            ],
        }

        max_attempts = max(self.config.wxindex_llm_max_attempts, 1)
        last_error: Exception | None = None
        for _attempt in range(1, max_attempts + 1):
            try:
                resp = create_chat_completion(
                    [
                        {"role": "system", "content": system_prompt},
                        {
                            "role": "user",
                            "content": json.dumps(user_payload, ensure_ascii=False),
                        },
                    ],
                    model=self.config.wxindex_llm_model or None,
                    temperature=0,
                    max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
                )
                parsed = _extract_json_object(str(resp.get("content") or ""))

                raw_words = parsed.get("selected_words")
                if isinstance(raw_words, str):
                    raw_words = [raw_words]
                if not isinstance(raw_words, list):
                    legacy_word = str(parsed.get("selected_word") or "").strip()
                    raw_words = [legacy_word] if legacy_word else []

                selected_words: list[str] = []
                seen: set[str] = set()
                for item in raw_words:
                    word = str(item or "").strip()
                    if word and word not in seen:
                        seen.add(word)
                        selected_words.append(word)
                reason = str(parsed.get("reason") or "").strip()

                if not selected_words:
                    raise WxindexSelectionSkipped(
                        f"selected_words empty for {channel_content_id}"
                    )
                return {"selected_words": selected_words, "reason": reason}
            except WxindexSelectionSkipped:
                # An empty selection is a final answer, not a retryable error.
                raise
            except (OpenRouterCallError, HotContentFlowError) as exc:
                # NOTE(review): retries are immediate here, unlike
                # llm_match_single_article which sleeps between attempts.
                last_error = exc
        raise HotContentFlowError(
            f"llm extract wxindex words failed for {channel_content_id}: {last_error}"
        ) from last_error

    @staticmethod
    def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
        """Keep only points that reference at least one matched word.

        Each kept point carries its string fields and only those match rows
        whose word is in ``matched_words``.  Returns ``[]`` when there are no
        matched words or ``raw_points`` is not a list.
        """
        if not matched_words or not isinstance(raw_points, list):
            return []
        matched_points: list[dict[str, Any]] = []
        for point in raw_points:
            if not isinstance(point, dict):
                continue
            point_match_rows = point.get("匹配词列表") or []
            if not isinstance(point_match_rows, list):
                point_match_rows = []
            keep_rows = [
                row
                for row in point_match_rows
                if isinstance(row, dict)
                and str(row.get("词") or "").strip() in matched_words
            ]
            if keep_rows:
                matched_points.append(
                    {
                        "来源": str(point.get("来源") or ""),
                        "点": str(point.get("点") or ""),
                        "点描述": str(point.get("点描述") or ""),
                        "匹配词列表": keep_rows,
                    }
                )
        return matched_points

    @staticmethod
    def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
        """Return ``(article_title, body_text)`` for a record.

        Prefers ``decode_result_json["target_post"]`` and falls back to the
        record's ``article_title`` / ``article_body`` fields.
        """
        decode_result = record.get("decode_result_json")
        target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
        if not isinstance(target_post, dict):
            target_post = {}
        article_title = str(
            target_post.get("title") or record.get("article_title") or ""
        ).strip()
        body_text = str(
            target_post.get("body_text") or record.get("article_body") or ""
        ).strip()
        return article_title, body_text


def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
    """Run one post-processing batch with a fresh repository, which is always closed afterwards."""
    flow_config = config or load_flow_config()
    repository = HotContentRepository(flow_config.mysql)
    try:
        api_client = JsonApiClient(
            timeout_seconds=flow_config.request_timeout_seconds,
            verify_ssl=flow_config.https_verify_ssl,
        )
        service = ContributionPostprocessService(flow_config, repository, api_client)
        return service.run()
    finally:
        repository.close()