""" 抖音账号历史作品工具(示例) 调用内部爬虫服务获取指定账号的历史作品列表。 """ import asyncio import logging import time from typing import Optional import requests from agent.tools import tool, ToolResult logger = logging.getLogger(__name__) # API 基础配置 DOUYIN_BLOGGER_API = "http://crawapi.piaoquantv.com/crawler/dou_yin/blogger" DEFAULT_TIMEOUT = 60.0 @tool(description="根据账号ID获取抖音历史作品,支持排序与游标") async def douyin_user_videos( account_id: str, sort_type: str = "最新", cursor: str = "", timeout: Optional[float] = None, ) -> ToolResult: """ 抖音账号历史作品查询 获取指定抖音账号的历史作品列表,支持排序和分页。 Args: account_id: 抖音账号ID(使用 author.sec_uid) sort_type: 排序方式(可选:最新/最热,默认 "最新") cursor: 分页游标,用于获取下一页结果,默认 "" timeout: 超时时间(秒),默认 60 Returns: ToolResult: 包含以下内容: - output: 文本格式的作品列表摘要(显示前5条) - metadata.user_videos: 结构化的作品列表(与 search_results 格式一致) - aweme_id: 视频ID - desc: 视频描述(最多100字符) - author: 作者信息 - nickname: 作者昵称 - sec_uid: 作者ID(完整,约80字符) - statistics: 统计数据 - digg_count: 点赞数 - comment_count: 评论数 - share_count: 分享数 - metadata.raw_data: 原始 API 返回数据 Note: - account_id 参数使用 author.sec_uid(约80字符) - 使用 cursor 参数可以获取下一页结果 - 建议从 metadata.user_videos 获取结构化数据 - user_videos 与 search_results 格式完全一致,可使用相同的处理逻辑 """ start_time = time.time() try: payload = { "account_id": account_id, "sort_type": sort_type, "cursor": cursor, } request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT response = requests.post( DOUYIN_BLOGGER_API, json=payload, headers={"Content-Type": "application/json"}, timeout=request_timeout ) response.raise_for_status() data = response.json() # 格式化输出摘要 summary_lines = [f"账号 {account_id} 的作品列表"] data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {} items = data_block.get("data", []) if isinstance(data_block.get("data"), list) else [] has_more = data_block.get("has_more", False) cursor_value = data_block.get("next_cursor", "") summary_lines.append(f"找到 {len(items)} 个作品" + (f",还有更多(cursor={cursor_value})" if has_more else "")) summary_lines.append("") # 显示前5条 for i, item in enumerate(items[:5], 1): aweme_id = item.get("aweme_id", "unknown") desc = (item.get("desc") or item.get("item_title") or "无标题")[:50] author = item.get("author", {}) author_name = author.get("nickname", "未知作者") author_id = author.get("sec_uid", "") stats = item.get("statistics", {}) digg_count = stats.get("digg_count", 0) comment_count = stats.get("comment_count", 0) share_count = stats.get("share_count", 0) summary_lines.append(f"{i}. {desc}") summary_lines.append(f" ID: {aweme_id}") summary_lines.append(f" 链接: https://www.douyin.com/video/{aweme_id}") summary_lines.append(f" 作者: {author_name}") summary_lines.append(f" sec_uid: {author_id}") summary_lines.append(f" 数据: 点赞 {digg_count:,} | 评论 {comment_count:,} | 分享 {share_count:,}") summary_lines.append("") if len(items) > 5: summary_lines.append(f"... 还有 {len(items) - 5} 条结果") duration_ms = int((time.time() - start_time) * 1000) logger.info( "douyin_user_videos completed", extra={ "account_id": account_id, "results_count": len(items), "has_more": has_more, "cursor": cursor_value, "duration_ms": duration_ms } ) return ToolResult( title=f"账号作品: {account_id}", output="\n".join(summary_lines), long_term_memory=f"Fetched {len(items)} videos for account '{account_id}'", metadata={ "raw_data": data, "user_videos": [ # 结构化数据,与 search_results 保持一致 { "aweme_id": item.get("aweme_id"), "desc": (item.get("desc") or item.get("item_title") or "无标题")[:100], "author": { "nickname": item.get("author", {}).get("nickname", "未知作者"), "sec_uid": item.get("author", {}).get("sec_uid", ""), }, "statistics": { "digg_count": item.get("statistics", {}).get("digg_count", 0), "comment_count": item.get("statistics", {}).get("comment_count", 0), "share_count": item.get("statistics", {}).get("share_count", 0), } } for item in items ] } ) except requests.exceptions.HTTPError as e: logger.error( "douyin_user_videos HTTP error", extra={ "account_id": account_id, "status_code": e.response.status_code, "error": str(e) } ) return ToolResult( title="账号作品获取失败", output="", error=f"HTTP {e.response.status_code}: {e.response.text}", ) except requests.exceptions.Timeout: logger.error("douyin_user_videos timeout", extra={"account_id": account_id, "timeout": request_timeout}) return ToolResult( title="账号作品获取失败", output="", error=f"请求超时({request_timeout}秒)", ) except requests.exceptions.RequestException as e: logger.error("douyin_user_videos network error", extra={"account_id": account_id, "error": str(e)}) return ToolResult( title="账号作品获取失败", output="", error=f"网络错误: {str(e)}", ) except Exception as e: logger.error("douyin_user_videos unexpected error", extra={"account_id": account_id, "error": str(e)}, exc_info=True) return ToolResult( title="账号作品获取失败", output="", error=f"未知错误: {str(e)}", ) async def main(): result = await douyin_user_videos( account_id="MS4wLjABAAAAPRCMGPAFM1VGcJrxRuvTXgJp0Sk95EW1DynNmbKSPg8", sort_type="最新", cursor="" ) print(result.output) if __name__ == "__main__": asyncio.run(main())