| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- """热点内容流程服务。"""
- from __future__ import annotations
- from datetime import datetime
- import json
- import time
- from typing import Any
- from app.hot_content.category_filter import llm_filter_record_content
- from app.hot_content.client import (
- JsonApiClient,
- build_url,
- extract_decode_item_map,
- extract_keyword_items,
- extract_rank_items,
- pick_first_valid_content,
- render_template,
- )
- from app.hot_content.config import load_flow_config
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.repository import HotContentRepository, unique_title_key
- from app.hot_content.status import ExecutionStatus, decode_api_status_to_code
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import FlowConfig
# Default request body for the hot-rank endpoint; used whenever a source
# configuration does not provide its own ``hot_rank_payload``.
HOT_RANK_PAYLOAD = {
    "sort_type": "最热",
    "category": "news",
    "cursor": 0,
}

# Request-body template for the keyword-search endpoint. It is handed to
# ``render_template`` together with a context that contains the hot title.
KEYWORD_PAYLOAD_TEMPLATE = {
    "keyword": "{title}",
    "cursor": 0,
}
class HotContentFlowService:
    """Run one pass of the hot-content pipeline.

    The pipeline, as driven by :meth:`run`, is:

    1. fetch hot-rank titles for every configured source and persist them;
    2. find article content for each title via keyword search;
    3. filter the found content by category using an LLM;
    4. submit the surviving content to the decode API;
    5. build a summary dictionary of the whole pass.
    """

    # Total number of keyword-search attempts per hot title (first try included).
    KEYWORD_SEARCH_MAX_ATTEMPTS = 3

    def __init__(
        self,
        config: FlowConfig,
        repository: HotContentRepository,
        api_client: JsonApiClient,
    ):
        """Store the collaborators used by every pipeline step.

        Args:
            config: Flow configuration (endpoints, sources, filter settings).
            repository: Persistence layer for hot-content records.
            api_client: Client used for all JSON POST requests.
        """
        self.config = config
        self.repository = repository
        self.api_client = api_client

    def run(self) -> dict[str, Any]:
        """Execute every pipeline step in order and return the run summary."""
        hot_titles = self.fetch_and_save_hot_titles()
        selected_contents = self.search_and_save_contents(hot_titles)
        selected_contents = self.filter_contents_by_category(selected_contents)
        decode_resp = self.submit_decode_tasks(selected_contents)
        return self.build_summary(hot_titles, selected_contents, decode_resp)

    def fetch_and_save_hot_titles(self) -> list[dict[str, Any]]:
        """Fetch hot-rank titles for each configured source and upsert them.

        Sources that resolve to the same (base URL, path, payload) share a
        single API response within one call. Titles are de-duplicated by
        ``unique_title_key(source, title)``.

        Returns:
            One dictionary per saved title, combining fields of the upserted
            record with the source, title, rank and source configuration.
        """
        saved_titles: list[dict[str, Any]] = []
        seen_keys: set[str] = set()
        response_cache: dict[str, dict[str, Any]] = {}

        for source_config in self.config.sources:
            # Per-source overrides fall back to the flow-level defaults.
            base_url = source_config.hot_rank_base_url or self.config.crawapi_base_url
            path = source_config.hot_rank_path or self.config.hot_rank_path
            payload = (
                source_config.hot_rank_payload
                if source_config.hot_rank_payload is not None
                else HOT_RANK_PAYLOAD
            )

            # Identical requests are issued only once per call.
            cache_key = json.dumps(
                {"base_url": base_url, "path": path, "payload": payload},
                ensure_ascii=False,
                sort_keys=True,
            )
            if cache_key not in response_cache:
                response_cache[cache_key] = self.api_client.post_json(
                    build_url(base_url, path),
                    payload,
                )

            rank_items = extract_rank_items(
                response_cache[cache_key],
                source_config.source,
            )
            for rank_item in rank_items:
                title = str(
                    rank_item.get("title") or rank_item.get("name") or ""
                ).strip()
                if not title:
                    continue

                unique_key = unique_title_key(source_config.source, title)
                if unique_key in seen_keys:
                    continue
                seen_keys.add(unique_key)

                # Convert the rank once and reuse it for both the record and
                # the returned entry.
                rank = _to_int_or_none(rank_item.get("rank"))
                record = self.repository.upsert_record(
                    source=source_config.source,
                    title=title,
                    rank=rank,
                )
                saved_titles.append(
                    {
                        "id": record["id"],
                        "unique_key": record["unique_key"],
                        "execution_status": record["execution_status"],
                        "article_title": record["article_title"],
                        "article_body": record["article_body"],
                        "article_url": record["article_url"],
                        "has_decode_request": record["has_decode_request"],
                        "has_contribution_points": record["has_contribution_points"],
                        "source": source_config.source,
                        "title": title,
                        "rank": rank,
                        "source_config": source_config,
                    }
                )
        return saved_titles

    def search_and_save_contents(self, hot_titles: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Find article content for each hot title.

        Titles are skipped entirely when their status is at or beyond
        ``DECODE_SUBMITTED``, when they already have contribution points, or
        when they were rejected by the category filter. Titles that already
        hold a stored article are reused without a new search; all others go
        through the keyword-search API with retries.

        Args:
            hot_titles: Entries produced by :meth:`fetch_and_save_hot_titles`.

        Returns:
            One entry per processed title with a ``status`` of ``"ok"``,
            ``"no_valid_content"`` or ``"content_request_failed"``.
        """
        selected_contents: list[dict[str, Any]] = []
        for hot in hot_titles:
            execution_status = int(hot.get("execution_status") or 0)
            if (
                execution_status >= ExecutionStatus.DECODE_SUBMITTED
                or hot.get("has_contribution_points")
            ):
                continue
            if execution_status == ExecutionStatus.CATEGORY_FILTER_REJECTED:
                continue

            reused = self._reuse_stored_article(hot, execution_status)
            if reused is not None:
                selected_contents.append(reused)
                continue

            selected_contents.append(self._search_with_retry(hot))
        return selected_contents

    def _reuse_stored_article(
        self,
        hot: dict[str, Any],
        execution_status: int,
    ) -> dict[str, Any] | None:
        """Return an "ok" entry built from the stored article, or ``None``."""
        stored_title = str(hot.get("article_title") or "").strip()
        stored_body = str(hot.get("article_body") or "").strip()
        reusable_statuses = {
            ExecutionStatus.CONTENT_OK,
            ExecutionStatus.CATEGORY_FILTER_PASSED,
        }
        if (
            execution_status not in reusable_statuses
            or not stored_title
            or not stored_body
        ):
            return None
        return {
            "record_id": hot["id"],
            "unique_key": hot["unique_key"],
            "hot_title": hot["title"],
            "status": "ok",
            "content_title": stored_title,
            "body_text": stored_body,
            "url": str(hot.get("article_url") or ""),
            "category_filter_passed": (
                execution_status == ExecutionStatus.CATEGORY_FILTER_PASSED
            ),
        }

    def _search_with_retry(self, hot: dict[str, Any]) -> dict[str, Any]:
        """Run the keyword search for one title, retrying on flow errors."""
        keyword_url = build_url(
            self.config.crawapi_base_url,
            self.config.keyword_search_path,
        )
        payload = render_template(
            KEYWORD_PAYLOAD_TEMPLATE,
            {
                "title": hot["title"],
                "source": hot["source"],
                "record_id": str(hot["id"]),
                "unique_key": str(hot["unique_key"]),
                "hot_title_id": str(hot["id"]),
            },
        )
        max_attempts = self.KEYWORD_SEARCH_MAX_ATTEMPTS
        last_error: HotContentFlowError | None = None

        for attempt in range(1, max_attempts + 1):
            try:
                return self._search_once(hot, keyword_url, payload)
            except HotContentFlowError as exc:
                last_error = exc
                if attempt < max_attempts:
                    # Simple linear back-off so an unstable endpoint is not
                    # hit again immediately.
                    time.sleep(attempt)

        # Every attempt failed: persist the failure and report it.
        self.repository.update_status(
            record_id=hot["id"],
            status=ExecutionStatus.CONTENT_REQUEST_FAILED,
            error_message=str(last_error),
        )
        return {
            "record_id": hot["id"],
            "unique_key": hot["unique_key"],
            "hot_title": hot["title"],
            "status": "content_request_failed",
            "error": str(last_error),
        }

    def _search_once(
        self,
        hot: dict[str, Any],
        keyword_url: str,
        payload: Any,
    ) -> dict[str, Any]:
        """Perform a single keyword search and persist its outcome."""
        resp = self.api_client.post_json(keyword_url, payload)
        selected = pick_first_valid_content(extract_keyword_items(resp))
        if selected is None:
            self.repository.mark_no_valid_content(
                record_id=hot["id"],
                reason="no valid content",
            )
            return {
                "record_id": hot["id"],
                "unique_key": hot["unique_key"],
                "hot_title": hot["title"],
                "status": "no_valid_content",
            }

        self.repository.update_article(
            record_id=hot["id"],
            article_title=selected["content_title"],
            article_body=selected["body_text"],
            url=selected["url"],
        )
        return {
            "record_id": hot["id"],
            "unique_key": hot["unique_key"],
            "hot_title": hot["title"],
            "status": "ok",
            **selected,
        }

    def filter_contents_by_category(
        self,
        selected_contents: list[dict[str, Any]],
    ) -> list[dict[str, Any]]:
        """Run the LLM category filter over every not-yet-filtered "ok" entry.

        Entries that are not "ok", or that already carry
        ``category_filter_passed``, are passed through unchanged. For the
        rest, the record is reloaded from the repository, validated, sent to
        the LLM filter, and the verdict is persisted.

        The configured per-item pause is applied only between two consecutive
        LLM calls, so no time is spent sleeping after the final call.

        Args:
            selected_contents: Entries from :meth:`search_and_save_contents`.

        Returns:
            A list of the same length where each filtered entry is either the
            original entry enriched with the match, or a replacement entry
            with status ``"category_rejected"`` / ``"no_valid_content"``.
        """
        filtered: list[dict[str, Any]] = []
        category_list = list(self.config.category_filter_categories)
        llm_call_count = 0

        for item in selected_contents:
            if item.get("status") != "ok" or item.get("category_filter_passed"):
                filtered.append(item)
                continue

            record_id = int(item["record_id"])
            record = self.repository.get_record_for_category_filter(record_id)
            if not record:
                filtered.append(
                    self._mark_invalid(record_id, item, "record not found")
                )
                continue

            hot_title = str(record.get("title") or "").strip()
            content_title = str(record.get("article_title") or "").strip()
            body_text = str(record.get("article_body") or "").strip()
            if not (hot_title and content_title and body_text):
                filtered.append(
                    self._mark_invalid(record_id, item, "missing title or body")
                )
                continue

            # Throttle: pause before every LLM call except the first one.
            pause_seconds = self.config.category_filter_item_sleep_seconds
            if llm_call_count > 0 and pause_seconds > 0:
                time.sleep(pause_seconds)
            llm_call_count += 1

            result = llm_filter_record_content(
                record_id=record_id,
                hot_title=hot_title,
                content_title=content_title,
                body_text=body_text,
                category_list=category_list,
                model=self.config.category_filter_llm_model,
                max_attempts=self.config.category_filter_llm_max_attempts,
                retry_sleep_seconds=self.config.category_filter_llm_retry_sleep_seconds,
                max_tokens=self.config.category_filter_llm_max_tokens,
                body_max_chars=self.config.category_filter_body_max_chars,
            )
            passed = bool(result.get("passed"))
            self.repository.update_category_filter_result(
                record_id=record_id,
                passed=passed,
                result_json=result,
            )

            if passed:
                filtered.append(
                    {
                        **item,
                        "category_filter_passed": True,
                        "matched_category": result.get("matched_category"),
                    }
                )
            else:
                filtered.append(
                    {
                        "record_id": record_id,
                        "unique_key": item.get("unique_key"),
                        "status": "category_rejected",
                    }
                )
        return filtered

    def _mark_invalid(
        self,
        record_id: int,
        item: dict[str, Any],
        reason: str,
    ) -> dict[str, Any]:
        """Persist a "no valid content" verdict and return its entry."""
        self.repository.mark_no_valid_content(record_id=record_id, reason=reason)
        return {
            "record_id": record_id,
            "unique_key": item.get("unique_key"),
            "status": "no_valid_content",
            "reason": reason,
        }

    @staticmethod
    def _decode_response_code(decode_resp: Any) -> int:
        """Return the integer ``code`` of a decode response.

        A missing or empty code counts as ``0``. A response that is not a
        dictionary, or whose code cannot be converted to an integer, yields
        ``-1`` so that it is handled as a failure instead of raising.
        """
        if not isinstance(decode_resp, dict):
            return -1
        try:
            return int(decode_resp.get("code") or 0)
        except (TypeError, ValueError):
            return -1

    def submit_decode_tasks(self, selected_contents: list[dict[str, Any]]) -> dict[str, Any]:
        """Submit every "ok" entry to the decode API, one request per entry.

        Entries that are not "ok", or whose title or body is empty, are
        skipped without a request. Each response (or request error) is
        persisted through the repository.

        A request counts as failed when it raises ``HotContentFlowError`` or
        when the response carries a non-zero ``code``; the aggregate ``code``
        is ``-1`` as soon as one request failed.

        Args:
            selected_contents: Entries from :meth:`filter_contents_by_category`.

        Returns:
            An aggregate with ``code``, ``data`` (the decode items found for
            the submitted posts), ``request_count`` and, when at least one
            request was made, ``failed_request_count``.
        """
        decode_items: list[dict[str, Any]] = []
        request_count = 0
        failed_request_count = 0

        for item in selected_contents:
            if item.get("status") != "ok":
                continue

            record_id = int(item["record_id"])
            channel_content_id = str(item["unique_key"])
            title = str(item.get("content_title") or "").strip()
            body_text = str(item.get("body_text") or "").strip()
            if not title or not body_text:
                continue

            decode_payload = {
                "params": {
                    "configId": self.config.decode_config_id,
                    "skipCompleted": False,
                    "posts": [
                        {
                            "channelContentId": channel_content_id,
                            "title": title,
                            "bodyText": body_text,
                            "contentModal": 3,
                        }
                    ],
                }
            }

            request_count += 1
            try:
                decode_resp = self.api_client.post_json(
                    self.config.decode_api_url,
                    decode_payload,
                )
            except HotContentFlowError as exc:
                # Normalise a transport failure into an error-shaped response.
                decode_resp = {"code": -1, "data": [], "msg": str(exc)}

            api_ok = self._decode_response_code(decode_resp) == 0
            if not api_ok:
                failed_request_count += 1

            decode_item_map = extract_decode_item_map(decode_resp)
            decode_item = decode_item_map.get(channel_content_id, {})
            status = decode_api_status_to_code(
                str(decode_item.get("status") or ""),
                has_success_response=api_ok,
            )

            error_message = None
            if not api_ok:
                message = (
                    decode_resp.get("msg") if isinstance(decode_resp, dict) else None
                )
                error_message = str(message or decode_resp)

            self.repository.update_decode_result(
                record_id=record_id,
                status=status,
                request_json=decode_payload,
                response_json=decode_item or decode_resp,
                error_message=error_message,
            )
            if decode_item:
                decode_items.append(decode_item)

        if request_count == 0:
            return {"code": 0, "data": [], "msg": "no_valid_posts", "request_count": 0}
        return {
            "code": 0 if failed_request_count == 0 else -1,
            "data": decode_items,
            "request_count": request_count,
            "failed_request_count": failed_request_count,
        }

    def build_summary(
        self,
        hot_titles: list[dict[str, Any]],
        selected_contents: list[dict[str, Any]],
        decode_resp: dict[str, Any],
    ) -> dict[str, Any]:
        """Build the statistics dictionary describing one pipeline pass.

        Args:
            hot_titles: Entries from :meth:`fetch_and_save_hot_titles`.
            selected_contents: Entries after category filtering.
            decode_resp: Aggregate returned by :meth:`submit_decode_tasks`.

        Returns:
            Counters for titles, selected content and decode results, plus the
            run timestamp in the Shanghai time zone.
        """
        response = decode_resp if isinstance(decode_resp, dict) else {}
        raw_items = response.get("data")
        decode_items = (
            [entry for entry in raw_items if isinstance(entry, dict)]
            if isinstance(raw_items, list)
            else []
        )

        def selected_with(status: str) -> int:
            return sum(1 for entry in selected_contents if entry.get("status") == status)

        def decoded_with(status: str) -> int:
            return sum(
                1 for entry in decode_items if str(entry.get("status") or "") == status
            )

        ok_count = selected_with("ok")
        # Prefer the number of requests actually sent; "ok" entries lacking a
        # title or body are skipped by submit_decode_tasks.
        request_count = response.get("request_count")
        decode_post_count = request_count if isinstance(request_count, int) else ok_count

        return {
            "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
            "source_count": len(self.config.sources),
            "hot_title_count": len(hot_titles),
            "selected_ok_count": ok_count,
            # Every non-"ok" entry, including rejected and no-content ones.
            "selected_failed_count": len(selected_contents) - ok_count,
            "category_rejected_count": selected_with("category_rejected"),
            "no_valid_content_count": selected_with("no_valid_content"),
            "decode_post_count": decode_post_count,
            "decode_api_code": response.get("code"),
            "decode_success_count": decoded_with("SUCCESS"),
            "decode_pending_count": decoded_with("PENDING"),
            "decode_failed_count": decoded_with("FAILED"),
        }
def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
    """Run the hot-content flow a single time and return its summary.

    Args:
        config: Flow configuration to use; when falsy, the configuration is
            loaded with ``load_flow_config()``.

    Returns:
        The summary dictionary produced by ``HotContentFlowService.run``.
    """
    flow_config = config or load_flow_config()
    repository = HotContentRepository(flow_config.mysql)
    try:
        service = HotContentFlowService(
            flow_config,
            repository,
            JsonApiClient(
                timeout_seconds=flow_config.request_timeout_seconds,
                verify_ssl=flow_config.https_verify_ssl,
            ),
        )
        summary = service.run()
    finally:
        # The repository is closed whether or not the run succeeded.
        repository.close()
    return summary
- def _to_int_or_none(value: Any) -> int | None:
- try:
- return int(value)
- except (TypeError, ValueError):
- return None
|