| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- """
- 将推荐结果写入 MySQL(优质作者表 + 内容表)。
- 约定:
- - 输入参数:trace_id(字符串)
- - 数据来源:.cache/traces/{trace_id}/recommendations.json
- - 表结构:good_authors, contents(字段见下面 SQL 注释)
- """
- import json
- import logging
- import os
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- import pymysql
- from agent.tools import tool, ToolResult
- logger = logging.getLogger(__name__)
def _get_connection():
    """Open a MySQL connection configured entirely from environment variables.

    Env vars: DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME.

    SECURITY: this function previously shipped production credentials
    (host, user, password) as hard-coded fallbacks. Those secrets were
    committed to source control and must be considered compromised —
    rotate them. Credentials now MUST come from the environment.

    Returns:
        A pymysql connection using utf8mb4, DictCursor rows, and autocommit.

    Raises:
        RuntimeError: if DB_USER or DB_PASSWORD is not set.
    """
    host = os.getenv("DB_HOST", "localhost")
    port = int(os.getenv("DB_PORT", "3306"))
    user = os.getenv("DB_USER")
    password = os.getenv("DB_PASSWORD")
    database = os.getenv("DB_NAME", "content-deconstruction-supply")
    if not user or not password:
        raise RuntimeError(
            "DB_USER and DB_PASSWORD environment variables are required"
        )
    return pymysql.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=database,
        charset="utf8mb4",
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=True,
    )
