| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666 |
- """贡献点需求匹配与微信指数趋势后处理服务。"""
- from __future__ import annotations
- import json
- import re
- import time
- from datetime import datetime, timedelta
- from typing import Any
- from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
- from app.hot_content.client import JsonApiClient
- from app.hot_content.config import load_flow_config
- from app.hot_content.demand_cache_service import DemandCacheService
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.repository import HotContentRepository
- from app.hot_content.status import PostprocessStatus
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import FlowConfig
- from app.hot_content.demand_export import (
- attach_wxindex_metadata,
- build_demand_export_rows,
- )
- from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
- from app.hot_content.wxindex_trend import calc_wxindex_trend
class WxindexSelectionSkipped(Exception):
    """Skip the follow-up WeChat-index query because the word selection is invalid."""
def _extract_json_object(text: str) -> dict[str, Any]:
    """Parse a JSON object out of an LLM reply.

    The reply is stripped, a leading/trailing Markdown code fence is removed,
    and the remainder is parsed as JSON.  When the whole reply is not a JSON
    object, decoding is retried starting at the first ``{`` and stops at the
    end of that object, so explanatory text after the object (even text that
    itself contains braces) no longer breaks parsing.

    Args:
        text: Raw LLM reply.

    Returns:
        The decoded JSON object.

    Raises:
        HotContentFlowError: If the reply contains no ``{`` or the text
            starting at the first ``{`` is not a valid JSON object.
    """
    raw = text.strip()
    if raw.startswith("```"):
        raw = re.sub(r"^```(?:json)?\s*", "", raw)
        raw = re.sub(r"\s*```$", "", raw)

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    start = raw.find("{")
    if start < 0:
        raise HotContentFlowError("llm output is not json object")

    # Only the first "{" is tried on purpose: scanning later braces would let
    # a truncated outer object silently resolve to one of its inner objects.
    try:
        candidate, _end = json.JSONDecoder().raw_decode(raw, start)
    except json.JSONDecodeError as exc:
        raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
    if not isinstance(candidate, dict):
        raise HotContentFlowError("llm output is not json object")
    return candidate
def _get_recent_range(lookback_days: int) -> tuple[str, str]:
    """Return ``(start_ymd, end_ymd)`` as ``YYYYMMDD`` strings.

    The end date is yesterday in the Shanghai timezone; the start date lies
    ``max(lookback_days, 1)`` days before the end date.
    """
    window = timedelta(days=max(lookback_days, 1))
    yesterday = datetime.now(SHANGHAI_TZ).date() - timedelta(days=1)
    first_day = yesterday - window
    fmt = "%Y%m%d"
    return first_day.strftime(fmt), yesterday.strftime(fmt)
