| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- """
- 推荐结果写入(demand_find_author、demand_find_content_result 表)
- """
- import json
- from typing import Any, Dict, List, Optional
- from .connection import get_connection
def _normalize_content_tags(value: Any) -> str:
    """Flatten a ``content_tags`` value into a single string.

    Conversion rules:
    - ``None`` -> ``""``
    - ``str`` -> returned unchanged (no stripping)
    - ``list`` / ``tuple`` / ``set`` -> elements are stringified, stripped and
      joined with ``","``; ``None`` and blank elements are dropped, and set
      elements are sorted so the result is deterministic
    - ``dict`` -> compact JSON with non-ASCII characters kept as-is
    - anything else -> ``str(value)``
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (list, tuple, set)):
        # Skip None explicitly: str(None) would otherwise leak a literal
        # "None" tag into the joined result.
        stripped = (str(item).strip() for item in value if item is not None)
        parts = [tag for tag in stripped if tag]
        if isinstance(value, set):
            # Set iteration order is arbitrary; sort for a stable output.
            parts.sort()
        return ",".join(parts)
    if isinstance(value, dict):
        return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
    return str(value)
def upsert_good_authors(
    conn,
    trace_id: str,
    good_account_block: Optional[Dict[str, Any]],
) -> int:
    """Write the accounts of a ``good_account_expansion`` block to ``demand_find_author``.

    Two input shapes are accepted:
    - standard: ``{"enabled": true, "accounts": [...]}``; nothing is written
      unless ``enabled`` is truthy
    - fallback: a bare list of accounts, for agents that did not follow the
      schema strictly

    Any other shape, and any account entry that is not a dict, is ignored
    rather than raising. An account is also skipped when it has no name or
    no link; a missing link is derived from the sec_uid when one is present.

    Args:
        conn: Database connection whose ``cursor()`` result is usable as a
            context manager. No commit is issued by this function.
        trace_id: Trace identifier stored with every row.
        good_account_block: The block described above, or ``None``.

    Returns:
        The sum of ``cursor.rowcount`` over all executed statements, or 0 when
        nothing was written. On MySQL an ``ON DUPLICATE KEY UPDATE`` that
        changes an existing row reports 2, so this is not necessarily the
        number of accounts.
    """
    if not good_account_block:
        return 0

    if isinstance(good_account_block, list):
        accounts = good_account_block
    elif isinstance(good_account_block, dict):
        if not good_account_block.get("enabled"):
            return 0
        accounts = good_account_block.get("accounts") or []
    else:
        # Unknown shape from the agent: nothing usable to write.
        return 0

    if not accounts or not isinstance(accounts, (list, tuple)):
        return 0

    # NOTE(review): content_tags is written on insert only; the
    # ON DUPLICATE KEY UPDATE clause does not refresh it. Confirm intended.
    sql = """
        INSERT INTO demand_find_author (trace_id, author_name, author_link, elderly_ratio, elderly_tgi, remark, content_tags)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            elderly_ratio = VALUES(elderly_ratio),
            elderly_tgi = VALUES(elderly_tgi),
            remark = VALUES(remark)
    """

    rows = 0
    with conn.cursor() as cur:
        for acc in accounts:
            if not isinstance(acc, dict):
                # Malformed entry (e.g. a bare string): skip it instead of
                # failing the whole batch on ``.get``.
                continue

            # Schema names are author_nickname / author_sec_uid / author_url;
            # account_name and sec_uid are aliases agents commonly emit.
            author_name = acc.get("author_nickname") or acc.get("account_name") or ""
            author_link = acc.get("author_url") or ""
            sec_uid = acc.get("author_sec_uid") or acc.get("sec_uid")
            if not author_link and sec_uid:
                author_link = f"https://www.douyin.com/user/{sec_uid}"
            if not author_name or not author_link:
                continue

            # Missing (or otherwise falsy) portrait values are stored as "".
            elderly_ratio = str(acc.get("age_50_plus_ratio") or "")
            elderly_tgi = str(acc.get("age_50_plus_tgi") or "")
            remark = acc.get("reason") or acc.get("remark") or ""
            content_tags = _normalize_content_tags(acc.get("content_tags"))

            cur.execute(
                sql,
                (
                    trace_id,
                    author_name,
                    author_link,
                    elderly_ratio,
                    elderly_tgi,
                    remark or None,
                    content_tags or None,
                ),
            )
            rows += cur.rowcount
    return rows
def fetch_demand_content_dt(conn, demand_content_id: int) -> Optional[Any]:
    """Look up ``dt`` for one ``demand_content`` row, selected by its ``id``.

    Per the original note, ``dt`` follows the scheduler's convention and is
    mostly a YYYYMMDD integer. Returns ``None`` when no row matches. The row
    is read with ``.get("dt")``, so the cursor must yield mapping-style rows.
    """
    query = "SELECT dt FROM demand_content WHERE id = %s LIMIT 1"
    with conn.cursor() as cursor:
        cursor.execute(query, (demand_content_id,))
        record = cursor.fetchone()
    return record.get("dt") if record else None
def _to_int(value: Any) -> int:
    """Best-effort integer conversion for agent-supplied counters; 0 when unusable."""
    if not value:
        return 0
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        pass
    try:
        # Accept formatted numerics such as "1,234" or "12.0".
        return int(float(str(value).replace(",", "").strip()))
    except (TypeError, ValueError, OverflowError):
        return 0


def insert_contents(
    conn,
    trace_id: str,
    query: str,
    demand_content_id: int,
    contents: List[Dict[str, Any]],
    dt: Optional[Any] = None,
) -> int:
    """Insert the items of ``contents`` into ``demand_find_content_result``.

    Agent output is handled on a best-effort basis:
    - entries that are not dicts are skipped
    - ``statistics`` / ``portrait_data`` / ``age_distribution`` values that
      are not dicts are treated as empty
    - counters and rank that cannot be read as integers are stored as 0
      instead of aborting the batch
    - missing text fields are stored as ``""``

    Args:
        conn: Database connection whose ``cursor()`` result is usable as a
            context manager. No commit is issued by this function.
        trace_id: Trace identifier stored with every row.
        query: Search query stored with every row.
        demand_content_id: ``demand_content.id`` the results belong to.
        contents: Result items produced by the agent.
        dt: ``demand_content.dt`` matching ``demand_content_id``; may be
            ``None`` when it could not be looked up.

    Returns:
        The sum of ``cursor.rowcount`` over all executed inserts, or 0 when
        ``contents`` is empty.
    """
    if not contents:
        return 0

    sql = """
        INSERT INTO demand_find_content_result (
            trace_id, query, rank_no, aweme_id, video_url, title, author_name, author_id, author_link,
            digg_count, comment_count, share_count,
            portrait_source, elderly_ratio, elderly_tgi, recommendation_reason,
            demand_content_id, dt
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s, %s, %s,
            %s, %s, %s,
            %s, %s, %s, %s,
            %s, %s
        )
    """

    rows = 0
    with conn.cursor() as cur:
        for item in contents:
            if not isinstance(item, dict):
                # Malformed entry: skip it rather than fail on ``.get``.
                continue

            stats = item.get("statistics")
            if not isinstance(stats, dict):
                stats = {}
            portrait = item.get("portrait_data")
            if not isinstance(portrait, dict):
                portrait = {}
            # age_distribution is a non-standard structure agents sometimes
            # emit; use its "50+" entry as a fallback for the 50+ ratio.
            age_dist = portrait.get("age_distribution")
            if not isinstance(age_dist, dict):
                age_dist = {}

            age_50_plus_ratio = (
                portrait.get("age_50_plus_ratio") or age_dist.get("50+") or ""
            )
            age_50_plus_tgi = portrait.get("age_50_plus_tgi") or ""

            cur.execute(
                sql,
                (
                    trace_id,
                    query,
                    _to_int(item.get("rank") or item.get("rank_no")),
                    item.get("aweme_id") or "",
                    item.get("video_url") or "",
                    item.get("title") or "",
                    item.get("author_nickname") or "",
                    item.get("author_sec_uid") or "",
                    item.get("author_url") or "",
                    # like_count is a non-standard alias agents sometimes
                    # emit for digg_count.
                    _to_int(stats.get("digg_count") or stats.get("like_count")),
                    _to_int(stats.get("comment_count")),
                    _to_int(stats.get("share_count")),
                    portrait.get("source") or "",
                    str(age_50_plus_ratio),
                    str(age_50_plus_tgi),
                    item.get("reason") or "",
                    demand_content_id,
                    dt,
                ),
            )
            rows += cur.rowcount
    return rows
def update_content_plan_ids(
    trace_id: str,
    aweme_ids: List[str],
    crawler_plan_id: str = "",
    produce_plan_id: str = "",
) -> int:
    """Set the plan-id columns on selected ``demand_find_content_result`` rows.

    Conventions:
    - rows are located by ``(trace_id, aweme_id)``
    - ``crawler_plan_id`` / ``produce_plan_id`` may be given individually;
      only the non-blank ones are written
    - nothing is executed unless at least one plan id is non-blank and
      ``aweme_ids`` is a non-empty ``list``
    - the connection is obtained from ``get_connection()`` and closed here

    No explicit commit is issued.
    NOTE(review): presumably ``get_connection()`` returns an autocommit
    connection -- confirm.

    Returns:
        The sum of ``cursor.rowcount`` over one UPDATE per aweme id, or 0 when
        nothing was executed.
    """
    if not aweme_ids or not isinstance(aweme_ids, list):
        return 0

    candidates = (
        ("crawler_plan_id", (crawler_plan_id or "").strip()),
        ("produce_plan_id", (produce_plan_id or "").strip()),
    )
    assignments = [(column, value) for column, value in candidates if value]
    if not assignments:
        return 0

    set_clause = ", ".join(f"{column} = %s" for column, _ in assignments)
    plan_values = [value for _, value in assignments]
    statement = f"""
        UPDATE demand_find_content_result
        SET {set_clause}
        WHERE trace_id = %s AND aweme_id = %s
    """

    connection = get_connection()
    try:
        affected = 0
        with connection.cursor() as cursor:
            for aweme_id in aweme_ids:
                cursor.execute(statement, (*plan_values, trace_id, aweme_id))
                affected += cursor.rowcount
        return affected
    finally:
        connection.close()
def update_web_html_url(trace_id: str, web_html_url: str) -> int:
    """Write ``web_html_url`` onto every ``demand_find_content_result`` row of a trace.

    Conventions:
    - ``trace_id`` is the name of the output sub-directory
    - ``web_html_url`` is a public OSS URL
    - one ``trace_id`` may map to several content rows; all are updated

    Both arguments are stripped first; if either is blank, nothing is executed
    and 0 is returned. The connection comes from ``get_connection()`` and is
    closed here. No explicit commit is issued.
    NOTE(review): presumably ``get_connection()`` returns an autocommit
    connection -- confirm.

    Returns:
        ``cursor.rowcount`` of the UPDATE, or 0 when nothing was executed.
    """
    trace_key = (trace_id or "").strip()
    html_url = (web_html_url or "").strip()
    if not (trace_key and html_url):
        return 0

    statement = """
        UPDATE demand_find_content_result
        SET web_html_url = %s
        WHERE trace_id = %s
    """
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(statement, (html_url, trace_key))
            return cursor.rowcount
    finally:
        connection.close()
def update_process_trace_by_aweme_id(
    *,
    trace_id: str,
    aweme_id: str,
    process_trace_text: str,
    channel: str = "抖音",
) -> int:
    """Write ``process_trace`` (TEXT) and ``channel`` on the row located by ``(trace_id, aweme_id)``.

    Conventions:
    - ``trace_id`` is the name of the output sub-directory
    - ``aweme_id`` is the content id stored in
      ``demand_find_content_result.aweme_id``
    - ``process_trace_text`` is a JSON-serialised string (or raw text)
    - ``channel`` defaults to "抖音" (Douyin); a blank value falls back to
      that default

    All arguments are stripped first. If the trace id, aweme id or trace text
    is blank, nothing is executed and 0 is returned. The connection comes from
    ``get_connection()`` and is closed here. No explicit commit is issued.
    NOTE(review): presumably ``get_connection()`` returns an autocommit
    connection -- confirm.

    Returns:
        ``cursor.rowcount`` of the UPDATE, or 0 when nothing was executed.
    """
    trace_key = (trace_id or "").strip()
    content_id = (aweme_id or "").strip()
    trace_text = (process_trace_text or "").strip()
    if not (trace_key and content_id and trace_text):
        return 0

    channel_name = (channel or "").strip() or "抖音"
    statement = """
        UPDATE demand_find_content_result
        SET process_trace = %s,
            channel = %s
        WHERE trace_id = %s AND aweme_id = %s
    """
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(
                statement, (trace_text, channel_name, trace_key, content_id)
            )
            return cursor.rowcount
    finally:
        connection.close()
|