# store_results.py (9.0 KB)
"""
Persist recommendation results (tables: demand_find_author, demand_find_content_result).
"""
  4. import json
  5. from typing import Any, Dict, List, Optional
  6. from .connection import get_connection
  7. def _normalize_content_tags(value: Any) -> str:
  8. if value is None:
  9. return ""
  10. if isinstance(value, str):
  11. return value
  12. if isinstance(value, (list, tuple, set)):
  13. parts = [str(x).strip() for x in value if str(x).strip()]
  14. return ",".join(parts)
  15. if isinstance(value, dict):
  16. return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
  17. return str(value)
  18. def upsert_good_authors(
  19. conn,
  20. trace_id: str,
  21. good_account_block: Optional[Dict[str, Any]],
  22. ) -> int:
  23. """
  24. 将 good_account_expansion 中的 accounts 写入 demand_find_author 表。
  25. 兼容两种格式:
  26. - 标准格式:{"enabled": true, "accounts": [...]}
  27. - 降级格式:直接传 list(agent 未严格遵守 schema 时的兜底)
  28. """
  29. if not good_account_block:
  30. return 0
  31. if isinstance(good_account_block, list):
  32. accounts: List[Dict[str, Any]] = good_account_block
  33. else:
  34. if not good_account_block.get("enabled"):
  35. return 0
  36. accounts = good_account_block.get("accounts") or []
  37. if not accounts:
  38. return 0
  39. sql = """
  40. INSERT INTO demand_find_author (trace_id, author_name, author_link, elderly_ratio, elderly_tgi, remark, content_tags)
  41. VALUES (%s, %s, %s, %s, %s, %s, %s)
  42. ON DUPLICATE KEY UPDATE
  43. elderly_ratio = VALUES(elderly_ratio),
  44. elderly_tgi = VALUES(elderly_tgi),
  45. remark = VALUES(remark)
  46. """
  47. with conn.cursor() as cur:
  48. rows = 0
  49. for acc in accounts:
  50. # 与 output_schema 一致:author_nickname / author_sec_uid / author_url
  51. # 兼容 Agent 常用别名:account_name、sec_uid(见 good_account_expansion 数组简写)
  52. author_name = (
  53. acc.get("author_nickname")
  54. or acc.get("account_name")
  55. or ""
  56. )
  57. author_link = acc.get("author_url") or ""
  58. sec_uid = acc.get("author_sec_uid") or acc.get("sec_uid")
  59. if not author_link and sec_uid:
  60. author_link = f"https://www.douyin.com/user/{sec_uid}"
  61. if not author_name or not author_link:
  62. continue
  63. elderly_ratio = acc.get("age_50_plus_ratio") or ""
  64. elderly_tgi = acc.get("age_50_plus_tgi") or ""
  65. remark = acc.get("reason") or acc.get("remark") or ""
  66. content_tags = _normalize_content_tags(acc.get("content_tags"))
  67. cur.execute(
  68. sql,
  69. (
  70. trace_id,
  71. author_name,
  72. author_link,
  73. str(elderly_ratio) if elderly_ratio is not None else None,
  74. str(elderly_tgi) if elderly_tgi is not None else None,
  75. remark or None,
  76. content_tags or None,
  77. ),
  78. )
  79. rows += cur.rowcount
  80. return rows
  81. def fetch_demand_content_dt(conn, demand_content_id: int) -> Optional[Any]:
  82. """按 demand_content.id 查询 dt(与 schedule 约定一致,多为 YYYYMMDD 整数)。"""
  83. sql = "SELECT dt FROM demand_content WHERE id = %s LIMIT 1"
  84. with conn.cursor() as cur:
  85. cur.execute(sql, (demand_content_id,))
  86. row = cur.fetchone()
  87. if not row:
  88. return None
  89. return row.get("dt")
  90. def insert_contents(
  91. conn,
  92. trace_id: str,
  93. query: str,
  94. demand_content_id: int,
  95. contents: List[Dict[str, Any]],
  96. dt: Optional[Any] = None,
  97. ) -> int:
  98. """
  99. 将 contents 列表写入 demand_find_content_result 表。
  100. dt 来自 demand_content.dt,与 demand_content_id 对应;未查到时可传 None。
  101. """
  102. if not contents:
  103. return 0
  104. sql = """
  105. INSERT INTO demand_find_content_result (
  106. trace_id, query, rank_no, aweme_id, video_url, title, author_name, author_id, author_link,
  107. digg_count, comment_count, share_count,
  108. portrait_source, elderly_ratio, elderly_tgi, recommendation_reason,
  109. demand_content_id, dt
  110. ) VALUES (
  111. %s, %s, %s, %s, %s, %s, %s, %s, %s,
  112. %s, %s, %s,
  113. %s, %s, %s, %s,
  114. %s, %s
  115. )
  116. """
  117. with conn.cursor() as cur:
  118. rows = 0
  119. for item in contents:
  120. video_url = item.get("video_url") or ""
  121. stats = item.get("statistics") or {}
  122. portrait = item.get("portrait_data") or {}
  123. # age_distribution 是 agent 有时输出的非标准结构,兜底提取 50+ 占比
  124. age_dist = portrait.get("age_distribution") or {}
  125. age_50_plus_ratio = portrait.get("age_50_plus_ratio") or age_dist.get("50+") or ""
  126. age_50_plus_tgi = portrait.get("age_50_plus_tgi") or ""
  127. cur.execute(
  128. sql,
  129. (
  130. trace_id,
  131. query,
  132. int(item.get("rank") or item.get("rank_no") or 0),
  133. item.get("aweme_id") or "",
  134. video_url,
  135. item.get("title") or "",
  136. item.get("author_nickname") or "",
  137. item.get("author_sec_uid") or "",
  138. item.get("author_url") or "",
  139. # like_count 是 agent 有时输出的非标准字段名,兜底处理
  140. int(stats.get("digg_count") or stats.get("like_count") or 0),
  141. int(stats.get("comment_count") or 0),
  142. int(stats.get("share_count") or 0),
  143. portrait.get("source") or "",
  144. str(age_50_plus_ratio) if age_50_plus_ratio != "" else "",
  145. str(age_50_plus_tgi) if age_50_plus_tgi != "" else "",
  146. item.get("reason") or "",
  147. demand_content_id,
  148. dt,
  149. ),
  150. )
  151. rows += cur.rowcount
  152. return rows
  153. def update_content_plan_ids(
  154. trace_id: str,
  155. aweme_ids: List[str],
  156. crawler_plan_id: str = "",
  157. produce_plan_id: str = "",
  158. ) -> int:
  159. """
  160. 更新 demand_find_content_result 中指定内容的计划字段。
  161. 约定:
  162. - 通过 (trace_id, aweme_id) 定位内容行
  163. - crawler_plan_id / produce_plan_id 可只传其一:仅更新非空字段
  164. - 至少一个计划 id 非空时才执行 UPDATE
  165. - 内部自行获取并关闭数据库连接
  166. """
  167. if not aweme_ids or not isinstance(aweme_ids, list):
  168. return 0
  169. c = (crawler_plan_id or "").strip()
  170. p = (produce_plan_id or "").strip()
  171. if not c and not p:
  172. return 0
  173. set_parts: List[str] = []
  174. params: List[Any] = []
  175. if c:
  176. set_parts.append("crawler_plan_id = %s")
  177. params.append(c)
  178. if p:
  179. set_parts.append("produce_plan_id = %s")
  180. params.append(p)
  181. sql = f"""
  182. UPDATE demand_find_content_result
  183. SET {", ".join(set_parts)}
  184. WHERE trace_id = %s AND aweme_id = %s
  185. """
  186. conn = get_connection()
  187. try:
  188. rows = 0
  189. with conn.cursor() as cur:
  190. for aweme_id in aweme_ids:
  191. cur.execute(sql, (*params, trace_id, aweme_id))
  192. rows += cur.rowcount
  193. return rows
  194. finally:
  195. conn.close()
  196. def update_web_html_url(trace_id: str, web_html_url: str) -> int:
  197. """
  198. 根据 trace_id 回写 demand_find_content_result.web_html_url。
  199. 约定:
  200. - trace_id 为 output 子目录名
  201. - web_html_url 为 OSS 公网 URL
  202. - 同一 trace_id 可能对应多条内容,统一更新
  203. """
  204. t = (trace_id or "").strip()
  205. url = (web_html_url or "").strip()
  206. if not t or not url:
  207. return 0
  208. sql = """
  209. UPDATE demand_find_content_result
  210. SET web_html_url = %s
  211. WHERE trace_id = %s
  212. """
  213. conn = get_connection()
  214. try:
  215. with conn.cursor() as cur:
  216. cur.execute(sql, (url, t))
  217. return cur.rowcount
  218. finally:
  219. conn.close()
  220. def update_process_trace_by_aweme_id(
  221. *,
  222. trace_id: str,
  223. aweme_id: str,
  224. process_trace_text: str,
  225. channel: str = "抖音",
  226. ) -> int:
  227. """
  228. 根据 (trace_id, aweme_id) 回写 demand_find_content_result.process_trace(TEXT)与 channel。
  229. 约定:
  230. - trace_id 为 output 子目录名
  231. - aweme_id 为内容唯一 id(表中 demand_find_content_result.aweme_id)
  232. - process_trace_text 为 JSON 序列化后的字符串(或原始文本)
  233. - channel 默认「抖音」;当前业务仅抖音搜索场景,后续可按行区分时再传入
  234. """
  235. t = (trace_id or "").strip()
  236. a = (aweme_id or "").strip()
  237. text = (process_trace_text or "").strip()
  238. ch = (channel or "").strip()
  239. if not t or not a or not text:
  240. return 0
  241. if not ch:
  242. ch = "抖音"
  243. sql = """
  244. UPDATE demand_find_content_result
  245. SET process_trace = %s,
  246. channel = %s
  247. WHERE trace_id = %s AND aweme_id = %s
  248. """
  249. conn = get_connection()
  250. try:
  251. with conn.cursor() as cur:
  252. cur.execute(sql, (text, ch, t, a))
  253. return cur.rowcount
  254. finally:
  255. conn.close()