def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract the per-day total score series from a WeChat-index response.

    Reads ``wx_resp["data"]["data"]`` and keeps every dict row that has a
    non-empty ``ymd`` and a ``channel_score.total_score`` convertible to
    ``float``.  The result is sorted by ``ymd`` ascending.
    """
    payload = wx_resp.get("data") or {}
    raw_rows = payload.get("data") or []
    if not isinstance(raw_rows, list):
        return []

    points: list[dict[str, Any]] = []
    for entry in raw_rows:
        if not isinstance(entry, dict):
            continue
        day = str(entry.get("ymd") or "").strip()
        raw_score = (entry.get("channel_score") or {}).get("total_score")
        if raw_score is None:
            continue
        try:
            value = float(raw_score)
        except (TypeError, ValueError):
            # Unparseable score: drop the row.
            continue
        if day:
            points.append({"ymd": day, "total_score": value})

    return sorted(points, key=lambda point: point["ymd"])
def _normalize_demand_key(value: str) -> str:
    """Return *value* with all whitespace removed."""
    return "".join(value.split())


def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
    """Build a lookup from accepted spellings to canonical demand names.

    Each non-blank, stripped demand name maps to itself, and its
    whitespace-free form maps to it as an alias.  All exact names are
    registered before any alias so that an alias of one name can never
    shadow another name that is spelled exactly that way (e.g. with
    ``["a b", "ab"]`` the key ``"ab"`` resolves to ``"ab"``).  Among equal
    keys the first occurrence wins.
    """
    lookup: dict[str, str] = {}
    canonical_names: list[str] = []
    for item in demand_name_set:
        demand_name = str(item).strip()
        if not demand_name:
            continue
        canonical_names.append(demand_name)
        lookup.setdefault(demand_name, demand_name)

    for demand_name in canonical_names:
        compact_key = _normalize_demand_key(demand_name)
        if compact_key:
            lookup.setdefault(compact_key, demand_name)
    return lookup


def _resolve_demand_name(
    demand_name: str,
    demand_lookup: dict[str, str],
) -> str | None:
    """Map *demand_name* to its canonical spelling, or ``None`` if unknown.

    The stripped name is tried first, then its whitespace-free form.
    """
    value = demand_name.strip()
    if not value:
        return None
    return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
class ContributionPostprocessService:
    """Match contribution words to demand names and attach WeChat-index trends.

    For each candidate record the service matches the record's
    high-contribution words against the cached demand-name set through an
    LLM, picks one matched word, queries its WeChat-index series and writes
    the resulting demand export rows through the repository.
    """

    def __init__(
        self,
        config: FlowConfig,
        repository: HotContentRepository,
        api_client: JsonApiClient,
    ):
        self.config = config
        self.repository = repository
        self.api_client = api_client

    def run(self) -> dict[str, Any]:
        """Process one batch of candidates and return a run summary.

        Returns:
            A summary dict with counters for the batch.  The summary is
            passed through ``_finalize_run_result`` on every exit path.
            Per-record failures are stored on the record as FAILED and
            counted; they do not abort the batch.
        """
        records = self.repository.list_postprocess_candidates(
            limit=max(self.config.postprocess_batch_size, 1)
        )
        if not records:
            return self._finalize_run_result(
                {
                    "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                    "status": "success",
                    "candidate_count": 0,
                    "matched_count": 0,
                    "wxindex_count": 0,
                    "skipped_count": 0,
                    "failed_count": 0,
                }
            )

        cache = DemandCacheService(
            self.config,
            self.repository,
        ).get_or_create_current_hour_cache()
        demand_cache_run_id = int(cache["id"])
        demand_name_set = cache["demand_name_set"]
        if not demand_name_set:
            return self._finalize_run_result(
                {
                    "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                    "status": "skipped",
                    "reason": "empty_demand_cache",
                    "demand_cache_run_id": demand_cache_run_id,
                    "cache_source": cache.get("source"),
                    "processed_count": 0,
                }
            )

        matched_count = 0
        wxindex_count = 0
        exported_count = 0
        skipped_count = 0
        failed_count = 0
        for record in records:
            record_id = int(record["id"])
            try:
                # Reuse a stored match when present; otherwise ask the LLM.
                match_result = record.get("contribution_demand_match_json")
                if not isinstance(match_result, dict):
                    match_result = self.match_record(
                        record=record,
                        demand_name_set=demand_name_set,
                        demand_cache_run_id=demand_cache_run_id,
                    )
                    self.repository.save_contribution_demand_match(
                        record_id=record_id,
                        demand_cache_run_id=demand_cache_run_id,
                        match_json=match_result,
                    )
                    matched_count += 1

                # Re-validate against the current demand set and persist
                # only when sanitizing actually changed something.
                sanitized_match_result = self.sanitize_match_result(
                    match_result,
                    demand_name_set=demand_name_set,
                    demand_cache_run_id=demand_cache_run_id,
                )
                if sanitized_match_result != match_result:
                    match_result = sanitized_match_result
                    self.repository.save_contribution_demand_match(
                        record_id=record_id,
                        demand_cache_run_id=demand_cache_run_id,
                        match_json=match_result,
                    )

                skip_reason: str | None = None
                try:
                    trend_result = self.build_wxindex_trend(record, match_result)
                except WxindexSelectionSkipped as exc:
                    trend_result = None
                    skip_reason = str(exc)
                else:
                    if trend_result is None:
                        skip_reason = "no matched demand words"

                if skip_reason is not None:
                    # The skip bookkeeping runs inside the outer ``try`` so a
                    # failing status update or export marks this record as
                    # FAILED instead of aborting the whole batch.
                    self.repository.update_postprocess_status(
                        record_id=record_id,
                        status=PostprocessStatus.SKIPPED,
                        error_message=skip_reason,
                    )
                    exported_count += self.export_demand_terms_if_needed(
                        record=record,
                        match_result=match_result,
                        trend_result=None,
                    )
                    skipped_count += 1
                    continue

                self.repository.save_wxindex_trend(
                    record_id=record_id,
                    trend_json=trend_result,
                )
                exported_count += self.export_demand_terms_if_needed(
                    record=record,
                    match_result=match_result,
                    trend_result=trend_result,
                )
                wxindex_count += 1
            except Exception as exc:
                # Batch boundary: record the failure and move on.
                self.repository.update_postprocess_status(
                    record_id=record_id,
                    status=PostprocessStatus.FAILED,
                    error_message=str(exc),
                )
                failed_count += 1

        return self._finalize_run_result(
            {
                "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
                "status": "success",
                "demand_cache_run_id": demand_cache_run_id,
                "cache_source": cache.get("source"),
                "cache_hour": str(cache.get("cache_hour") or ""),
                "demand_name_count": len(demand_name_set),
                "candidate_count": len(records),
                "matched_count": matched_count,
                "wxindex_count": wxindex_count,
                "exported_count": exported_count,
                "skipped_count": skipped_count,
                "failed_count": failed_count,
            }
        )

    def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
        """Attach the Hive sync outcome to *result* and return it.

        The sync is best effort: on failure the error text is stored under
        ``hive_sync_error`` instead of propagating.
        """
        try:
            result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
        except Exception as exc:
            result["hive_sync_error"] = str(exc)
        return result

    def export_demand_terms_if_needed(
        self,
        *,
        record: dict[str, Any],
        match_result: dict[str, Any],
        trend_result: dict[str, Any] | None,
    ) -> int:
        """Rebuild and store the demand export rows for *record*.

        Args:
            record: Candidate record; ``id``, ``source``, ``title``,
                ``article_title`` and ``contribution_points_json`` are read.
            match_result: Demand match result for the record.
            trend_result: WeChat-index trend result, or ``None``.

        Returns:
            Number of export rows handed to the repository.
        """
        trend_json = trend_result if isinstance(trend_result, dict) else None
        raw_points = record.get("contribution_points_json")
        contribution_points = raw_points if isinstance(raw_points, dict) else None

        export_rows = attach_wxindex_metadata(
            build_demand_export_rows(
                match_result,
                contribution_points=contribution_points,
                trend_json=trend_json,
            ),
            trend_json,
        )
        self.repository.replace_demand_export_rows(
            record_id=int(record["id"]),
            source=str(record.get("source") or ""),
            hot_title=str(record.get("title") or ""),
            article_title=str(record.get("article_title") or ""),
            rows=export_rows,
        )
        return len(export_rows)

    def match_record(
        self,
        *,
        record: dict[str, Any],
        demand_name_set: list[str],
        demand_cache_run_id: int,
    ) -> dict[str, Any]:
        """Match a record's high-contribution words against the demand names.

        Args:
            record: Candidate record with a dict ``contribution_points_json``.
            demand_name_set: Demand names the LLM may choose from.
            demand_cache_run_id: Id of the demand cache run, copied into the
                result.

        Returns:
            A dict holding the content id, every word row, the subset of word
            rows that matched at least one demand, and the points that
            reference a matched word.

        Raises:
            HotContentFlowError: If ``contribution_points_json`` is not a
                dict, or the LLM matching fails after all attempts.
        """
        contribution_points = record.get("contribution_points_json")
        if not isinstance(contribution_points, dict):
            raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")

        channel_content_id = str(
            contribution_points.get("channelContentId") or record.get("unique_key") or ""
        ).strip()
        words_rows = contribution_points.get("高贡献词列表") or []
        if not isinstance(words_rows, list):
            words_rows = []
        word_list = [
            str(item.get("词") or "").strip()
            for item in words_rows
            if isinstance(item, dict) and str(item.get("词") or "").strip()
        ]
        known_words = set(word_list)

        # ``channel_content_id`` already falls back to ``unique_key``; a second
        # ``record["unique_key"]`` lookup could only raise KeyError or yield
        # the literal "None", so the id is passed as is.
        llm_result = self.llm_match_single_article(
            channel_content_id=channel_content_id,
            words=word_list,
            demand_name_set=demand_name_set,
        )

        demand_lookup = _build_demand_lookup(demand_name_set)
        matched_map: dict[str, list[dict[str, str]]] = {}
        for item in llm_result.get("matched") or []:
            if not isinstance(item, dict):
                continue
            word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
            demand_name = str(item.get("demand_name") or "").strip()
            reason = str(item.get("reason") or "").strip()
            # Drop words the LLM invented.
            if not word or word not in known_words:
                continue
            # Drop demand names that are not in the demand set.
            canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
            if canonical_demand_name is None:
                continue
            matched_map.setdefault(word, []).append(
                {
                    "demand_name": canonical_demand_name,
                    "reason": reason,
                }
            )

        output_words: list[dict[str, Any]] = []
        matched_word_rows: list[dict[str, Any]] = []
        for row in words_rows:
            if not isinstance(row, dict):
                continue
            word = str(row.get("词") or "").strip()
            item = {"词": word, "贡献度": row.get("贡献度")}
            if word in matched_map:
                item["匹配需求列表"] = matched_map[word]
                matched_word_rows.append(item)
            output_words.append(item)

        matched_points = self.filter_matched_points(
            contribution_points.get("点列表"),
            set(matched_map.keys()),
        )
        return {
            "channelContentId": channel_content_id,
            "demand_cache_run_id": demand_cache_run_id,
            "高贡献词列表": output_words,
            "匹配到需求的词列表": matched_word_rows,
            "点列表": matched_points,
        }

    def sanitize_match_result(
        self,
        match_result: dict[str, Any],
        *,
        demand_name_set: list[str],
        demand_cache_run_id: int,
    ) -> dict[str, Any]:
        """Rebuild *match_result* so it only references current demand names.

        Matches whose demand name cannot be resolved against
        *demand_name_set* are dropped, resolvable names are replaced by their
        canonical spelling, and the matched-word and point lists are
        recomputed from what remains.  The input dict is not modified.
        """
        demand_lookup = _build_demand_lookup(demand_name_set)
        words_rows = match_result.get("高贡献词列表") or []
        if not isinstance(words_rows, list):
            words_rows = []

        output_words: list[dict[str, Any]] = []
        matched_word_rows: list[dict[str, Any]] = []
        valid_words: set[str] = set()
        for row in words_rows:
            if not isinstance(row, dict):
                continue
            word = str(row.get("词") or "").strip()
            item = {"词": word, "贡献度": row.get("贡献度")}
            match_rows = row.get("匹配需求列表") or []
            if not isinstance(match_rows, list):
                match_rows = []

            valid_match_rows: list[dict[str, str]] = []
            for match in match_rows:
                if not isinstance(match, dict):
                    continue
                demand_name = str(match.get("demand_name") or "").strip()
                canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
                if canonical_demand_name is None:
                    continue
                valid_match_rows.append(
                    {
                        "demand_name": canonical_demand_name,
                        "reason": str(match.get("reason") or "").strip(),
                    }
                )

            if word and valid_match_rows:
                item["匹配需求列表"] = valid_match_rows
                matched_word_rows.append(item)
                valid_words.add(word)
            output_words.append(item)

        return {
            "channelContentId": str(match_result.get("channelContentId") or ""),
            "demand_cache_run_id": demand_cache_run_id,
            "高贡献词列表": output_words,
            "匹配到需求的词列表": matched_word_rows,
            "点列表": self.filter_matched_points(
                match_result.get("点列表"),
                valid_words,
            ),
        }

    def llm_match_single_article(
        self,
        *,
        channel_content_id: str,
        words: list[str],
        demand_name_set: list[str],
    ) -> dict[str, Any]:
        """Ask the LLM which of *words* match entries of *demand_name_set*.

        Returns immediately with an empty ``matched`` list when *words* is
        empty.  Otherwise the call is attempted up to
        ``contribution_match_llm_max_attempts`` times (at least once),
        sleeping ``contribution_match_llm_retry_sleep_seconds`` between
        attempts.

        Returns:
            The parsed LLM JSON object with ``source`` and ``matched``
            guaranteed to be present.

        Raises:
            HotContentFlowError: If every attempt fails.
        """
        if not words:
            return {"source": channel_content_id, "matched": []}

        system_prompt = """
