find_authors_from_db.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. """
  2. 从数据库中按“搜索词 query”检索历史任务沉淀的优质作者(demand_find_author)。
  3. 用途:
  4. - 先用该工具找到相关作者(sec_uid / 链接)
  5. - 再调用 douyin_user_videos(account_id=sec_uid) 获取其作品做二次筛选
  6. """
  7. from __future__ import annotations
  8. import json
  9. import re
  10. from typing import Any, Dict, List, Optional
  11. from agent.tools import ToolResult, tool
  12. from utils.tool_logging import format_tool_result_for_log, log_tool_call
  13. from db import get_connection
  14. _LOG_LABEL = "工具调用:find_authors_from_db -> 数据库按搜索词检索优质作者"
  15. _DOUYIN_USER_URL_RE = re.compile(r"^https?://www\.douyin\.com/user/(?P<sec_uid>[^/?#]+)")
  16. def _extract_sec_uid(author_link: str) -> str:
  17. if not author_link:
  18. return ""
  19. m = _DOUYIN_USER_URL_RE.match(author_link.strip())
  20. return m.group("sec_uid") if m else ""
  21. def _query_authors(conn, query: str, limit: int) -> List[Dict[str, Any]]:
  22. q = (query or "").strip()
  23. if not q:
  24. return []
  25. # demand_find_author 本身不存 query,需要通过 trace_id 关联 demand_find_content_result.query
  26. sql = """
  27. SELECT DISTINCT
  28. a.author_name,
  29. a.author_link,
  30. a.elderly_ratio,
  31. a.elderly_tgi,
  32. a.remark,
  33. a.trace_id
  34. FROM demand_find_author a
  35. INNER JOIN demand_find_content_result r
  36. ON r.trace_id = a.trace_id
  37. WHERE r.query LIKE %s
  38. ORDER BY a.elderly_ratio DESC, a.elderly_tgi DESC
  39. LIMIT %s
  40. """
  41. like = f"%{q}%"
  42. with conn.cursor() as cur:
  43. cur.execute(sql, (like, int(limit)))
  44. rows = cur.fetchall() or []
  45. return [dict(r) for r in rows]
  46. @tool(description="从 demand_find_author 中按搜索词查找相关作者")
  47. async def find_authors_from_db(query: str, limit: int = 20) -> ToolResult:
  48. """
  49. Args:
  50. query: 搜索词(与历史 demand_find_content_result.query 模糊匹配)
  51. limit: 返回作者数量上限
  52. """
  53. call_params = {"query": query, "limit": limit}
  54. conn = get_connection()
  55. try:
  56. rows = _query_authors(conn, query=query, limit=limit)
  57. finally:
  58. conn.close()
  59. authors: List[Dict[str, Any]] = []
  60. for r in rows:
  61. author_link = r.get("author_link") or ""
  62. authors.append(
  63. {
  64. "author_nickname": r.get("author_name") or "",
  65. "author_url": author_link,
  66. "author_sec_uid": _extract_sec_uid(author_link),
  67. "age_50_plus_ratio": r.get("elderly_ratio") or "",
  68. "age_50_plus_tgi": r.get("elderly_tgi") or "",
  69. "remark": r.get("remark") or "",
  70. "trace_id": r.get("trace_id") or "",
  71. }
  72. )
  73. lines = [f"按搜索词「{query}」在数据库中找到 {len(authors)} 个相关作者:", ""]
  74. for i, a in enumerate(authors, 1):
  75. lines.append(f"{i}. {a['author_nickname']}")
  76. if a["author_sec_uid"]:
  77. lines.append(f" sec_uid: {a['author_sec_uid']}")
  78. if a["author_url"]:
  79. lines.append(f" 链接: {a['author_url']}")
  80. if a["age_50_plus_ratio"] != "" or a["age_50_plus_tgi"] != "":
  81. lines.append(f" 画像: 50+占比={a['age_50_plus_ratio']} | TGI={a['age_50_plus_tgi']}")
  82. if a["remark"]:
  83. lines.append(f" 备注: {a['remark']}")
  84. lines.append("")
  85. out = ToolResult(
  86. title="数据库作者检索",
  87. output="\n".join(lines).strip(),
  88. metadata={"authors": authors, "query": query, "limit": limit},
  89. long_term_memory=f"DB author search for '{query}', found {len(authors)} authors",
  90. )
  91. log_tool_call(_LOG_LABEL, call_params, json.dumps(out.metadata.get("authors", []), ensure_ascii=False))
  92. return out