| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- """
- 微信指数搜索工具 - 根据关键词获取微信指数数据及趋势
- 用于 Agent 执行时获取关键词的微信指数和趋势分析。
- """
- import asyncio
- import json
- import os
- from datetime import datetime, timedelta
- from typing import Any, Dict, List, Optional, Tuple
- import httpx
- from agent.tools import tool, ToolResult
- # 微信指数 API 配置
- WEIXIN_INDEX_BASE_URL = os.getenv("WEIXIN_INDEX_BASE_URL", "http://crawapi.piaoquantv.com")
- DEFAULT_TIMEOUT = 30.0
- def _get_date_range() -> Tuple[str, str]:
- """
- 获取日期范围:昨天和往前20天的日期。
-
- Returns:
- tuple: (start_ymd, end_ymd) 格式为 "YYYYMMDD"
- """
- yesterday = datetime.now() - timedelta(days=1)
- start_date = yesterday - timedelta(days=20)
-
- end_ymd = yesterday.strftime("%Y%m%d")
- start_ymd = start_date.strftime("%Y%m%d")
-
- return start_ymd, end_ymd
- def _calculate_trend(data: List[Dict[str, Any]]) -> Dict[str, Any]:
- """
- 根据 total_score 计算趋势。
-
- Args:
- data: API 返回的数据列表,包含每天的 channel_score
-
- Returns:
- Dict: 包含趋势信息和最新热度
- """
- if not data or len(data) < 2:
- return {
- "trend": "未知",
- "total_score": 0,
- "message": "数据不足,无法计算趋势"
- }
-
- # 按日期排序(确保顺序正确)
- sorted_data = sorted(data, key=lambda x: x.get("ymd", ""))
-
- # 获取最近两天的数据
- recent_scores = []
- for item in sorted_data[-2:]:
- channel_score = item.get("channel_score", {})
- total_score = channel_score.get("total_score", 0)
- recent_scores.append(total_score)
-
- if len(recent_scores) < 2:
- # 如果只有一天的数据,使用最后一天的数据
- latest_item = sorted_data[-1]
- channel_score = latest_item.get("channel_score", {})
- total_score = channel_score.get("total_score", 0)
- return {
- "trend": "未知",
- "total_score": total_score,
- "message": "只有一天数据,无法计算趋势"
- }
-
- # 计算趋势
- yesterday_score = recent_scores[-1] # 最新一天(昨天)
- day_before_score = recent_scores[-2] # 前一天
-
- if yesterday_score > day_before_score:
- trend = "上升"
- elif yesterday_score < day_before_score:
- trend = "下降"
- else:
- trend = "持平"
-
- # 计算变化百分比
- if day_before_score > 0:
- change_percent = ((yesterday_score - day_before_score) / day_before_score) * 100
- else:
- change_percent = 0.0
-
- return {
- "trend": trend,
- "total_score": yesterday_score,
- "previous_score": day_before_score,
- "change_percent": round(change_percent, 2),
- "latest_date": sorted_data[-1].get("ymd", "")
- }
- async def _call_weixin_index_api(
- keyword: str,
- start_ymd: str,
- end_ymd: str,
- ) -> Optional[List[Dict[str, Any]]]:
- """调用微信指数 API,返回结果列表。"""
- url = f"{WEIXIN_INDEX_BASE_URL.rstrip('/')}/crawler/wei_xin/wxindex"
- payload = {
- "keyword": keyword,
- "start_ymd": start_ymd,
- "end_ymd": end_ymd,
- }
- try:
- async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
- resp = await client.post(
- url,
- json=payload,
- headers={"Content-Type": "application/json"},
- )
- resp.raise_for_status()
- data = resp.json()
- except httpx.HTTPStatusError as e:
- raise RuntimeError(
- f"API 请求失败: {e.response.status_code} - {e.response.text[:200]}"
- )
- except Exception as e:
- raise RuntimeError(f"请求异常: {str(e)}")
- # 解析响应格式: {'code': 0, 'msg': null, 'data': {'data': [...]}}
- if isinstance(data, dict):
- # 检查 code 字段
- code = data.get("code", 0)
- if code != 0:
- msg = data.get("msg", "未知错误")
- raise RuntimeError(f"API 返回错误码: {code}, 消息: {msg}")
- # 获取 data.data 数组
- outer_data = data.get("data", {})
- if isinstance(outer_data, dict):
- inner_data = outer_data.get("data", [])
- else:
- inner_data = outer_data if isinstance(outer_data, list) else []
- if isinstance(inner_data, list):
- return inner_data
- return []
- return []
- @tool(
- description="根据关键词获取微信指数数据及趋势分析。自动获取最近21天的数据(从20天前到昨天),并计算趋势(上升/下降/持平)和最新热度。",
- display={
- "zh": {
- "name": "微信指数搜索",
- "params": {
- "keyword": "搜索关键词,例如:'养生'",
- },
- },
- },
- )
- async def weixin_index_search(
- keyword: str,
- ) -> ToolResult:
- """
- 根据关键词获取微信指数数据及趋势分析。
- Args:
- keyword: 搜索关键词,例如:"养生"
- Returns:
- ToolResult: 包含趋势信息和热度数据
- """
- # 验证关键词
- if not keyword or not keyword.strip():
- return ToolResult(
- title="微信指数搜索失败",
- output="",
- error="关键词不能为空",
- )
- keyword = keyword.strip()
- # 获取日期范围
- try:
- start_ymd, end_ymd = _get_date_range()
- except Exception as e:
- return ToolResult(
- title="微信指数搜索失败",
- output="",
- error=f"日期计算失败: {str(e)}",
- )
- # 调用 API
- try:
- results = await _call_weixin_index_api(
- keyword=keyword,
- start_ymd=start_ymd,
- end_ymd=end_ymd,
- )
- except RuntimeError as e:
- return ToolResult(
- title="微信指数搜索失败",
- output="",
- error=str(e),
- )
- if not results:
- return ToolResult(
- title="微信指数搜索",
- output=json.dumps(
- {
- "message": "未找到微信指数数据",
- "keyword": keyword,
- "date_range": {
- "start_ymd": start_ymd,
- "end_ymd": end_ymd,
- },
- },
- ensure_ascii=False,
- indent=2,
- ),
- )
- # 计算趋势
- trend_info = _calculate_trend(results)
- # 构建返回结果
- result_data = {
- "date_range": {
- "start_ymd": start_ymd,
- "end_ymd": end_ymd,
- },
- "trend": trend_info.get("trend"),
- "change_percent": trend_info.get("change_percent"),
- "total_score": trend_info.get("total_score"),
- }
- output = json.dumps(result_data, ensure_ascii=False, indent=2)
-
- return ToolResult(
- title=f"微信指数搜索 - {keyword}",
- output=output,
- )
- if __name__ == '__main__':
- res = asyncio.run(weixin_index_search("马年"))
- print(res)
|