#角色
你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
# 任务
我会提供两组数据:
1. 热点词列表:一组待匹配的热点词语
2. 需求词库:一组已有的需求词语
请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
热点词要等于需求词,或者属于需求词,或者表达了相同含义。
# 输出规则
1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
# 约束
1. 热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
"""
        user_payload = {
            "source": channel_content_id,
            "words": words,
            "demand_name_set": demand_name_set,
            "output_schema": {
                "source": "string",
                "matched": [
                    {
                        "title": "string, must be selected from words",
                        "demand_name": "string, must be selected from demand_name_set",
                        "reason": "string",
                    }
                ],
            },
        }

        max_attempts = max(self.config.contribution_match_llm_max_attempts, 1)
        last_error: Exception | None = None
        for attempt in range(1, max_attempts + 1):
            try:
                resp = create_chat_completion(
                    [
                        {"role": "system", "content": system_prompt},
                        {
                            "role": "user",
                            "content": json.dumps(user_payload, ensure_ascii=False),
                        },
                    ],
                    model=self.config.contribution_match_llm_model or None,
                    temperature=0,
                    max_tokens=max(self.config.contribution_match_llm_max_tokens, 1),
                )
                parsed = _extract_json_object(str(resp.get("content") or ""))
                parsed.setdefault("source", channel_content_id)
                parsed.setdefault("matched", [])
                return parsed
            except (OpenRouterCallError, HotContentFlowError) as exc:
                last_error = exc
                # No sleep after the final attempt.
                if attempt < max_attempts:
                    time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))

        raise HotContentFlowError(
            f"llm match failed for channelContentId={channel_content_id}: {last_error}"
        ) from last_error

    def build_wxindex_trend(
        self,
        record: dict[str, Any],
        match_result: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Pick one matched word and fetch its WeChat-index trend.

        Returns:
            ``None`` when *match_result* contains no usable matched word;
            otherwise a dict with the selected word, the selection reason and
            a ``wxindex`` section holding the series, latest score, threshold
            comparison and computed trend.

        Raises:
            WxindexSelectionSkipped: Propagated from ``llm_pick_best_word``
                when the LLM selects a word outside the candidates.
            HotContentFlowError: Propagated from ``llm_pick_best_word`` when
                every LLM attempt fails.
        """
        matched_word_rows = match_result.get("匹配到需求的词列表") or []
        if not isinstance(matched_word_rows, list) or not matched_word_rows:
            return None

        # De-duplicate while keeping order so a repeated word neither triggers
        # a needless LLM call nor appears twice in the prompt.
        candidate_words = list(
            dict.fromkeys(
                str(row.get("词") or "").strip()
                for row in matched_word_rows
                if isinstance(row, dict) and str(row.get("词") or "").strip()
            )
        )
        if not candidate_words:
            return None

        channel_content_id = str(
            match_result.get("channelContentId") or record.get("unique_key") or ""
        )
        article_title, body_text = self.extract_article_text(record)

        if len(candidate_words) == 1:
            pick = {
                "selected_word": candidate_words[0],
                "reason": "only one candidate word",
            }
        else:
            pick = self.llm_pick_best_word(
                channel_content_id=channel_content_id,
                article_title=article_title,
                body_text=body_text,
                candidate_words=candidate_words,
            )
        selected_word = pick["selected_word"]

        start_ymd, end_ymd = _get_recent_range(self.config.wxindex_lookback_days)
        wx_payload = {
            "keyword": selected_word,
            "start_ymd": start_ymd,
            "end_ymd": end_ymd,
        }
        wx_resp = self.api_client.post_json(self.config.wxindex_api_url, wx_payload)
        series = _parse_total_scores(wx_resp)
        latest_score = series[-1]["total_score"] if series else None
        threshold = float(self.config.wxindex_score_threshold)

        return {
            "channelContentId": channel_content_id,
            "article_title": article_title,
            "llm_selected_word": selected_word,
            "llm_reason": pick["reason"],
            "wxindex": {
                "keyword": selected_word,
                "start_ymd": start_ymd,
                "end_ymd": end_ymd,
                # NOTE(review): the key says "7d" but the window length comes
                # from ``wxindex_lookback_days`` -- confirm consumers agree.
                "total_score_7d": series,
                "latest_total_score": latest_score,
                "threshold": threshold,
                # NOTE(review): the key says "gt" but the comparison is ``>=``
                # -- confirm which one is intended.
                "latest_gt_threshold": (
                    False
                    if latest_score is None
                    else latest_score >= threshold
                ),
                "trend": calc_wxindex_trend(series),
            },
        }

    def llm_pick_best_word(
        self,
        *,
        channel_content_id: str,
        article_title: str,
        body_text: str,
        candidate_words: list[str],
    ) -> dict[str, str]:
        """Ask the LLM for the candidate word that best represents the article.

        The call is attempted up to ``wxindex_llm_max_attempts`` times (at
        least once) on call or parse errors, without sleeping in between.

        Returns:
            ``{"selected_word": ..., "reason": ...}``.

        Raises:
            WxindexSelectionSkipped: If the LLM returns a word that is not in
                *candidate_words*; this is raised immediately, not retried.
            HotContentFlowError: If every attempt fails.
        """
        system_prompt = """
#角色
你是一个专业的语义分析专家,擅长精准概括整篇文章。
# 任务
我会提供一篇文章的标题、正文和候选词列表,请你选择一个最能代表文章内容的词。
# 输出规则
1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
"""
        user_payload = {
            "source": channel_content_id,
            "article_title": article_title,
            "article_body_text": body_text,
            "candidate_words": candidate_words,
            "output_schema": {
                "source": "string",
                "selected_word": "string, must be selected from candidate_words",
                "reason": "string",
            },
            "constraints": [
                "selected_word 必须来自 candidate_words",
                "reason 简洁说明,不超过40字",
                "仅输出 JSON 对象,不要 markdown 代码块",
            ],
        }

        max_attempts = max(self.config.wxindex_llm_max_attempts, 1)
        last_error: Exception | None = None
        for _attempt in range(max_attempts):
            try:
                resp = create_chat_completion(
                    [
                        {"role": "system", "content": system_prompt},
                        {
                            "role": "user",
                            "content": json.dumps(user_payload, ensure_ascii=False),
                        },
                    ],
                    model=self.config.wxindex_llm_model or None,
                    temperature=0,
                    max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
                )
                parsed = _extract_json_object(str(resp.get("content") or ""))
                selected_word = str(parsed.get("selected_word") or "").strip()
                reason = str(parsed.get("reason") or "").strip()
                if selected_word not in candidate_words:
                    raise WxindexSelectionSkipped(
                        f"selected_word not in candidates for {channel_content_id}: "
                        f"{selected_word}"
                    )
                return {"selected_word": selected_word, "reason": reason}
            except (OpenRouterCallError, HotContentFlowError) as exc:
                last_error = exc

        raise HotContentFlowError(
            f"llm pick word failed for {channel_content_id}: {last_error}"
        ) from last_error

    @staticmethod
    def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
        """Keep only the points that reference at least one matched word.

        Args:
            raw_points: Expected to be a list of point dicts; anything else
                yields an empty result.
            matched_words: Words that matched a demand.

        Returns:
            One normalized dict per kept point, whose ``匹配词列表`` holds only
            the rows naming a word in *matched_words*.
        """
        if not matched_words or not isinstance(raw_points, list):
            return []

        matched_points: list[dict[str, Any]] = []
        for point in raw_points:
            if not isinstance(point, dict):
                continue
            point_match_rows = point.get("匹配词列表") or []
            if not isinstance(point_match_rows, list):
                point_match_rows = []
            keep_rows = [
                row
                for row in point_match_rows
                if isinstance(row, dict) and str(row.get("词") or "").strip() in matched_words
            ]
            if keep_rows:
                matched_points.append(
                    {
                        "来源": str(point.get("来源") or ""),
                        "点": str(point.get("点") or ""),
                        "点描述": str(point.get("点描述") or ""),
                        "匹配词列表": keep_rows,
                    }
                )
        return matched_points

    @staticmethod
    def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
        """Return ``(title, body)`` for *record*, both stripped.

        Values from ``decode_result_json["target_post"]`` are preferred; the
        record's own ``article_title`` / ``article_body`` are the fallback.
        """
        decode_result = record.get("decode_result_json")
        target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
        if not isinstance(target_post, dict):
            target_post = {}
        article_title = str(
            target_post.get("title") or record.get("article_title") or ""
        ).strip()
        body_text = str(
            target_post.get("body_text") or record.get("article_body") or ""
        ).strip()
        return article_title, body_text
def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
    """Run one post-processing pass and return the service's run summary.

    Uses *config* when given, otherwise the loaded flow config.  The
    repository is closed on exit, whether or not the run succeeds.
    """
    flow_config = config if config else load_flow_config()
    repo = HotContentRepository(flow_config.mysql)
    try:
        client = JsonApiClient(
            timeout_seconds=flow_config.request_timeout_seconds,
            verify_ssl=flow_config.https_verify_ssl,
        )
        return ContributionPostprocessService(flow_config, repo, client).run()
    finally:
        repo.close()
|