| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- """
- 票圈视频库工具
- 提供视频搜索、详情获取、账号分析等功能。
- """
- import json
- from typing import Optional, List, Dict, Any
- import httpx
- from agent.tools import tool, ToolResult
- # API 基础配置
- VIDEO_LIBRARY_API = "http://your-video-library-api.com" # 替换为实际API地址
- DEFAULT_TIMEOUT = 60.0
- @tool(
- display={
- "zh": {
- "name": "搜索票圈视频库",
- "params": {
- "keyword": "搜索关键词",
- "max_count": "返回条数",
- "filters": "过滤条件"
- }
- }
- }
- )
- async def video_library_search(
- keyword: str,
- max_count: int = 10,
- filters: Optional[Dict[str, Any]] = None,
- ) -> ToolResult:
- """
- 在票圈视频库中搜索视频
- Args:
- keyword: 搜索关键词
- max_count: 返回的最大条数,默认为 10
- filters: 过滤条件,可选字段:
- - platform: 平台过滤(douyin/xhs/bili等)
- - min_likes: 最小点赞数
- - date_range: 发布时间范围
- - tags: 标签过滤
- Returns:
- ToolResult 包含搜索结果:
- {
- "code": 0,
- "message": "success",
- "data": [
- {
- "video_id": "v123456",
- "title": "14岁抗日娃娃军",
- "platform": "douyin",
- "account_id": "acc789",
- "account_name": "历史故事",
- "like_count": 12700,
- "publish_time": "2025-01-15",
- "tags": ["抗日", "娃娃军", "感动"],
- "shipping_info": {
- "status": "approved",
- "category": "历史教育"
- }
- }
- ]
- }
- """
- try:
- url = f"{VIDEO_LIBRARY_API}/api/videos/search"
- payload = {
- "keyword": keyword,
- "max_count": max_count,
- }
- if filters:
- payload["filters"] = filters
- async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
- response = await client.post(
- url,
- json=payload,
- headers={"Content-Type": "application/json"},
- )
- response.raise_for_status()
- data = response.json()
- result_count = len(data.get("data", []))
- return ToolResult(
- title=f"视频库搜索: {keyword}",
- output=json.dumps(data, ensure_ascii=False, indent=2),
- long_term_memory=f"Searched video library for '{keyword}', found {result_count} videos"
- )
- except httpx.HTTPStatusError as e:
- return ToolResult(
- title="搜索失败",
- output="",
- error=f"HTTP error {e.response.status_code}: {e.response.text}"
- )
- except Exception as e:
- return ToolResult(
- title="搜索失败",
- output="",
- error=str(e)
- )
- @tool(
- display={
- "zh": {
- "name": "获取视频详情",
- "params": {
- "video_id": "视频ID"
- }
- }
- }
- )
- async def video_library_get_detail(
- video_id: str,
- ) -> ToolResult:
- """
- 获取视频的详细信息
- Args:
- video_id: 视频唯一标识符
- Returns:
- ToolResult 包含视频详情:
- {
- "code": 0,
- "message": "success",
- "data": {
- "video_id": "v123456",
- "title": "14岁抗日娃娃军",
- "description": "...",
- "platform": "douyin",
- "platform_video_id": "7123456789",
- "account_id": "acc789",
- "account_name": "历史故事",
- "like_count": 12700,
- "comment_count": 856,
- "share_count": 432,
- "publish_time": "2025-01-15",
- "duration": 180,
- "tags": ["抗日", "娃娃军", "感动"],
- "shipping_info": {
- "status": "approved",
- "category": "历史教育",
- "target_audience": "50+",
- "emotional_tone": "感动",
- "quality_score": 8.5
- },
- "url": "https://..."
- }
- }
- """
- try:
- url = f"{VIDEO_LIBRARY_API}/api/videos/{video_id}"
- async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
- response = await client.get(url)
- response.raise_for_status()
- data = response.json()
- video_data = data.get("data", {})
- title = video_data.get("title", "Unknown")
- return ToolResult(
- title=f"视频详情: {title}",
- output=json.dumps(data, ensure_ascii=False, indent=2),
- long_term_memory=f"Retrieved video detail for {video_id}"
- )
- except httpx.HTTPStatusError as e:
- return ToolResult(
- title="获取详情失败",
- output="",
- error=f"HTTP error {e.response.status_code}: {e.response.text}"
- )
- except Exception as e:
- return ToolResult(
- title="获取详情失败",
- output="",
- error=str(e)
- )
- @tool(
- display={
- "zh": {
- "name": "获取账号信息",
- "params": {
- "account_id": "账号ID",
- "include_videos": "是否包含视频列表",
- "include_recommendations": "是否包含推荐账号"
- }
- }
- }
- )
- async def video_library_get_account(
- account_id: str,
- include_videos: bool = True,
- include_recommendations: bool = True,
- ) -> ToolResult:
- """
- 获取账号信息及相关推荐
- Args:
- account_id: 账号唯一标识符
- include_videos: 是否包含该账号的视频列表,默认 True
- include_recommendations: 是否包含相关推荐账号,默认 True
- Returns:
- ToolResult 包含账号信息:
- {
- "code": 0,
- "message": "success",
- "data": {
- "account_id": "acc789",
- "account_name": "历史故事",
- "platform": "douyin",
- "follower_count": 125000,
- "video_count": 342,
- "tags": ["历史", "教育", "正能量"],
- "videos": [
- {
- "video_id": "v123456",
- "title": "...",
- "like_count": 12700
- }
- ],
- "recommended_accounts": [
- {
- "account_id": "acc790",
- "account_name": "红色记忆",
- "similarity_score": 0.85,
- "reason": "内容相似"
- }
- ]
- }
- }
- """
- try:
- url = f"{VIDEO_LIBRARY_API}/api/accounts/{account_id}"
- params = {
- "include_videos": include_videos,
- "include_recommendations": include_recommendations,
- }
- async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
- response = await client.get(url, params=params)
- response.raise_for_status()
- data = response.json()
- account_data = data.get("data", {})
- account_name = account_data.get("account_name", "Unknown")
- return ToolResult(
- title=f"账号信息: {account_name}",
- output=json.dumps(data, ensure_ascii=False, indent=2),
- long_term_memory=f"Retrieved account info for {account_id}"
- )
- except httpx.HTTPStatusError as e:
- return ToolResult(
- title="获取账号信息失败",
- output="",
- error=f"HTTP error {e.response.status_code}: {e.response.text}"
- )
- except Exception as e:
- return ToolResult(
- title="获取账号信息失败",
- output="",
- error=str(e)
- )
|