| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037 |
- """贡献点需求匹配与微信指数趋势后处理服务。"""
- from __future__ import annotations
- import json
- import re
- import time
- from datetime import datetime, timedelta
- from typing import Any
- from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
- from app.hot_content.client import JsonApiClient
- from app.hot_content.config import load_flow_config
- from app.hot_content.demand_cache_service import DemandCacheService
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.repository import HotContentRepository
- from app.hot_content.status import PostprocessStatus
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import FlowConfig
- from app.hot_content.demand_export import (
- attach_wxindex_metadata,
- build_demand_export_rows,
- build_export_rows_from_record,
- )
- from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
- from app.hot_content.demand_quality import run_demand_quality_pipeline
- from app.hot_content.wxindex_trend import calc_wxindex_trend
- from app.hot_content.wxindex_words import (
- ensure_word_full_scores,
- slice_scores_lookback,
- sync_words_from_trend_json,
- )
class WxindexSelectionSkipped(Exception):
    """Signal that WeChat-index keyword selection yielded nothing usable.

    Raised so the caller can skip the remaining index queries for a record.
    """
- def _extract_json_object(text: str) -> dict[str, Any]:
- raw = text.strip()
- if raw.startswith("```"):
- raw = re.sub(r"^```(?:json)?\s*", "", raw)
- raw = re.sub(r"\s*```$", "", raw)
- try:
- parsed = json.loads(raw)
- if isinstance(parsed, dict):
- return parsed
- except json.JSONDecodeError:
- pass
- match = re.search(r"\{[\s\S]*\}", raw)
- if not match:
- raise HotContentFlowError("llm output is not json object")
- try:
- parsed = json.loads(match.group(0))
- except json.JSONDecodeError as exc:
- raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
- if not isinstance(parsed, dict):
- raise HotContentFlowError("llm output is not json object")
- return parsed
def _get_recent_range(lookback_days: int) -> tuple[str, str]:
    """Return ``(start, end)`` as ``YYYYMMDD`` strings for a window ending yesterday.

    "Yesterday" is evaluated in ``SHANGHAI_TZ``. The start date lies
    ``lookback_days`` days (never fewer than one) before the end date.
    """
    span = timedelta(days=max(lookback_days, 1))
    yesterday = datetime.now(SHANGHAI_TZ).date() - timedelta(days=1)
    window_start = yesterday - span
    return f"{window_start:%Y%m%d}", f"{yesterday:%Y%m%d}"
