| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865 |
- """贡献点需求匹配与微信指数趋势后处理服务。"""
- from __future__ import annotations
- import json
- import re
- import time
- from datetime import datetime, timedelta
- from typing import Any
- from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
- from app.hot_content.client import JsonApiClient
- from app.hot_content.config import load_flow_config
- from app.hot_content.demand_cache_service import DemandCacheService
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.repository import HotContentRepository
- from app.hot_content.status import PostprocessStatus
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import FlowConfig
- from app.hot_content.demand_export import (
- attach_wxindex_metadata,
- build_demand_export_rows,
- build_export_rows_from_record,
- )
- from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
- from app.hot_content.demand_quality import run_demand_quality_pipeline
- from app.hot_content.wxindex_trend import calc_wxindex_trend
- from app.hot_content.wxindex_words import (
- ensure_word_full_scores,
- slice_scores_lookback,
- sync_words_from_trend_json,
- )
class WxindexSelectionSkipped(Exception):
    """Signal that WeChat-index keyword selection was unusable, so the follow-up lookup is skipped."""
- def _extract_json_object(text: str) -> dict[str, Any]:
- raw = text.strip()
- if raw.startswith("```"):
- raw = re.sub(r"^```(?:json)?\s*", "", raw)
- raw = re.sub(r"\s*```$", "", raw)
- try:
- parsed = json.loads(raw)
- if isinstance(parsed, dict):
- return parsed
- except json.JSONDecodeError:
- pass
- match = re.search(r"\{[\s\S]*\}", raw)
- if not match:
- raise HotContentFlowError("llm output is not json object")
- try:
- parsed = json.loads(match.group(0))
- except json.JSONDecodeError as exc:
- raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
- if not isinstance(parsed, dict):
- raise HotContentFlowError("llm output is not json object")
- return parsed
def _get_recent_range(lookback_days: int) -> tuple[str, str]:
    """Return ``(start, end)`` dates formatted as ``YYYYMMDD``.

    ``end`` is the day before the current date in ``SHANGHAI_TZ``; ``start``
    lies ``max(lookback_days, 1)`` days before ``end``.
    """
    yesterday = datetime.now(SHANGHAI_TZ).date() - timedelta(days=1)
    span = timedelta(days=max(lookback_days, 1))
    first_day = yesterday - span
    return first_day.strftime("%Y%m%d"), yesterday.strftime("%Y%m%d")
