weixin_index_search.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. """
  2. 微信指数搜索工具 - 根据关键词获取微信指数数据及趋势
  3. 用于 Agent 执行时获取关键词的微信指数和趋势分析。
  4. """
  5. import asyncio
  6. import json
  7. import os
  8. from datetime import datetime, timedelta
  9. from typing import Any, Dict, List, Optional, Tuple
  10. import httpx
  11. from agent.tools import tool, ToolResult
  12. # 微信指数 API 配置
  13. WEIXIN_INDEX_BASE_URL = os.getenv("WEIXIN_INDEX_BASE_URL", "http://crawapi.piaoquantv.com")
  14. DEFAULT_TIMEOUT = 30.0
  15. def _get_date_range() -> Tuple[str, str]:
  16. """
  17. 获取日期范围:昨天和往前20天的日期。
  18. Returns:
  19. tuple: (start_ymd, end_ymd) 格式为 "YYYYMMDD"
  20. """
  21. yesterday = datetime.now() - timedelta(days=1)
  22. start_date = yesterday - timedelta(days=20)
  23. end_ymd = yesterday.strftime("%Y%m%d")
  24. start_ymd = start_date.strftime("%Y%m%d")
  25. return start_ymd, end_ymd
  26. def _calculate_trend(data: List[Dict[str, Any]]) -> Dict[str, Any]:
  27. """
  28. 根据 total_score 计算趋势。
  29. Args:
  30. data: API 返回的数据列表,包含每天的 channel_score
  31. Returns:
  32. Dict: 包含趋势信息和最新热度
  33. """
  34. if not data or len(data) < 2:
  35. return {
  36. "trend": "未知",
  37. "total_score": 0,
  38. "message": "数据不足,无法计算趋势"
  39. }
  40. # 按日期排序(确保顺序正确)
  41. sorted_data = sorted(data, key=lambda x: x.get("ymd", ""))
  42. # 获取最近两天的数据
  43. recent_scores = []
  44. for item in sorted_data[-2:]:
  45. channel_score = item.get("channel_score", {})
  46. total_score = channel_score.get("total_score", 0)
  47. recent_scores.append(total_score)
  48. if len(recent_scores) < 2:
  49. # 如果只有一天的数据,使用最后一天的数据
  50. latest_item = sorted_data[-1]
  51. channel_score = latest_item.get("channel_score", {})
  52. total_score = channel_score.get("total_score", 0)
  53. return {
  54. "trend": "未知",
  55. "total_score": total_score,
  56. "message": "只有一天数据,无法计算趋势"
  57. }
  58. # 计算趋势
  59. yesterday_score = recent_scores[-1] # 最新一天(昨天)
  60. day_before_score = recent_scores[-2] # 前一天
  61. if yesterday_score > day_before_score:
  62. trend = "上升"
  63. elif yesterday_score < day_before_score:
  64. trend = "下降"
  65. else:
  66. trend = "持平"
  67. # 计算变化百分比
  68. if day_before_score > 0:
  69. change_percent = ((yesterday_score - day_before_score) / day_before_score) * 100
  70. else:
  71. change_percent = 0.0
  72. return {
  73. "trend": trend,
  74. "total_score": yesterday_score,
  75. "previous_score": day_before_score,
  76. "change_percent": round(change_percent, 2),
  77. "latest_date": sorted_data[-1].get("ymd", "")
  78. }
  79. async def _call_weixin_index_api(
  80. keyword: str,
  81. start_ymd: str,
  82. end_ymd: str,
  83. ) -> Optional[List[Dict[str, Any]]]:
  84. """调用微信指数 API,返回结果列表。"""
  85. url = f"{WEIXIN_INDEX_BASE_URL.rstrip('/')}/crawler/wei_xin/wxindex"
  86. payload = {
  87. "keyword": keyword,
  88. "start_ymd": start_ymd,
  89. "end_ymd": end_ymd,
  90. }
  91. try:
  92. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  93. resp = await client.post(
  94. url,
  95. json=payload,
  96. headers={"Content-Type": "application/json"},
  97. )
  98. resp.raise_for_status()
  99. data = resp.json()
  100. except httpx.HTTPStatusError as e:
  101. raise RuntimeError(
  102. f"API 请求失败: {e.response.status_code} - {e.response.text[:200]}"
  103. )
  104. except Exception as e:
  105. raise RuntimeError(f"请求异常: {str(e)}")
  106. # 解析响应格式: {'code': 0, 'msg': null, 'data': {'data': [...]}}
  107. if isinstance(data, dict):
  108. # 检查 code 字段
  109. code = data.get("code", 0)
  110. if code != 0:
  111. msg = data.get("msg", "未知错误")
  112. raise RuntimeError(f"API 返回错误码: {code}, 消息: {msg}")
  113. # 获取 data.data 数组
  114. outer_data = data.get("data", {})
  115. if isinstance(outer_data, dict):
  116. inner_data = outer_data.get("data", [])
  117. else:
  118. inner_data = outer_data if isinstance(outer_data, list) else []
  119. if isinstance(inner_data, list):
  120. return inner_data
  121. return []
  122. return []
  123. @tool(
  124. description="根据关键词获取微信指数数据及趋势分析。自动获取最近21天的数据(从20天前到昨天),并计算趋势(上升/下降/持平)和最新热度。",
  125. display={
  126. "zh": {
  127. "name": "微信指数搜索",
  128. "params": {
  129. "keyword": "搜索关键词,例如:'养生'",
  130. },
  131. },
  132. },
  133. )
  134. async def weixin_index_search(
  135. keyword: str,
  136. ) -> ToolResult:
  137. """
  138. 根据关键词获取微信指数数据及趋势分析。
  139. Args:
  140. keyword: 搜索关键词,例如:"养生"
  141. Returns:
  142. ToolResult: 包含趋势信息和热度数据
  143. """
  144. # 验证关键词
  145. if not keyword or not keyword.strip():
  146. return ToolResult(
  147. title="微信指数搜索失败",
  148. output="",
  149. error="关键词不能为空",
  150. )
  151. keyword = keyword.strip()
  152. # 获取日期范围
  153. try:
  154. start_ymd, end_ymd = _get_date_range()
  155. except Exception as e:
  156. return ToolResult(
  157. title="微信指数搜索失败",
  158. output="",
  159. error=f"日期计算失败: {str(e)}",
  160. )
  161. # 调用 API
  162. try:
  163. results = await _call_weixin_index_api(
  164. keyword=keyword,
  165. start_ymd=start_ymd,
  166. end_ymd=end_ymd,
  167. )
  168. except RuntimeError as e:
  169. return ToolResult(
  170. title="微信指数搜索失败",
  171. output="",
  172. error=str(e),
  173. )
  174. if not results:
  175. return ToolResult(
  176. title="微信指数搜索",
  177. output=json.dumps(
  178. {
  179. "message": "未找到微信指数数据",
  180. "keyword": keyword,
  181. "date_range": {
  182. "start_ymd": start_ymd,
  183. "end_ymd": end_ymd,
  184. },
  185. },
  186. ensure_ascii=False,
  187. indent=2,
  188. ),
  189. )
  190. # 计算趋势
  191. trend_info = _calculate_trend(results)
  192. # 构建返回结果
  193. result_data = {
  194. "date_range": {
  195. "start_ymd": start_ymd,
  196. "end_ymd": end_ymd,
  197. },
  198. "trend": trend_info.get("trend"),
  199. "change_percent": trend_info.get("change_percent"),
  200. "total_score": trend_info.get("total_score"),
  201. }
  202. output = json.dumps(result_data, ensure_ascii=False, indent=2)
  203. return ToolResult(
  204. title=f"微信指数搜索 - {keyword}",
  205. output=output,
  206. )
  207. if __name__ == '__main__':
  208. res = asyncio.run(weixin_index_search("马年"))
  209. print(res)