- def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
- rows = ((wx_resp.get("data") or {}).get("data") or [])
- if not isinstance(rows, list):
- return []
- series: list[dict[str, Any]] = []
- for row in rows:
- if not isinstance(row, dict):
- continue
- ymd = str(row.get("ymd") or "").strip()
- total_score = ((row.get("channel_score") or {}).get("total_score"))
- try:
- score_num = float(total_score) if total_score is not None else None
- except (TypeError, ValueError):
- score_num = None
- if ymd and score_num is not None:
- series.append({"ymd": ymd, "total_score": score_num})
- series.sort(key=lambda x: x["ymd"])
- return series
- def _normalize_demand_key(value: str) -> str:
- return "".join(value.split())
- def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
- lookup: dict[str, str] = {}
- for item in demand_name_set:
- demand_name = str(item).strip()
- if not demand_name:
- continue
- lookup.setdefault(demand_name, demand_name)
- compact_key = _normalize_demand_key(demand_name)
- if compact_key:
- lookup.setdefault(compact_key, demand_name)
- return lookup
- def _resolve_demand_name(
- demand_name: str,
- demand_lookup: dict[str, str],
- ) -> str | None:
- value = demand_name.strip()
- if not value:
- return None
- return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
- def _collect_matched_demand_names(matched_word_rows: list[Any]) -> list[str]:
- demand_names: list[str] = []
- seen: set[str] = set()
- for row in matched_word_rows:
- if not isinstance(row, dict):
- continue
- match_rows = row.get("匹配需求列表") or []
- if not isinstance(match_rows, list):
- continue
- for match in match_rows:
- if not isinstance(match, dict):
- continue
- demand_name = str(match.get("demand_name") or "").strip()
- if demand_name and demand_name not in seen:
- seen.add(demand_name)
- demand_names.append(demand_name)
- return demand_names
class ContributionPostprocessService:
    """Post-process hot-content records.

    For every candidate record the service matches high-contribution words
    against the cached demand names, builds a WeChat-index trend for
    LLM-selected keywords, runs the demand quality judgment and exports the
    resulting demand rows through the repository.
    """

    def __init__(
        self,
        config: FlowConfig,
        repository: HotContentRepository,
        api_client: JsonApiClient,
    ):
        self.config = config
        self.repository = repository
        self.api_client = api_client

    def run(self) -> dict[str, Any]:
        """Process one batch of candidate records and return a run summary.

        The summary holds ``run_at``, ``status`` and per-stage counters, and is
        always passed through ``_finalize_run_result`` before being returned.
        Failures of a single record are recorded on that record and counted;
        they do not stop the batch.
        """
        records = self.repository.list_postprocess_candidates(
            limit=max(self.config.postprocess_batch_size, 1)
        )
        if not records:
            # Report the same counters as a full run so consumers always see
            # one summary shape.
            return self._finalize_run_result(
                {
                    "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                    "status": "success",
                    "candidate_count": 0,
                    "matched_count": 0,
                    "wxindex_count": 0,
                    "quality_count": 0,
                    "exported_count": 0,
                    "export_failed_count": 0,
                    "skipped_count": 0,
                    "failed_count": 0,
                }
            )

        cache = DemandCacheService(
            self.config,
            self.repository,
        ).get_or_create_current_hour_cache()
        demand_cache_run_id = int(cache["id"])
        demand_name_set = cache["demand_name_set"]
        if not demand_name_set:
            return self._finalize_run_result(
                {
                    "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                    "status": "skipped",
                    "reason": "empty_demand_cache",
                    "demand_cache_run_id": demand_cache_run_id,
                    "cache_source": cache.get("source"),
                    "processed_count": 0,
                }
            )

        counts = dict.fromkeys(
            (
                "matched",
                "wxindex",
                "quality",
                "exported",
                "export_failed",
                "skipped",
                "failed",
            ),
            0,
        )

        def tally_export(outcome: tuple[int, str | None]) -> None:
            """Fold one ``(row_count, error)`` export outcome into the counters."""
            exported_rows, export_error = outcome
            if export_error:
                counts["export_failed"] += 1
            else:
                counts["exported"] += exported_rows

        for record in records:
            record_id = int(record["id"])
            quality_saved = False
            match_result: dict[str, Any] | None = None
            try:
                match_result = record.get("contribution_demand_match_json")
                # Re-match when nothing is stored yet or the stored match was
                # produced against a different demand cache run.
                if (
                    not isinstance(match_result, dict)
                    or int(record.get("demand_cache_run_id") or 0) != demand_cache_run_id
                ):
                    match_result = self.match_record(
                        record=record,
                        demand_name_set=demand_name_set,
                        demand_cache_run_id=demand_cache_run_id,
                    )
                    self.repository.save_contribution_demand_match(
                        record_id=record_id,
                        demand_cache_run_id=demand_cache_run_id,
                        match_json=match_result,
                    )
                    counts["matched"] += 1

                sanitized_match_result = self.sanitize_match_result(
                    match_result,
                    demand_name_set=demand_name_set,
                    demand_cache_run_id=demand_cache_run_id,
                )
                if sanitized_match_result != match_result:
                    match_result = sanitized_match_result
                    self.repository.save_contribution_demand_match(
                        record_id=record_id,
                        demand_cache_run_id=demand_cache_run_id,
                        match_json=match_result,
                    )

                postprocess_status = int(record.get("postprocess_status") or 0)
                existing_trend = record.get("wxindex_trend_json")
                trend_is_reusable = (
                    postprocess_status == PostprocessStatus.WXINDEX_DONE
                    and isinstance(existing_trend, dict)
                )
                if trend_is_reusable:
                    trend_result = existing_trend
                else:
                    trend_result = self.build_wxindex_trend(record, match_result)

                if trend_result is None:
                    tally_export(
                        self._skip_record(
                            record=record,
                            record_id=record_id,
                            match_result=match_result,
                            reason="no matched demand words",
                        )
                    )
                    counts["skipped"] += 1
                    continue

                if postprocess_status != PostprocessStatus.WXINDEX_DONE:
                    self.repository.save_wxindex_trend(
                        record_id=record_id,
                        trend_json=trend_result,
                    )
                    self.sync_wxindex_words(
                        record_id=record_id,
                        trend_result=trend_result,
                        event_created_at=record.get("created_at"),
                    )
                    counts["wxindex"] += 1

                event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
                    record=record,
                    match_result=match_result,
                    trend_result=trend_result,
                )
                self.repository.save_demand_quality(
                    record_id=record_id,
                    event_sense_json=event_sense_json,
                    senior_fit_json=senior_fit_json,
                )
                quality_saved = True
                counts["quality"] += 1

                tally_export(
                    self._export_demand_terms_if_needed(
                        record=record,
                        match_result=match_result,
                        trend_result=trend_result,
                        event_sense_json=event_sense_json,
                        senior_fit_json=senior_fit_json,
                    )
                )
            except WxindexSelectionSkipped as exc:
                tally_export(
                    self._skip_record(
                        record=record,
                        record_id=record_id,
                        match_result=match_result,
                        reason=str(exc),
                    )
                )
                counts["skipped"] += 1
            except Exception as exc:
                # Per-record boundary: record the failure and keep going.
                # The status is left alone once the quality result was saved.
                if not quality_saved:
                    self.repository.update_postprocess_status(
                        record_id=record_id,
                        status=PostprocessStatus.FAILED,
                        error_message=str(exc),
                    )
                counts["failed"] += 1

        return self._finalize_run_result(
            {
                "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                "status": "success",
                "demand_cache_run_id": demand_cache_run_id,
                "cache_source": cache.get("source"),
                "cache_hour": str(cache.get("cache_hour") or ""),
                "demand_name_count": len(demand_name_set),
                "candidate_count": len(records),
                "matched_count": counts["matched"],
                "wxindex_count": counts["wxindex"],
                "quality_count": counts["quality"],
                "exported_count": counts["exported"],
                "export_failed_count": counts["export_failed"],
                "skipped_count": counts["skipped"],
                "failed_count": counts["failed"],
            }
        )

    def _skip_record(
        self,
        *,
        record: dict[str, Any],
        record_id: int,
        match_result: Any,
        reason: str,
    ) -> tuple[int, str | None]:
        """Mark a record as skipped and export its demand terms without a trend.

        The status is set to ``SKIPPED`` with *reason*. When *match_result* is
        a dict, an empty quality result is stored and the demand terms are
        exported with ``trend_result=None``; otherwise nothing is exported.

        Returns:
            ``(exported_row_count, export_error)`` in the same shape as
            ``_export_demand_terms_if_needed``; ``(0, None)`` when no export
            was attempted.
        """
        self.repository.update_postprocess_status(
            record_id=record_id,
            status=PostprocessStatus.SKIPPED,
            error_message=reason,
        )
        if not isinstance(match_result, dict):
            return 0, None
        self._save_empty_demand_quality(record_id=record_id)
        return self._export_demand_terms_if_needed(
            record=record,
            match_result=match_result,
            trend_result=None,
        )

    def _export_demand_terms_if_needed(
        self,
        *,
        record: dict[str, Any],
        match_result: dict[str, Any],
        trend_result: dict[str, Any] | None,
        event_sense_json: dict[str, Any] | None = None,
        senior_fit_json: dict[str, Any] | None = None,
    ) -> tuple[int, str | None]:
        """Export demand terms, reporting failure instead of raising.

        Returns ``(row_count, None)`` on success and ``(0, error_message)``
        when the export raised. This wrapper itself never touches
        ``postprocess_status``.
        """
        try:
            exported_rows = self.export_demand_terms_if_needed(
                record=record,
                match_result=match_result,
                trend_result=trend_result,
                event_sense_json=event_sense_json,
                senior_fit_json=senior_fit_json,
            )
        except Exception as exc:
            # Deliberate best-effort: an export failure is surfaced to the
            # caller as data so it can be counted without failing the record.
            return 0, str(exc)
        return exported_rows, None

    def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
        """Run the Hive sync and downgrade the status on partial failure.

        The sync outcome is stored under ``hive_sync``; a sync exception is
        stored under ``hive_sync_error``. A ``"success"`` status becomes
        ``"partial_failure"`` when the sync failed or any export failed.
        *result* is mutated in place and returned.
        """
        try:
            result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
        except Exception as exc:
            result["hive_sync_error"] = str(exc)

        hive_failed = bool(result.get("hive_sync_error"))
        export_failed = int(result.get("export_failed_count") or 0) > 0
        if (hive_failed or export_failed) and result.get("status") == "success":
            result["status"] = "partial_failure"
        return result

    def sync_wxindex_words(
        self,
        *,
        record_id: int,
        trend_result: dict[str, Any],
        event_created_at: datetime | None = None,
        verbose: bool = False,
    ) -> dict[str, int]:
        """Sync the words of a trend result via ``sync_words_from_trend_json``.

        Delegates with this service's repository, API client and WeChat-index
        URL, always passing ``update_meta_if_exists=True``.
        """
        return sync_words_from_trend_json(
            self.repository,
            self.api_client,
            self.config.wxindex_api_url,
            trend_json=trend_result,
            record_id=record_id,
            event_created_at=event_created_at,
            verbose=verbose,
            update_meta_if_exists=True,
        )

    def _save_empty_demand_quality(self, *, record_id: int) -> None:
        """Store empty event-sense and senior-fit results with ``update_status=False``."""
        self.repository.save_demand_quality(
            record_id=record_id,
            event_sense_json={},
            senior_fit_json={},
            update_status=False,
        )

    def run_demand_quality_judgment(
        self,
        *,
        record: dict[str, Any],
        match_result: dict[str, Any],
        trend_result: dict[str, Any],
    ) -> tuple[dict[str, Any], dict[str, Any]]:
        """Run the demand quality pipeline for one record.

        Export rows are built from the match result, the record's contribution
        points (only when stored as a dict) and the trend, enriched with
        WeChat-index metadata, then handed to ``run_demand_quality_pipeline``
        with the thresholds and LLM settings from the config.

        Returns:
            The ``(event_sense_json, senior_fit_json)`` pair produced by the
            pipeline.
        """
        channel_content_id = str(
            match_result.get("channelContentId") or record.get("unique_key") or ""
        ).strip()
        contribution_points = record.get("contribution_points_json")
        if not isinstance(contribution_points, dict):
            contribution_points = None

        plain_rows = build_demand_export_rows(
            match_result,
            contribution_points=contribution_points,
            trend_json=trend_result,
        )
        base_export_rows = attach_wxindex_metadata(
            plain_rows,
            trend_result,
            wxindex_threshold=self.config.wxindex_score_threshold,
        )
        return run_demand_quality_pipeline(
            channel_content_id=channel_content_id,
            export_rows=base_export_rows,
            wxindex_threshold=self.config.wxindex_score_threshold,
            event_threshold=self.config.demand_event_sense_threshold,
            senior_threshold=self.config.demand_senior_fit_threshold,
            model=self.config.demand_quality_llm_model,
            max_attempts=self.config.demand_quality_llm_max_attempts,
            retry_sleep_seconds=self.config.demand_quality_llm_retry_sleep_seconds,
            max_tokens=self.config.demand_quality_llm_max_tokens,
        )

    def export_demand_terms_if_needed(
        self,
        *,
        record: dict[str, Any],
        match_result: dict[str, Any],
        trend_result: dict[str, Any] | None,
        event_sense_json: dict[str, Any] | None = None,
        senior_fit_json: dict[str, Any] | None = None,
    ) -> int:
        """Rebuild and replace the exported demand rows of one record.

        Non-dict inputs are normalised first: contribution points and trend
        fall back to ``None``, the two quality payloads fall back to ``{}``.

        Returns:
            The number of export rows handed to the repository.
        """
        contribution_points = record.get("contribution_points_json")
        event_sense = event_sense_json if isinstance(event_sense_json, dict) else {}
        senior_fit = senior_fit_json if isinstance(senior_fit_json, dict) else {}
        normalized_record = {
            "contribution_demand_match_json": match_result,
            "contribution_points_json": (
                contribution_points if isinstance(contribution_points, dict) else None
            ),
            "wxindex_trend_json": trend_result if isinstance(trend_result, dict) else None,
            "demand_event_sense_json": event_sense,
            "demand_senior_fit_json": senior_fit,
        }
        export_rows = build_export_rows_from_record(
            normalized_record,
            wxindex_threshold=self.config.wxindex_score_threshold,
            event_sense_json=event_sense,
            senior_fit_json=senior_fit,
            event_threshold=self.config.demand_event_sense_threshold,
            senior_threshold=self.config.demand_senior_fit_threshold,
        )
        self.repository.replace_demand_export_rows(
            record_id=int(record["id"]),
            source=str(record.get("source") or ""),
            hot_title=str(record.get("title") or ""),
            article_title=str(record.get("article_title") or ""),
            rows=export_rows,
        )
        return len(export_rows)

    def match_record(
        self,
        *,
        record: dict[str, Any],
        demand_name_set: list[str],
        demand_cache_run_id: int,
    ) -> dict[str, Any]:
        """Match a record's high-contribution words against the demand names.

        The words come from ``contribution_points_json["高贡献词列表"]``. LLM
        matches are kept only when the word is one of those words and the
        demand name resolves to a known demand; repeated
        ``(word, demand, reason)`` rows are stored once.

        Returns:
            A dict with ``channelContentId``, ``demand_cache_run_id``, the full
            word list (``高贡献词列表``), the matched subset
            (``匹配到需求的词列表``) and the points that reference a matched
            word (``点列表``).

        Raises:
            HotContentFlowError: when ``contribution_points_json`` is not a
                dict, or when the LLM call ultimately fails.
        """
        contribution_points = record.get("contribution_points_json")
        if not isinstance(contribution_points, dict):
            raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")

        channel_content_id = str(
            contribution_points.get("channelContentId") or record.get("unique_key") or ""
        ).strip()
        words_rows = contribution_points.get("高贡献词列表") or []
        if not isinstance(words_rows, list):
            words_rows = []
        word_list = [
            str(item.get("词") or "").strip()
            for item in words_rows
            if isinstance(item, dict) and str(item.get("词") or "").strip()
        ]

        llm_result = self.llm_match_single_article(
            channel_content_id=channel_content_id or str(record["unique_key"]),
            words=word_list,
            demand_name_set=demand_name_set,
        )

        demand_lookup = _build_demand_lookup(demand_name_set)
        known_words = set(word_list)  # O(1) membership inside the loop below
        matched_map: dict[str, list[dict[str, str]]] = {}
        seen_matches: set[tuple[str, str, str]] = set()
        for item in llm_result.get("matched") or []:
            if not isinstance(item, dict):
                continue
            word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
            demand_name = str(item.get("demand_name") or "").strip()
            reason = str(item.get("reason") or "").strip()
            if not word or word not in known_words:
                continue
            canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
            if canonical_demand_name is None:
                continue
            # Same rule as _build_word_demand_match_result: an identical
            # (demand, reason) row for a word is recorded only once.
            dedupe_key = (word, canonical_demand_name, reason)
            if dedupe_key in seen_matches:
                continue
            seen_matches.add(dedupe_key)
            matched_map.setdefault(word, []).append(
                {
                    "demand_name": canonical_demand_name,
                    "reason": reason,
                }
            )

        output_words: list[dict[str, Any]] = []
        matched_word_rows: list[dict[str, Any]] = []
        for row in words_rows:
            if not isinstance(row, dict):
                continue
            word = str(row.get("词") or "").strip()
            item = {"词": word, "贡献度": row.get("贡献度")}
            if word in matched_map:
                item["匹配需求列表"] = matched_map[word]
                matched_word_rows.append(item)
            output_words.append(item)

        matched_points = self.filter_matched_points(
            contribution_points.get("点列表"),
            set(matched_map.keys()),
        )
        return {
            "channelContentId": channel_content_id,
            "demand_cache_run_id": demand_cache_run_id,
            "高贡献词列表": output_words,
            "匹配到需求的词列表": matched_word_rows,
            "点列表": matched_points,
        }

    def sanitize_match_result(
        self,
        match_result: dict[str, Any],
        *,
        demand_name_set: list[str],
        demand_cache_run_id: int,
    ) -> dict[str, Any]:
        """Rebuild a stored match result against the current demand names.

        Matches whose demand name no longer resolves are dropped, resolved
        names are replaced by their canonical spelling, and identical
        ``(demand, reason)`` rows of one word are collapsed. The matched-word
        list and the point list are recomputed from what remains.

        Returns:
            A new dict with the same layout as the result of ``match_record``.
        """
        demand_lookup = _build_demand_lookup(demand_name_set)
        words_rows = match_result.get("高贡献词列表") or []
        if not isinstance(words_rows, list):
            words_rows = []

        output_words: list[dict[str, Any]] = []
        matched_word_rows: list[dict[str, Any]] = []
        valid_words: set[str] = set()
        for row in words_rows:
            if not isinstance(row, dict):
                continue
            word = str(row.get("词") or "").strip()
            item = {"词": word, "贡献度": row.get("贡献度")}
            match_rows = row.get("匹配需求列表") or []
            if not isinstance(match_rows, list):
                match_rows = []

            valid_match_rows: list[dict[str, str]] = []
            seen_matches: set[tuple[str, str]] = set()
            for match in match_rows:
                if not isinstance(match, dict):
                    continue
                demand_name = str(match.get("demand_name") or "").strip()
                canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
                if canonical_demand_name is None:
                    continue
                reason = str(match.get("reason") or "").strip()
                # Two stored spellings can canonicalise to the same demand;
                # keep one row per (demand, reason).
                dedupe_key = (canonical_demand_name, reason)
                if dedupe_key in seen_matches:
                    continue
                seen_matches.add(dedupe_key)
                valid_match_rows.append(
                    {
                        "demand_name": canonical_demand_name,
                        "reason": reason,
                    }
                )

            if word and valid_match_rows:
                item["匹配需求列表"] = valid_match_rows
                matched_word_rows.append(item)
                valid_words.add(word)
            output_words.append(item)

        return {
            "channelContentId": str(match_result.get("channelContentId") or ""),
            "demand_cache_run_id": demand_cache_run_id,
            "高贡献词列表": output_words,
            "匹配到需求的词列表": matched_word_rows,
            "点列表": self.filter_matched_points(
                match_result.get("点列表"),
                valid_words,
            ),
        }

    def llm_match_single_article(
        self,
        *,
        channel_content_id: str,
        words: list[str],
        demand_name_set: list[str],
    ) -> dict[str, Any]:
        """Ask the LLM which of *words* match entries of *demand_name_set*.

        With no words the LLM is not called and an empty match list is
        returned. Otherwise the call is retried up to
        ``contribution_match_llm_max_attempts`` times (at least once), sleeping
        ``contribution_match_llm_retry_sleep_seconds`` between attempts.

        Returns:
            The parsed JSON object, guaranteed to contain ``source`` and
            ``matched`` keys.

        Raises:
            HotContentFlowError: when every attempt failed with an
                ``OpenRouterCallError`` or an unparsable response.
        """
        if not words:
            return {"source": channel_content_id, "matched": []}

        system_prompt = """
你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
# 任务
我会提供两组数据:
1. 热点词列表:一组待匹配的热点词语
2. 需求词库:一组已有的需求词语
请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
# 匹配标准
满足以下任意一条,则视为匹配成功:
热点词与需求词含义相同或高度相近(如同义词、近义词)
热点词是需求词的下位概念(热点词所指的事物属于需求词所描述的范畴)
热点词与需求词在用户意图上高度一致
以下情况,不得视为匹配:
热点词仅与需求词中的某一个字/词相关,但未覆盖需求词的完整含义
热点词与需求词只有表面字符重叠,语义方向不同
热点词是需求词的上位概念(范围过宽,含义不够精确)
两者只是同属某个大类,但具体含义差异明显
# 多词组成的需求词处理规则
若需求词由多个词语组成(如"XX类 YY问题"),热点词必须能够同时覆盖该需求词的所有关键语义成分,缺少任意一个关键成分则不视为匹配。
# 输出规则
严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
# 约束
热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
"""
        user_payload = {
            "source": channel_content_id,
            "words": words,
            "demand_name_set": demand_name_set,
            "output_schema": {
                "source": "string",
                "matched": [
                    {
                        "title": "string, must be selected from words",
                        "demand_name": "string, must be selected from demand_name_set",
                        "reason": "string",
                    }
                ],
            },
        }
        messages = [
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": json.dumps(user_payload, ensure_ascii=False),
            },
        ]

        max_attempts = max(self.config.contribution_match_llm_max_attempts, 1)
        last_error: Exception | None = None
        for attempt in range(1, max_attempts + 1):
            try:
                resp = create_chat_completion(
                    messages,
                    model=self.config.contribution_match_llm_model or None,
                    temperature=0,
                    max_tokens=max(self.config.contribution_match_llm_max_tokens, 1),
                )
                parsed = _extract_json_object(str(resp.get("content") or ""))
                parsed.setdefault("source", channel_content_id)
                parsed.setdefault("matched", [])
                return parsed
            except (OpenRouterCallError, HotContentFlowError) as exc:
                last_error = exc
                # No point sleeping after the final attempt.
                if attempt < max_attempts:
                    time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))
        raise HotContentFlowError(
            f"llm match failed for channelContentId={channel_content_id}: {last_error}"
        ) from last_error

    @staticmethod
    def _build_word_demand_match_result(
        *,
        word: str,
        llm_result: dict[str, Any],
        demand_lookup: dict[str, str],
    ) -> dict[str, Any]:
        """Extract the matches of a single word from an LLM match result.

        Only rows whose word equals *word* (after stripping) and whose demand
        name resolves through *demand_lookup* are kept; identical
        ``(demand, reason)`` rows are collapsed.

        Returns:
            A dict with ``word``, ``matched`` (bool), ``matched_demand`` (the
            distinct demand names joined by a space) and ``match_list``.
        """
        target_word = str(word or "").strip()
        match_list: list[dict[str, str]] = []
        seen: set[tuple[str, str]] = set()
        for item in llm_result.get("matched") or []:
            if not isinstance(item, dict):
                continue
            matched_word = str(
                item.get("title") or item.get("word") or item.get("词") or ""
            ).strip()
            if matched_word != target_word:
                continue
            demand_name = str(item.get("demand_name") or "").strip()
            reason = str(item.get("reason") or "").strip()
            canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
            if canonical_demand_name is None:
                continue
            dedupe_key = (canonical_demand_name, reason)
            if dedupe_key in seen:
                continue
            seen.add(dedupe_key)
            match_list.append(
                {
                    "demand_name": canonical_demand_name,
                    "reason": reason,
                }
            )

        # Distinct demand names in first-seen order.
        distinct_names: dict[str, None] = {}
        for item in match_list:
            demand_name = str(item.get("demand_name") or "").strip()
            if demand_name:
                distinct_names.setdefault(demand_name, None)

        return {
            "word": target_word,
            "matched": bool(match_list),
            "matched_demand": " ".join(distinct_names),
            "match_list": match_list,
        }

    def match_words_to_demand_pool(
        self,
        *,
        words: list[str],
        demand_name_set: list[str],
        source_id: str | None = None,
    ) -> dict[str, dict[str, Any]]:
        """Match a batch of hot words against the Piaoquan internal demand pool.

        Words are stripped and de-duplicated (first occurrence wins) before a
        single LLM call is made for the whole batch. When *source_id* is not
        given, an id derived from the first three words is used.

        Returns:
            A ``word -> match_result`` mapping; empty when no usable word
            remains.
        """
        cleaned: list[str] = []
        seen: set[str] = set()
        for raw in words:
            word = str(raw or "").strip()
            if not word or word in seen:
                continue
            seen.add(word)
            cleaned.append(word)
        if not cleaned:
            return {}

        llm_result = self.llm_match_single_article(
            channel_content_id=source_id or f"wxindex_words:{','.join(cleaned[:3])}",
            words=cleaned,
            demand_name_set=demand_name_set,
        )
        demand_lookup = _build_demand_lookup(demand_name_set)
        return {
            word: self._build_word_demand_match_result(
                word=word,
                llm_result=llm_result,
                demand_lookup=demand_lookup,
            )
            for word in cleaned
        }

    def match_single_word_to_demand_pool(
        self,
        *,
        word: str,
        demand_name_set: list[str],
        source_id: str | None = None,
    ) -> dict[str, Any]:
        """Match one hot word against the Piaoquan internal demand pool.

        Reuses the batch matcher (and therefore the main flow's LLM rules).
        A blank word returns an unmatched result without calling the LLM.
        """
        target_word = str(word or "").strip()
        unmatched = {
            "word": target_word,
            "matched": False,
            "matched_demand": "",
            "match_list": [],
        }
        if not target_word:
            return unmatched

        batch_result = self.match_words_to_demand_pool(
            words=[target_word],
            demand_name_set=demand_name_set,
            source_id=source_id,
        )
        return batch_result.get(target_word, unmatched)

    def build_wxindex_trend(
        self,
        record: dict[str, Any],
        match_result: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Build the WeChat-index trend payload for a matched record.

        The LLM picks search keywords from the article and the matched words;
        each keyword's score series is loaded and sliced to the configured
        lookback window, and the keyword with the highest latest score becomes
        the headline ``wxindex`` entry.

        Returns:
            The trend dict, or ``None`` when the match result contains no
            matched words.

        Raises:
            WxindexSelectionSkipped: when the LLM selects no keyword, or no
                selected keyword has a latest score.
            HotContentFlowError: when the keyword-selection LLM call fails.
        """
        matched_word_rows = match_result.get("匹配到需求的词列表") or []
        if not isinstance(matched_word_rows, list) or not matched_word_rows:
            return None
        contribution_words = [
            str(row.get("词") or "").strip()
            for row in matched_word_rows
            if isinstance(row, dict) and str(row.get("词") or "").strip()
        ]
        if not contribution_words:
            return None

        # Stripped like in match_record / run_demand_quality_judgment so the
        # same record always yields the same identifier.
        channel_content_id = str(
            match_result.get("channelContentId") or record.get("unique_key") or ""
        ).strip()
        article_title, body_text = self.extract_article_text(record)
        pick = self.llm_extract_wxindex_words(
            channel_content_id=channel_content_id,
            article_title=article_title,
            body_text=body_text,
            contribution_words=contribution_words,
            matched_demands=_collect_matched_demand_names(matched_word_rows),
        )
        selected_words = pick["selected_words"]
        threshold = float(self.config.wxindex_score_threshold)
        event_created_at = record.get("created_at")

        wxindex_searches: list[dict[str, Any]] = []
        for keyword in selected_words:
            full_scores, _action = ensure_word_full_scores(
                self.repository,
                self.api_client,
                self.config.wxindex_api_url,
                keyword=keyword,
                event_created_at=event_created_at,
                update_meta_if_exists=True,
            )
            series, start_ymd, end_ymd = slice_scores_lookback(
                full_scores,
                self.config.wxindex_lookback_days,
            )
            latest_score = series[-1]["total_score"] if series else None
            wxindex_searches.append(
                {
                    "keyword": keyword,
                    "start_ymd": start_ymd,
                    "end_ymd": end_ymd,
                    "total_score_7d": series,
                    "latest_total_score": latest_score,
                    "threshold": threshold,
                    "latest_gt_threshold": (
                        False
                        if latest_score is None
                        else latest_score >= threshold
                    ),
                    "trend": calc_wxindex_trend(series),
                }
            )

        searchable = [
            item
            for item in wxindex_searches
            if item.get("latest_total_score") is not None
        ]
        if not searchable:
            raise WxindexSelectionSkipped(
                f"no wxindex score for any keyword in {channel_content_id}: "
                f"{selected_words}"
            )

        best = max(searchable, key=lambda item: float(item["latest_total_score"]))
        selected_word = str(best["keyword"])
        latest_score = best["latest_total_score"]
        return {
            "channelContentId": channel_content_id,
            "article_title": article_title,
            "llm_selected_words": selected_words,
            "llm_selected_word": selected_word,
            "llm_reason": pick["reason"],
            "wxindex_searches": wxindex_searches,
            "wxindex": {
                "keyword": selected_word,
                "keywords": selected_words,
                "start_ymd": best["start_ymd"],
                "end_ymd": best["end_ymd"],
                "total_score_7d": best["total_score_7d"],
                "latest_total_score": latest_score,
                "threshold": threshold,
                "latest_gt_threshold": latest_score >= threshold,
                "trend": best["trend"],
            },
        }

    def llm_extract_wxindex_words(
        self,
        *,
        channel_content_id: str,
        article_title: str,
        body_text: str,
        contribution_words: list[str],
        matched_demands: list[str],
    ) -> dict[str, Any]:
        """Ask the LLM for WeChat-index search keywords for one article.

        ``selected_words`` may come back as a list or a single string; the
        legacy ``selected_word`` key is used when ``selected_words`` is neither.
        Words are stripped and de-duplicated in order. The call is attempted up
        to ``wxindex_llm_max_attempts`` times (at least once); failed attempts
        are retried immediately.

        Returns:
            ``{"selected_words": list[str], "reason": str}``.

        Raises:
            WxindexSelectionSkipped: when the LLM answered but selected no
                word (not retried).
            HotContentFlowError: when every attempt failed with an
                ``OpenRouterCallError`` or an unparsable response.
        """
        system_prompt = """
#角色
你是一个专业的语义分析专家,擅长从文章中提取简洁、精准的热搜检索词。
# 任务
我会提供文章标题、正文,以及两类备选词来源:
1. 高贡献词:文章贡献度较高的关键词
2. 已匹配需求:已与需求库匹配上的需求名
请结合标题、正文与上述备选词,提取用于「微信指数」热度检索的词。
需自行从标题中识别可检索的关键词;词应简洁(2-4 字)、概括、精准覆盖事件。
若文章涉及多个子事件,可分别提取多个词,每个词覆盖部分事件。
# 输出规则
1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
"""
        user_payload = {
            "source": channel_content_id,
            "article_title": article_title,
            "article_body_text": body_text,
            "contribution_words": contribution_words,
            "matched_demands": matched_demands,
            "output_schema": {
                "source": "string",
                "selected_words": [
                    "string, concise keyword for wxindex search, one or more"
                ],
                "reason": "string",
            },
            "constraints": [
                "selected_words 为数组,至少 1 个词,可多个",
                "每个词简洁(2-4 字),适合微信指数检索",
                "结合标题、高贡献词、已匹配需求提炼,可合并改写,不必逐字照搬",
                "多个词应分别覆盖不同事件或角度,避免语义重复",
                "reason 简洁说明,不超过60字",
                "仅输出 JSON 对象,不要 markdown 代码块",
            ],
        }
        messages = [
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": json.dumps(user_payload, ensure_ascii=False),
            },
        ]

        max_attempts = max(self.config.wxindex_llm_max_attempts, 1)
        last_error: Exception | None = None
        for _ in range(max_attempts):
            try:
                resp = create_chat_completion(
                    messages,
                    model=self.config.wxindex_llm_model or None,
                    temperature=0,
                    max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
                )
                parsed = _extract_json_object(str(resp.get("content") or ""))

                raw_words = parsed.get("selected_words")
                if isinstance(raw_words, str):
                    raw_words = [raw_words]
                if not isinstance(raw_words, list):
                    legacy_word = str(parsed.get("selected_word") or "").strip()
                    raw_words = [legacy_word] if legacy_word else []

                selected_words: list[str] = []
                seen: set[str] = set()
                for item in raw_words:
                    word = str(item or "").strip()
                    if word and word not in seen:
                        seen.add(word)
                        selected_words.append(word)

                reason = str(parsed.get("reason") or "").strip()
                if not selected_words:
                    raise WxindexSelectionSkipped(
                        f"selected_words empty for {channel_content_id}"
                    )
                return {"selected_words": selected_words, "reason": reason}
            except WxindexSelectionSkipped:
                # An empty selection is a final answer, not a transient error.
                raise
            except (OpenRouterCallError, HotContentFlowError) as exc:
                last_error = exc
        raise HotContentFlowError(
            f"llm extract wxindex words failed for {channel_content_id}: {last_error}"
        ) from last_error

    @staticmethod
    def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
        """Keep only the points that reference at least one matched word.

        A point is kept when its ``匹配词列表`` contains a dict row whose
        stripped ``词`` is in *matched_words*; only those rows are retained.
        Returns ``[]`` when *matched_words* is empty or *raw_points* is not a
        list.
        """
        if not matched_words or not isinstance(raw_points, list):
            return []

        matched_points: list[dict[str, Any]] = []
        for point in raw_points:
            if not isinstance(point, dict):
                continue
            point_match_rows = point.get("匹配词列表") or []
            if not isinstance(point_match_rows, list):
                point_match_rows = []
            keep_rows = [
                row
                for row in point_match_rows
                if isinstance(row, dict) and str(row.get("词") or "").strip() in matched_words
            ]
            if not keep_rows:
                continue
            matched_points.append(
                {
                    "来源": str(point.get("来源") or ""),
                    "点": str(point.get("点") or ""),
                    "点描述": str(point.get("点描述") or ""),
                    "匹配词列表": keep_rows,
                }
            )
        return matched_points

    @staticmethod
    def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
        """Return the stripped ``(title, body)`` of a record's article.

        Values from ``decode_result_json["target_post"]`` take precedence; the
        record's ``article_title`` / ``article_body`` are the fallbacks.
        """
        decode_result = record.get("decode_result_json")
        target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
        if not isinstance(target_post, dict):
            target_post = {}
        article_title = str(
            target_post.get("title") or record.get("article_title") or ""
        ).strip()
        body_text = str(
            target_post.get("body_text") or record.get("article_body") or ""
        ).strip()
        return article_title, body_text
def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
    """Run the post-processing service once and return its summary.

    Uses *config* when given (and truthy), otherwise the configuration from
    ``load_flow_config()``. The repository opened for the run is closed
    afterwards, whether or not the run raised.
    """
    effective_config = config or load_flow_config()
    repo = HotContentRepository(effective_config.mysql)
    try:
        client = JsonApiClient(
            timeout_seconds=effective_config.request_timeout_seconds,
            verify_ssl=effective_config.https_verify_ssl,
        )
        return ContributionPostprocessService(effective_config, repo, client).run()
    finally:
        repo.close()
|