- def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
- rows = ((wx_resp.get("data") or {}).get("data") or [])
- if not isinstance(rows, list):
- return []
- series: list[dict[str, Any]] = []
- for row in rows:
- if not isinstance(row, dict):
- continue
- ymd = str(row.get("ymd") or "").strip()
- total_score = ((row.get("channel_score") or {}).get("total_score"))
- try:
- score_num = float(total_score) if total_score is not None else None
- except (TypeError, ValueError):
- score_num = None
- if ymd and score_num is not None:
- series.append({"ymd": ymd, "total_score": score_num})
- series.sort(key=lambda x: x["ymd"])
- return series
- def _normalize_demand_key(value: str) -> str:
- return "".join(value.split())
- def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
- lookup: dict[str, str] = {}
- for item in demand_name_set:
- demand_name = str(item).strip()
- if not demand_name:
- continue
- lookup.setdefault(demand_name, demand_name)
- compact_key = _normalize_demand_key(demand_name)
- if compact_key:
- lookup.setdefault(compact_key, demand_name)
- return lookup
- def _resolve_demand_name(
- demand_name: str,
- demand_lookup: dict[str, str],
- ) -> str | None:
- value = demand_name.strip()
- if not value:
- return None
- return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
- def _collect_matched_demand_names(matched_word_rows: list[Any]) -> list[str]:
- demand_names: list[str] = []
- seen: set[str] = set()
- for row in matched_word_rows:
- if not isinstance(row, dict):
- continue
- match_rows = row.get("匹配需求列表") or []
- if not isinstance(match_rows, list):
- continue
- for match in match_rows:
- if not isinstance(match, dict):
- continue
- demand_name = str(match.get("demand_name") or "").strip()
- if demand_name and demand_name not in seen:
- seen.add(demand_name)
- demand_names.append(demand_name)
- return demand_names
class ContributionPostprocessService:
    """Batch post-processor for hot-content records.

    For each candidate record the service matches the record's
    high-contribution words against the cached demand names, looks up a
    WeChat-index trend for LLM-selected keywords, runs the demand quality
    judgment and replaces the record's demand export rows.
    """

    # System prompt used when matching hot words against the demand names.
    _MATCH_SYSTEM_PROMPT = """
你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
# 任务
我会提供两组数据:
1. 热点词列表:一组待匹配的热点词语
2. 需求词库:一组已有的需求词语
请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
# 匹配标准
满足以下任意一条,则视为匹配成功:
- 热点词与需求词含义相同或高度相近(如同义词、近义词)
- 热点词是需求词的下位概念(热点词所指的事物属于需求词所描述的范畴)
- 热点词与需求词在用户意图上高度一致
以下情况,不得视为匹配:
- 热点词仅与需求词中的某一个字/词相关,但未覆盖需求词的完整含义
- 热点词与需求词只有表面字符重叠,语义方向不同
- 热点词是需求词的上位概念(范围过宽,含义不够精确)
- 两者只是同属某个大类,但具体含义差异明显
# 多词组成的需求词处理规则
若需求词由多个词语组成(如"XX类 YY问题"),热点词必须能够同时覆盖该需求词的所有关键语义成分,缺少任意一个关键成分则不视为匹配。
# 输出规则
严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
# 约束
热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
"""

    # System prompt used when extracting WeChat-index search keywords.
    _WXINDEX_SYSTEM_PROMPT = """
#角色
你是一个专业的语义分析专家,擅长从文章中提取简洁、精准的热搜检索词。
# 任务
我会提供文章标题、正文,以及两类备选词来源:
1. 高贡献词:文章贡献度较高的关键词
2. 已匹配需求:已与需求库匹配上的需求名
请结合标题、正文与上述备选词,提取用于「微信指数」热度检索的词。
需自行从标题中识别可检索的关键词;词应简洁(2-4 字)、概括、精准覆盖事件。
若文章涉及多个子事件,可分别提取多个词,每个词覆盖部分事件。
# 输出规则
1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
"""

    def __init__(
        self,
        config: FlowConfig,
        repository: HotContentRepository,
        api_client: JsonApiClient,
    ):
        """Store the flow configuration, repository and HTTP client."""
        self.config = config
        self.repository = repository
        self.api_client = api_client

    def run(self) -> dict[str, Any]:
        """Process one batch of post-process candidates.

        Returns:
            A summary dict with the run timestamp, status and counters. It
            is passed through ``_finalize_run_result``, which adds the hive
            sync outcome.
        """
        records = self.repository.list_postprocess_candidates(
            limit=max(self.config.postprocess_batch_size, 1)
        )
        if not records:
            return self._finalize_run_result(
                {
                    "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                    "status": "success",
                    "candidate_count": 0,
                    "matched_count": 0,
                    "wxindex_count": 0,
                    "skipped_count": 0,
                    "failed_count": 0,
                }
            )

        cache = DemandCacheService(
            self.config,
            self.repository,
        ).get_or_create_current_hour_cache()
        demand_cache_run_id = int(cache["id"])
        demand_name_set = cache["demand_name_set"]
        if not demand_name_set:
            return self._finalize_run_result(
                {
                    "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                    "status": "skipped",
                    "reason": "empty_demand_cache",
                    "demand_cache_run_id": demand_cache_run_id,
                    "cache_source": cache.get("source"),
                    "processed_count": 0,
                }
            )

        # Insertion order here is the key order of the final summary.
        counters: dict[str, int] = {
            "matched_count": 0,
            "wxindex_count": 0,
            "quality_count": 0,
            "exported_count": 0,
            "skipped_count": 0,
            "failed_count": 0,
        }
        for record in records:
            record_id = int(record["id"])
            try:
                self._process_record(
                    record=record,
                    record_id=record_id,
                    demand_name_set=demand_name_set,
                    demand_cache_run_id=demand_cache_run_id,
                    counters=counters,
                )
            except Exception as exc:
                # Per-record boundary: one bad record must not stop the batch.
                self.repository.update_postprocess_status(
                    record_id=record_id,
                    status=PostprocessStatus.FAILED,
                    error_message=str(exc),
                )
                counters["failed_count"] += 1

        return self._finalize_run_result(
            {
                "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                "status": "success",
                "demand_cache_run_id": demand_cache_run_id,
                "cache_source": cache.get("source"),
                "cache_hour": str(cache.get("cache_hour") or ""),
                "demand_name_count": len(demand_name_set),
                "candidate_count": len(records),
                **counters,
            }
        )

    def _process_record(
        self,
        *,
        record: dict[str, Any],
        record_id: int,
        demand_name_set: list[str],
        demand_cache_run_id: int,
        counters: dict[str, int],
    ) -> None:
        """Run match, trend, quality and export for one record.

        ``counters`` is updated in place. A ``WxindexSelectionSkipped`` is
        handled here, so an error raised while recording the skip reaches
        the caller's failure handling instead of aborting the batch.
        """
        match_result = record.get("contribution_demand_match_json")
        try:
            if not isinstance(match_result, dict):
                match_result = self.match_record(
                    record=record,
                    demand_name_set=demand_name_set,
                    demand_cache_run_id=demand_cache_run_id,
                )
                self.repository.save_contribution_demand_match(
                    record_id=record_id,
                    demand_cache_run_id=demand_cache_run_id,
                    match_json=match_result,
                )
                counters["matched_count"] += 1

            sanitized_match_result = self.sanitize_match_result(
                match_result,
                demand_name_set=demand_name_set,
                demand_cache_run_id=demand_cache_run_id,
            )
            if sanitized_match_result != match_result:
                # Persist only when sanitizing actually changed something.
                match_result = sanitized_match_result
                self.repository.save_contribution_demand_match(
                    record_id=record_id,
                    demand_cache_run_id=demand_cache_run_id,
                    match_json=match_result,
                )

            trend_result = self.build_wxindex_trend(record, match_result)
            if trend_result is None:
                self._record_skip(
                    record=record,
                    record_id=record_id,
                    match_result=match_result,
                    reason="no matched demand words",
                    counters=counters,
                )
                return

            self.repository.save_wxindex_trend(
                record_id=record_id,
                trend_json=trend_result,
            )
            self.sync_wxindex_words(
                record_id=record_id,
                trend_result=trend_result,
                event_created_at=record.get("created_at"),
            )
            event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
                record=record,
                match_result=match_result,
                trend_result=trend_result,
            )
            self.repository.save_demand_quality(
                record_id=record_id,
                event_sense_json=event_sense_json,
                senior_fit_json=senior_fit_json,
            )
            counters["exported_count"] += self.export_demand_terms_if_needed(
                record=record,
                match_result=match_result,
                trend_result=trend_result,
                event_sense_json=event_sense_json,
                senior_fit_json=senior_fit_json,
            )
            counters["wxindex_count"] += 1
            counters["quality_count"] += 1
        except WxindexSelectionSkipped as exc:
            self._record_skip(
                record=record,
                record_id=record_id,
                match_result=match_result,
                reason=str(exc),
                counters=counters,
            )

    def _record_skip(
        self,
        *,
        record: dict[str, Any],
        record_id: int,
        match_result: Any,
        reason: str,
        counters: dict[str, int],
    ) -> None:
        """Mark a record SKIPPED and export its rows without trend data."""
        self.repository.update_postprocess_status(
            record_id=record_id,
            status=PostprocessStatus.SKIPPED,
            error_message=reason,
        )
        if isinstance(match_result, dict):
            self._save_empty_demand_quality(record_id=record_id)
            counters["exported_count"] += self.export_demand_terms_if_needed(
                record=record,
                match_result=match_result,
                trend_result=None,
            )
        counters["skipped_count"] += 1

    def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
        """Attach the hive sync outcome to ``result`` and return it.

        The sync is best-effort: a failure is stored under
        ``hive_sync_error`` instead of being raised.
        """
        try:
            result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
        except Exception as exc:
            result["hive_sync_error"] = str(exc)
        return result

    def sync_wxindex_words(
        self,
        *,
        record_id: int,
        trend_result: dict[str, Any],
        event_created_at: datetime | None = None,
        verbose: bool = False,
    ) -> dict[str, int]:
        """Delegate to ``sync_words_from_trend_json`` for this record's trend."""
        return sync_words_from_trend_json(
            self.repository,
            self.api_client,
            self.config.wxindex_api_url,
            trend_json=trend_result,
            record_id=record_id,
            event_created_at=event_created_at,
            verbose=verbose,
            update_meta_if_exists=True,
        )

    def _save_empty_demand_quality(self, *, record_id: int) -> None:
        """Store empty quality payloads, passing ``update_status=False``."""
        self.repository.save_demand_quality(
            record_id=record_id,
            event_sense_json={},
            senior_fit_json={},
            update_status=False,
        )

    @staticmethod
    def _contribution_points_or_none(record: dict[str, Any]) -> dict[str, Any] | None:
        """Return ``contribution_points_json`` only when it is a dict."""
        contribution_points = record.get("contribution_points_json")
        return contribution_points if isinstance(contribution_points, dict) else None

    def run_demand_quality_judgment(
        self,
        *,
        record: dict[str, Any],
        match_result: dict[str, Any],
        trend_result: dict[str, Any],
    ) -> tuple[dict[str, Any], dict[str, Any]]:
        """Build export rows with wxindex metadata and run the quality pipeline.

        Returns:
            The pair produced by ``run_demand_quality_pipeline``, used by
            ``run`` as ``(event_sense_json, senior_fit_json)``.
        """
        channel_content_id = str(
            match_result.get("channelContentId") or record.get("unique_key") or ""
        ).strip()
        base_export_rows = attach_wxindex_metadata(
            build_demand_export_rows(
                match_result,
                contribution_points=self._contribution_points_or_none(record),
                trend_json=trend_result,
            ),
            trend_result,
            wxindex_threshold=self.config.wxindex_score_threshold,
        )
        return run_demand_quality_pipeline(
            channel_content_id=channel_content_id,
            export_rows=base_export_rows,
            wxindex_threshold=self.config.wxindex_score_threshold,
            event_threshold=self.config.demand_event_sense_threshold,
            senior_threshold=self.config.demand_senior_fit_threshold,
            model=self.config.demand_quality_llm_model,
            max_attempts=self.config.demand_quality_llm_max_attempts,
            retry_sleep_seconds=self.config.demand_quality_llm_retry_sleep_seconds,
            max_tokens=self.config.demand_quality_llm_max_tokens,
        )

    def export_demand_terms_if_needed(
        self,
        *,
        record: dict[str, Any],
        match_result: dict[str, Any],
        trend_result: dict[str, Any] | None,
        event_sense_json: dict[str, Any] | None = None,
        senior_fit_json: dict[str, Any] | None = None,
    ) -> int:
        """Rebuild and replace the demand export rows of ``record``.

        Non-dict trend data is passed on as ``None`` and non-dict quality
        payloads as ``{}``.

        Returns:
            The number of export rows written.
        """
        normalized_record = {
            "contribution_demand_match_json": match_result,
            "contribution_points_json": self._contribution_points_or_none(record),
            "wxindex_trend_json": trend_result if isinstance(trend_result, dict) else None,
            "demand_event_sense_json": event_sense_json if isinstance(event_sense_json, dict) else {},
            "demand_senior_fit_json": senior_fit_json if isinstance(senior_fit_json, dict) else {},
        }
        export_rows = build_export_rows_from_record(
            normalized_record,
            wxindex_threshold=self.config.wxindex_score_threshold,
            event_sense_json=normalized_record["demand_event_sense_json"],
            senior_fit_json=normalized_record["demand_senior_fit_json"],
            event_threshold=self.config.demand_event_sense_threshold,
            senior_threshold=self.config.demand_senior_fit_threshold,
        )
        self.repository.replace_demand_export_rows(
            record_id=int(record["id"]),
            source=str(record.get("source") or ""),
            hot_title=str(record.get("title") or ""),
            article_title=str(record.get("article_title") or ""),
            rows=export_rows,
        )
        return len(export_rows)

    def match_record(
        self,
        *,
        record: dict[str, Any],
        demand_name_set: list[str],
        demand_cache_run_id: int,
    ) -> dict[str, Any]:
        """Match the record's high-contribution words against demand names.

        LLM matches are kept only when the word is one of the record's own
        words and the demand name resolves through the demand lookup.

        Raises:
            HotContentFlowError: when ``contribution_points_json`` is not a
                dict, or when the LLM matching fails.
        """
        contribution_points = record.get("contribution_points_json")
        if not isinstance(contribution_points, dict):
            raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")
        channel_content_id = str(
            contribution_points.get("channelContentId") or record.get("unique_key") or ""
        ).strip()
        words_rows = contribution_points.get("高贡献词列表") or []
        if not isinstance(words_rows, list):
            words_rows = []
        word_list = [
            str(item.get("词") or "").strip()
            for item in words_rows
            if isinstance(item, dict) and str(item.get("词") or "").strip()
        ]
        llm_result = self.llm_match_single_article(
            channel_content_id=channel_content_id or str(record["unique_key"]),
            words=word_list,
            demand_name_set=demand_name_set,
        )

        demand_lookup = _build_demand_lookup(demand_name_set)
        # Built once so each LLM row costs a constant-time membership test.
        known_words = set(word_list)
        matched_map: dict[str, list[dict[str, str]]] = {}
        for item in llm_result.get("matched") or []:
            if not isinstance(item, dict):
                continue
            word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
            demand_name = str(item.get("demand_name") or "").strip()
            reason = str(item.get("reason") or "").strip()
            if not word or word not in known_words:
                continue
            canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
            if canonical_demand_name is None:
                continue
            matched_map.setdefault(word, []).append(
                {
                    "demand_name": canonical_demand_name,
                    "reason": reason,
                }
            )

        output_words: list[dict[str, Any]] = []
        matched_word_rows: list[dict[str, Any]] = []
        for row in words_rows:
            if not isinstance(row, dict):
                continue
            word = str(row.get("词") or "").strip()
            item = {"词": word, "贡献度": row.get("贡献度")}
            if word in matched_map:
                item["匹配需求列表"] = matched_map[word]
                matched_word_rows.append(item)
            output_words.append(item)

        matched_points = self.filter_matched_points(
            contribution_points.get("点列表"),
            set(matched_map),
        )
        return {
            "channelContentId": channel_content_id,
            "demand_cache_run_id": demand_cache_run_id,
            "高贡献词列表": output_words,
            "匹配到需求的词列表": matched_word_rows,
            "点列表": matched_points,
        }

    def sanitize_match_result(
        self,
        match_result: dict[str, Any],
        *,
        demand_name_set: list[str],
        demand_cache_run_id: int,
    ) -> dict[str, Any]:
        """Rebuild ``match_result`` keeping only resolvable demand names.

        Every match whose ``demand_name`` no longer resolves against
        ``demand_name_set`` is dropped. Words left without matches lose
        their ``匹配需求列表`` entry, and the point list is re-filtered to
        the remaining matched words.
        """
        demand_lookup = _build_demand_lookup(demand_name_set)
        words_rows = match_result.get("高贡献词列表") or []
        if not isinstance(words_rows, list):
            words_rows = []

        output_words: list[dict[str, Any]] = []
        matched_word_rows: list[dict[str, Any]] = []
        valid_words: set[str] = set()
        for row in words_rows:
            if not isinstance(row, dict):
                continue
            word = str(row.get("词") or "").strip()
            item = {"词": word, "贡献度": row.get("贡献度")}
            match_rows = row.get("匹配需求列表") or []
            if not isinstance(match_rows, list):
                match_rows = []
            valid_match_rows: list[dict[str, str]] = []
            for match in match_rows:
                if not isinstance(match, dict):
                    continue
                demand_name = str(match.get("demand_name") or "").strip()
                canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
                if canonical_demand_name is None:
                    continue
                valid_match_rows.append(
                    {
                        "demand_name": canonical_demand_name,
                        "reason": str(match.get("reason") or "").strip(),
                    }
                )
            if word and valid_match_rows:
                item["匹配需求列表"] = valid_match_rows
                matched_word_rows.append(item)
                valid_words.add(word)
            output_words.append(item)

        return {
            "channelContentId": str(match_result.get("channelContentId") or ""),
            "demand_cache_run_id": demand_cache_run_id,
            "高贡献词列表": output_words,
            "匹配到需求的词列表": matched_word_rows,
            "点列表": self.filter_matched_points(
                match_result.get("点列表"),
                valid_words,
            ),
        }

    @staticmethod
    def _request_llm_json(
        *,
        system_prompt: str,
        user_payload: dict[str, Any],
        model: Any,
        max_tokens: int,
    ) -> dict[str, Any]:
        """Send one chat completion and parse its content as a JSON object."""
        resp = create_chat_completion(
            [
                {"role": "system", "content": system_prompt},
                {
                    "role": "user",
                    "content": json.dumps(user_payload, ensure_ascii=False),
                },
            ],
            model=model or None,
            temperature=0,
            max_tokens=max(max_tokens, 1),
        )
        return _extract_json_object(str(resp.get("content") or ""))

    def llm_match_single_article(
        self,
        *,
        channel_content_id: str,
        words: list[str],
        demand_name_set: list[str],
    ) -> dict[str, Any]:
        """Ask the LLM which ``words`` match entries of ``demand_name_set``.

        Returns an empty match list without calling the LLM when ``words``
        is empty. Otherwise the call is retried up to the configured attempt
        count, sleeping between attempts.

        Raises:
            HotContentFlowError: when every attempt fails.
        """
        if not words:
            return {"source": channel_content_id, "matched": []}

        user_payload = {
            "source": channel_content_id,
            "words": words,
            "demand_name_set": demand_name_set,
            "output_schema": {
                "source": "string",
                "matched": [
                    {
                        "title": "string, must be selected from words",
                        "demand_name": "string, must be selected from demand_name_set",
                        "reason": "string",
                    }
                ],
            },
        }
        max_attempts = max(self.config.contribution_match_llm_max_attempts, 1)
        last_error: Exception | None = None
        for attempt in range(1, max_attempts + 1):
            try:
                parsed = self._request_llm_json(
                    system_prompt=self._MATCH_SYSTEM_PROMPT,
                    user_payload=user_payload,
                    model=self.config.contribution_match_llm_model,
                    max_tokens=self.config.contribution_match_llm_max_tokens,
                )
            except (OpenRouterCallError, HotContentFlowError) as exc:
                last_error = exc
                if attempt < max_attempts:
                    time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))
                continue
            parsed.setdefault("source", channel_content_id)
            parsed.setdefault("matched", [])
            return parsed
        raise HotContentFlowError(
            f"llm match failed for channelContentId={channel_content_id}: {last_error}"
        ) from last_error

    def build_wxindex_trend(
        self,
        record: dict[str, Any],
        match_result: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Select wxindex keywords and assemble their score trend.

        Returns:
            ``None`` when ``match_result`` holds no matched words. Otherwise
            a dict with every keyword's lookup under ``wxindex_searches``
            and the keyword with the highest latest score under ``wxindex``.

        Raises:
            WxindexSelectionSkipped: when no selected keyword has a latest
                score, or when the LLM selects no keyword.
        """
        matched_word_rows = match_result.get("匹配到需求的词列表") or []
        if not isinstance(matched_word_rows, list) or not matched_word_rows:
            return None
        contribution_words = [
            str(row.get("词") or "").strip()
            for row in matched_word_rows
            if isinstance(row, dict) and str(row.get("词") or "").strip()
        ]
        if not contribution_words:
            return None

        channel_content_id = str(
            match_result.get("channelContentId") or record.get("unique_key") or ""
        )
        article_title, body_text = self.extract_article_text(record)
        matched_demands = _collect_matched_demand_names(matched_word_rows)
        pick = self.llm_extract_wxindex_words(
            channel_content_id=channel_content_id,
            article_title=article_title,
            body_text=body_text,
            contribution_words=contribution_words,
            matched_demands=matched_demands,
        )
        selected_words = pick["selected_words"]
        threshold = float(self.config.wxindex_score_threshold)
        event_created_at = record.get("created_at")

        wxindex_searches: list[dict[str, Any]] = []
        for keyword in selected_words:
            full_scores, _action = ensure_word_full_scores(
                self.repository,
                self.api_client,
                self.config.wxindex_api_url,
                keyword=keyword,
                event_created_at=event_created_at,
                update_meta_if_exists=True,
            )
            series, start_ymd, end_ymd = slice_scores_lookback(
                full_scores,
                self.config.wxindex_lookback_days,
            )
            latest_score = series[-1]["total_score"] if series else None
            wxindex_searches.append(
                {
                    "keyword": keyword,
                    "start_ymd": start_ymd,
                    "end_ymd": end_ymd,
                    "total_score_7d": series,
                    "latest_total_score": latest_score,
                    "threshold": threshold,
                    "latest_gt_threshold": (
                        False
                        if latest_score is None
                        else latest_score >= threshold
                    ),
                    "trend": calc_wxindex_trend(series),
                }
            )

        searchable = [
            item
            for item in wxindex_searches
            if item.get("latest_total_score") is not None
        ]
        if not searchable:
            raise WxindexSelectionSkipped(
                f"no wxindex score for any keyword in {channel_content_id}: "
                f"{selected_words}"
            )

        # The keyword with the highest latest score represents the record.
        best = max(searchable, key=lambda item: float(item["latest_total_score"]))
        selected_word = str(best["keyword"])
        latest_score = best["latest_total_score"]
        series = best["total_score_7d"]
        return {
            "channelContentId": channel_content_id,
            "article_title": article_title,
            "llm_selected_words": selected_words,
            "llm_selected_word": selected_word,
            "llm_reason": pick["reason"],
            "wxindex_searches": wxindex_searches,
            "wxindex": {
                "keyword": selected_word,
                "keywords": selected_words,
                "start_ymd": best["start_ymd"],
                "end_ymd": best["end_ymd"],
                "total_score_7d": series,
                "latest_total_score": latest_score,
                "threshold": threshold,
                "latest_gt_threshold": latest_score >= threshold,
                "trend": best["trend"],
            },
        }

    def llm_extract_wxindex_words(
        self,
        *,
        channel_content_id: str,
        article_title: str,
        body_text: str,
        contribution_words: list[str],
        matched_demands: list[str],
    ) -> dict[str, Any]:
        """Ask the LLM for wxindex search keywords for one article.

        Accepts ``selected_words`` as a list or a single string, and falls
        back to a ``selected_word`` field otherwise. Words are stripped and
        de-duplicated in order. Failed calls are retried without a pause.

        Returns:
            ``{"selected_words": list[str], "reason": str}``.

        Raises:
            WxindexSelectionSkipped: when the LLM answers but selects no word.
            HotContentFlowError: when every attempt fails.
        """
        user_payload = {
            "source": channel_content_id,
            "article_title": article_title,
            "article_body_text": body_text,
            "contribution_words": contribution_words,
            "matched_demands": matched_demands,
            "output_schema": {
                "source": "string",
                "selected_words": [
                    "string, concise keyword for wxindex search, one or more"
                ],
                "reason": "string",
            },
            "constraints": [
                "selected_words 为数组,至少 1 个词,可多个",
                "每个词简洁(2-4 字),适合微信指数检索",
                "结合标题、高贡献词、已匹配需求提炼,可合并改写,不必逐字照搬",
                "多个词应分别覆盖不同事件或角度,避免语义重复",
                "reason 简洁说明,不超过60字",
                "仅输出 JSON 对象,不要 markdown 代码块",
            ],
        }
        max_attempts = max(self.config.wxindex_llm_max_attempts, 1)
        last_error: Exception | None = None
        for _attempt in range(max_attempts):
            try:
                parsed = self._request_llm_json(
                    system_prompt=self._WXINDEX_SYSTEM_PROMPT,
                    user_payload=user_payload,
                    model=self.config.wxindex_llm_model,
                    max_tokens=self.config.wxindex_llm_max_tokens,
                )
            except (OpenRouterCallError, HotContentFlowError) as exc:
                last_error = exc
                continue

            raw_words = parsed.get("selected_words")
            if isinstance(raw_words, str):
                raw_words = [raw_words]
            if not isinstance(raw_words, list):
                # Fall back to the single-word field.
                legacy_word = str(parsed.get("selected_word") or "").strip()
                raw_words = [legacy_word] if legacy_word else []

            selected_words: list[str] = []
            seen: set[str] = set()
            for item in raw_words:
                word = str(item or "").strip()
                if word and word not in seen:
                    seen.add(word)
                    selected_words.append(word)

            # An empty selection is a final answer, so it is not retried.
            if not selected_words:
                raise WxindexSelectionSkipped(
                    f"selected_words empty for {channel_content_id}"
                )
            return {
                "selected_words": selected_words,
                "reason": str(parsed.get("reason") or "").strip(),
            }
        raise HotContentFlowError(
            f"llm extract wxindex words failed for {channel_content_id}: {last_error}"
        ) from last_error

    @staticmethod
    def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
        """Keep the points that reference at least one matched word.

        Each kept point is rebuilt with string-coerced ``来源``/``点``/
        ``点描述`` fields, and its ``匹配词列表`` is reduced to the rows
        whose stripped ``词`` is in ``matched_words``. Returns ``[]`` when
        ``matched_words`` is empty or ``raw_points`` is not a list.
        """
        if not matched_words or not isinstance(raw_points, list):
            return []
        matched_points: list[dict[str, Any]] = []
        for point in raw_points:
            if not isinstance(point, dict):
                continue
            point_match_rows = point.get("匹配词列表") or []
            if not isinstance(point_match_rows, list):
                point_match_rows = []
            keep_rows = [
                row
                for row in point_match_rows
                if isinstance(row, dict) and str(row.get("词") or "").strip() in matched_words
            ]
            if keep_rows:
                matched_points.append(
                    {
                        "来源": str(point.get("来源") or ""),
                        "点": str(point.get("点") or ""),
                        "点描述": str(point.get("点描述") or ""),
                        "匹配词列表": keep_rows,
                    }
                )
        return matched_points

    @staticmethod
    def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
        """Return the stripped ``(title, body)`` of the record's article.

        Values from ``decode_result_json["target_post"]`` take precedence;
        the record's own ``article_title`` / ``article_body`` are the
        fallback. Missing values become empty strings.
        """
        decode_result = record.get("decode_result_json")
        target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
        if not isinstance(target_post, dict):
            target_post = {}
        article_title = str(
            target_post.get("title") or record.get("article_title") or ""
        ).strip()
        body_text = str(
            target_post.get("body_text") or record.get("article_body") or ""
        ).strip()
        return article_title, body_text
- def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
- flow_config = config or load_flow_config()
- repository = HotContentRepository(flow_config.mysql)
- try:
- api_client = JsonApiClient(
- timeout_seconds=flow_config.request_timeout_seconds,
- verify_ssl=flow_config.https_verify_ssl,
- )
- service = ContributionPostprocessService(flow_config, repository, api_client)
- return service.run()
- finally:
- repository.close()
|