- def _load_recommendations(trace_id: str) -> Dict[str, Any]:
- """
- 按约定路径读取推荐结果:
- - 优先:{TRACE_DIR}/{trace_id}/output.json (与 output_schema.md 保持一致)
- - 兼容:{TRACE_DIR}/{trace_id}/recommendations.json
- """
- trace_root = Path(os.getenv("TRACE_DIR", ".cache/traces"))
- base = trace_root / trace_id
- candidates = [
- base / "output.json",
- base / "recommendations.json",
- ]
- for path in candidates:
- if path.exists():
- with path.open("r", encoding="utf-8") as f:
- return json.load(f)
- raise FileNotFoundError(
- f"no recommendations JSON found for trace_id={trace_id}, tried: "
- + ", ".join(str(p) for p in candidates)
- )
- def _upsert_good_authors(
- conn,
- trace_id: str,
- good_account_block: Optional[Dict[str, Any]],
- ) -> int:
- """
- 将 good_account_expansion 中的 accounts 写入 good_authors 表。
- 约定表结构示例:
- CREATE TABLE demand_find_author (
- id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
- trace_id VARCHAR(64) NOT NULL,
- author_name VARCHAR(255) NOT NULL,
- author_link VARCHAR(512) NOT NULL,
- reason TEXT,
- expanded_count INT DEFAULT 0,
- PRIMARY KEY (id),
- KEY idx_demand_find_author_trace (trace_id),
- UNIQUE KEY uk_demand_find_author_trace_author (trace_id, author_link)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
- """
- if not good_account_block:
- return 0
- if not good_account_block.get("found"):
- return 0
- accounts: List[Dict[str, Any]] = good_account_block.get("accounts") or []
- if not accounts:
- return 0
- sql = """
- INSERT INTO demand_find_author (trace_id, author_name, author_link, reason, expanded_count)
- VALUES (%s, %s, %s, %s, %s)
- ON DUPLICATE KEY UPDATE
- reason = VALUES(reason),
- expanded_count = VALUES(expanded_count)
- """
- with conn.cursor() as cur:
- rows = 0
- for acc in accounts:
- author_name = acc.get("account_name") or acc.get("author_name") or ""
- author_link = acc.get("author_link") or ""
- if not author_name or not author_link:
- # 如果只给了 sec_uid,可以由上层补 author_link
- sec_uid = acc.get("sec_uid")
- if sec_uid and not author_link:
- author_link = f"https://www.douyin.com/user/{sec_uid}"
- if not author_name or not author_link:
- continue
- reason = acc.get("reason") or ""
- expanded_count = int(acc.get("expanded_count") or 0)
- cur.execute(sql, (trace_id, author_name, author_link, reason, expanded_count))
- rows += cur.rowcount
- return rows
- def _insert_contents(
- conn,
- trace_id: str,
- contents: List[Dict[str, Any]],
- ) -> int:
- """
- 将 contents 列表写入 contents 表。
- 约定表结构示例:
- CREATE TABLE demand_find_content_result (
- id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
- trace_id VARCHAR(64) NOT NULL,
- rank INT NOT NULL,
- content_link VARCHAR(512) NOT NULL,
- title TEXT NOT NULL,
- author_name VARCHAR(255) NOT NULL,
- author_link VARCHAR(512) NOT NULL,
- digg_count BIGINT DEFAULT 0,
- comment_count BIGINT DEFAULT 0,
- share_count BIGINT DEFAULT 0,
- portrait_source VARCHAR(255),
- elderly_ratio VARCHAR(255),
- elderly_tgi VARCHAR(255),
- recommendation_reason TEXT,
- PRIMARY KEY (id),
- KEY idx_demand_find_content_trace (trace_id),
- KEY idx_demand_find_content_author (author_link)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
- """
- if not contents:
- return 0
- sql = """
- INSERT INTO demand_find_content_result (
- trace_id, rank, content_link, title, author_name, author_link,
- digg_count, comment_count, share_count,
- portrait_source, elderly_ratio, elderly_tgi, recommendation_reason
- ) VALUES (
- %s, %s, %s, %s, %s, %s,
- %s, %s, %s,
- %s, %s, %s, %s
- )
- """
- with conn.cursor() as cur:
- rows = 0
- for item in contents:
- cur.execute(
- sql,
- (
- trace_id,
- int(item.get("rank") or 0),
- item.get("content_link") or "",
- item.get("title") or "",
- item.get("author_name") or "",
- item.get("author_link") or "",
- int(item.get("heat_metrics", {}).get("digg_count") or 0),
- int(item.get("heat_metrics", {}).get("comment_count") or 0),
- int(item.get("heat_metrics", {}).get("share_count") or 0),
- item.get("portrait_source") or "",
- str(item.get("elderly_ratio") or ""),
- str(item.get("elderly_tgi") or ""),
- item.get("recommendation_reason") or "",
- ),
- )
- rows += cur.rowcount
- return rows
@tool(description="将推荐结果写入 MySQL(good_authors + contents)")
async def store_results_mysql(trace_id: str) -> ToolResult:
    """Load the recommendation JSON for ``trace_id`` and write it to MySQL.

    Target tables:
      - ``demand_find_author``: high-quality account information
      - ``demand_find_content_result``: recommended content rows

    Returns a ToolResult whose metadata carries ``ok`` plus, on success,
    the affected/inserted row counts. All failures are reported via the
    ToolResult (never raised), since this is a top-level tool boundary.
    """
    try:
        payload = _load_recommendations(trace_id)
    except Exception as exc:
        msg = f"加载 recommendations.json 失败: {exc}"
        logger.error(msg)
        return ToolResult(output=msg, metadata={"ok": False, "error": str(exc)})

    conn = None
    try:
        conn = _get_connection()
        # Accept either the canonical or the legacy key for the account block.
        account_block = payload.get("good_account_expansion") or payload.get("good_accounts")
        author_count = _upsert_good_authors(conn, trace_id, account_block)
        content_count = _insert_contents(conn, trace_id, payload.get("contents") or [])
        output = (
            f"MySQL 写入完成:demand_find_author 影响行数={author_count}, "
            f"demand_find_content_result 插入条数={content_count}"
        )
        logger.info(output)
        return ToolResult(
            output=output,
            metadata={
                "ok": True,
                "trace_id": trace_id,
                "good_authors_affected": author_count,
                "contents_inserted": content_count,
            },
        )
    except Exception as exc:
        msg = f"写入 MySQL 失败: {exc}"
        logger.error(msg, exc_info=True)
        return ToolResult(output=msg, metadata={"ok": False, "error": str(exc)})
    finally:
        if conn is not None:
            conn.close()
|