find_authors_from_db.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. """
  2. 从数据库中按“搜索词 query”检索历史任务沉淀的优质作者(demand_find_author)。
  3. 用途:
  4. - 先用该工具找到相关作者(sec_uid / 链接)
  5. - 再调用 douyin_user_videos(account_id=sec_uid) 获取其作品做二次筛选
  6. """
  7. from __future__ import annotations
  8. import re
  9. from typing import Any, Dict, List, Optional
  10. from agent.tools import ToolResult, tool
  11. from db import get_connection
  12. _DOUYIN_USER_URL_RE = re.compile(r"^https?://www\.douyin\.com/user/(?P<sec_uid>[^/?#]+)")
  13. def _extract_sec_uid(author_link: str) -> str:
  14. if not author_link:
  15. return ""
  16. m = _DOUYIN_USER_URL_RE.match(author_link.strip())
  17. return m.group("sec_uid") if m else ""
  18. def _query_authors(conn, query: str, limit: int) -> List[Dict[str, Any]]:
  19. q = (query or "").strip()
  20. if not q:
  21. return []
  22. # demand_find_author 本身不存 query,需要通过 trace_id 关联 demand_find_content_result.query
  23. sql = """
  24. SELECT DISTINCT
  25. a.author_name,
  26. a.author_link,
  27. a.elderly_ratio,
  28. a.elderly_tgi,
  29. a.remark,
  30. a.trace_id
  31. FROM demand_find_author a
  32. INNER JOIN demand_find_content_result r
  33. ON r.trace_id = a.trace_id
  34. WHERE r.query LIKE %s
  35. ORDER BY a.elderly_ratio DESC, a.elderly_tgi DESC
  36. LIMIT %s
  37. """
  38. like = f"%{q}%"
  39. with conn.cursor() as cur:
  40. cur.execute(sql, (like, int(limit)))
  41. rows = cur.fetchall() or []
  42. return [dict(r) for r in rows]
  43. @tool(description="从 demand_find_author 中按搜索词查找相关作者")
  44. async def find_authors_from_db(query: str, limit: int = 20) -> ToolResult:
  45. """
  46. Args:
  47. query: 搜索词(与历史 demand_find_content_result.query 模糊匹配)
  48. limit: 返回作者数量上限
  49. """
  50. conn = get_connection()
  51. try:
  52. rows = _query_authors(conn, query=query, limit=limit)
  53. finally:
  54. conn.close()
  55. authors: List[Dict[str, Any]] = []
  56. for r in rows:
  57. author_link = r.get("author_link") or ""
  58. authors.append(
  59. {
  60. "author_nickname": r.get("author_name") or "",
  61. "author_url": author_link,
  62. "author_sec_uid": _extract_sec_uid(author_link),
  63. "age_50_plus_ratio": r.get("elderly_ratio") or "",
  64. "age_50_plus_tgi": r.get("elderly_tgi") or "",
  65. "remark": r.get("remark") or "",
  66. "trace_id": r.get("trace_id") or "",
  67. }
  68. )
  69. lines = [f"按搜索词「{query}」在数据库中找到 {len(authors)} 个相关作者:", ""]
  70. for i, a in enumerate(authors, 1):
  71. lines.append(f"{i}. {a['author_nickname']}")
  72. if a["author_sec_uid"]:
  73. lines.append(f" sec_uid: {a['author_sec_uid']}")
  74. if a["author_url"]:
  75. lines.append(f" 链接: {a['author_url']}")
  76. if a["age_50_plus_ratio"] != "" or a["age_50_plus_tgi"] != "":
  77. lines.append(f" 画像: 50+占比={a['age_50_plus_ratio']} | TGI={a['age_50_plus_tgi']}")
  78. if a["remark"]:
  79. lines.append(f" 备注: {a['remark']}")
  80. lines.append("")
  81. return ToolResult(
  82. title="数据库作者检索",
  83. output="\n".join(lines).strip(),
  84. metadata={"authors": authors, "query": query, "limit": limit},
  85. long_term_memory=f"DB author search for '{query}', found {len(authors)} authors",
  86. )