Просмотр исходного кода

过滤时间少于45秒的视频 & 并发2条需求处理

jihuaqiang 6 часов назад
Родитель
Сommit
67e0388036

+ 1 - 1
docker-compose.yml

@@ -23,7 +23,7 @@ services:
       - SKILLS_DIR=./skills
       - ENABLED_SKILLS=${ENABLED_SKILLS:-}
       - PORT=8080
-      - MAX_CONCURRENT_TASKS=${MAX_CONCURRENT_TASKS:-1}
+      - MAX_CONCURRENT_TASKS=${MAX_CONCURRENT_TASKS:-2}
       - SCHEDULE_QUERY_API=${SCHEDULE_QUERY_API:-}
       - SCHEDULE_QUERY_API_TIMEOUT=${SCHEDULE_QUERY_API_TIMEOUT:-10.0}
       - KNOWHUB_API=${KNOWHUB_API:-http://43.106.118.91:9999}

+ 3 - 3
examples/content_finder/server.py

@@ -6,7 +6,7 @@
 2. 定时调度:启动后先恢复 demand_find_task 中 status=执行中 的任务;之后每 2 分钟轮询一次,
    若当前无任务在执行,则从 demand_content 取当天(dt=YYYYMMDD)、未建任务记录且 score 最高的一条执行(不区分品类)
 3. 并发控制:限制最大并发任务数;定时侧若已有任务在执行则跳过本次轮询
-4. 单次寻找任务最长执行 15 分钟,超时记为失败并回写 demand_find_task
+4. 单次寻找任务最长执行 25 分钟,超时记为失败并回写 demand_find_task
 """
 
 import asyncio
@@ -65,12 +65,12 @@ SCHEDULER_TZ = ZoneInfo(SCHEDULER_TIMEZONE)
 scheduler = AsyncIOScheduler(timezone=SCHEDULER_TZ)
 
 # 并发控制
-MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "1"))
+MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "2"))
 task_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
 
 # 定时:轮询间隔(分钟)、单次任务超时(秒,默认 15 分钟)
 SCHEDULE_INTERVAL_MINUTES = int(os.getenv("SCHEDULE_INTERVAL_MINUTES", "2"))
-TASK_TIMEOUT_SECONDS = int(os.getenv("SCHEDULE_TASK_TIMEOUT_SECONDS", "1200"))
+TASK_TIMEOUT_SECONDS = int(os.getenv("SCHEDULE_TASK_TIMEOUT_SECONDS", "1500"))
 
 # 统计信息
 stats = {

+ 45 - 17
examples/content_finder/tools/douyin_search_tikhub.py

@@ -38,16 +38,36 @@ def _get_aweme_info(item: object) -> dict:
 DOUYIN_SEARCH_API = "https://api.tikhub.io/api/v1/douyin/search/fetch_video_search_v2"
 DEFAULT_TIMEOUT = 60.0
 
+MIN_DURATION_MS = 45 * 1000
+
+
+def _safe_int(value: object) -> Optional[int]:
+    if isinstance(value, bool):
+        return None
+    if isinstance(value, int):
+        return value
+    if isinstance(value, float):
+        return int(value)
+    if isinstance(value, str):
+        s = value.strip()
+        if not s:
+            return None
+        try:
+            return int(float(s))
+        except ValueError:
+            return None
+    return None
+
 
 
 @tool(description="通过关键词搜索抖音视频内容接口")
 async def douyin_search_tikhub(
     keyword: str,
-    content_type: str = "0",
+    content_type: str = "1",
     sort_type: str = "0",
     publish_time: str = "0",
     cursor: int = 0,
-    filter_duration: str = "0",
+    filter_duration: str = "1-5",
     search_id: str = "",
     backtrace: str = "",
     timeout: Optional[float] = None,
@@ -156,17 +176,25 @@ async def douyin_search_tikhub(
         search_id_value = next_page.get("search_id", "")
         backtrace_value = business_config.get("backtrace", "")
 
+        aweme_infos: list[dict] = []
+        for item in items:
+            aweme_info = _get_aweme_info(item)
+            duration_ms = _safe_int(aweme_info.get("duration"))
+            if duration_ms is not None and duration_ms < MIN_DURATION_MS:
+                continue
+            aweme_infos.append(aweme_info)
+        filtered_out_count = len(items) - len(aweme_infos)
+
         summary_lines.append(
-            f"找到 {len(items)} 条结果"
+            f"找到 {len(aweme_infos)} 条结果"
+            + (f"(过滤掉 {filtered_out_count} 条 duration<{MIN_DURATION_MS}ms)" if filtered_out_count else "")
             + (f",还有更多(cursor={cursor_value},search_id={search_id_value},backtrace={backtrace_value})" if has_more else "")
         )
         summary_lines.append("")
 
-        for i, item in enumerate(items, 1):
-            aweme_info = _get_aweme_info(item)
+        for i, aweme_info in enumerate(aweme_infos, 1):
             aweme_id = aweme_info.get("aweme_id", "unknown")
             desc = (aweme_info.get("desc") or aweme_info.get("item_title") or "无标题")[:50]
-
             author = aweme_info.get("author") if isinstance(aweme_info.get("author"), dict) else {}
             author_name = author.get("nickname", "未知作者")
             author_id = author.get("sec_uid", "")
@@ -189,7 +217,7 @@ async def douyin_search_tikhub(
             "douyin_search completed",
             extra={
                 "keyword": keyword,
-                "results_count": len(items),
+                "results_count": len(aweme_infos),
                 "has_more": has_more,
                 "cursor": cursor_value,
                 "duration_ms": duration_ms
@@ -199,7 +227,7 @@ async def douyin_search_tikhub(
         out = ToolResult(
             title=f"抖音搜索: {keyword}",
             output="\n".join(summary_lines),
-            long_term_memory=f"Searched Douyin for '{keyword}', found {len(items)} results",
+            long_term_memory=f"Searched Douyin for '{keyword}', found {len(aweme_infos)} results",
             metadata={
                 "request_params": {
                     "keyword": keyword,
@@ -220,38 +248,38 @@ async def douyin_search_tikhub(
                 "raw_data": data,
                 "search_results": [  # 结构化搜索结果,供 Agent 直接引用
                     {
-                        "aweme_id": _get_aweme_info(item).get("aweme_id"),
+                        "aweme_id": aweme_info.get("aweme_id"),
                         "desc": (
-                            _get_aweme_info(item).get("desc")
-                            or _get_aweme_info(item).get("item_title")
+                            aweme_info.get("desc")
+                            or aweme_info.get("item_title")
                             or "无标题"
                         )[:100],
                         "author": {
                             "nickname": (
-                                (_get_aweme_info(item).get("author") if isinstance(_get_aweme_info(item).get("author"), dict) else {})
+                                (aweme_info.get("author") if isinstance(aweme_info.get("author"), dict) else {})
                                 .get("nickname", "未知作者")
                             ),
                             "sec_uid": (
-                                (_get_aweme_info(item).get("author") if isinstance(_get_aweme_info(item).get("author"), dict) else {})
+                                (aweme_info.get("author") if isinstance(aweme_info.get("author"), dict) else {})
                                 .get("sec_uid", "")
                             ),
                         },
                         "statistics": {
                             "digg_count": (
-                                (_get_aweme_info(item).get("statistics") if isinstance(_get_aweme_info(item).get("statistics"), dict) else {})
+                                (aweme_info.get("statistics") if isinstance(aweme_info.get("statistics"), dict) else {})
                                 .get("digg_count", 0)
                             ),
                             "comment_count": (
-                                (_get_aweme_info(item).get("statistics") if isinstance(_get_aweme_info(item).get("statistics"), dict) else {})
+                                (aweme_info.get("statistics") if isinstance(aweme_info.get("statistics"), dict) else {})
                                 .get("comment_count", 0)
                             ),
                             "share_count": (
-                                (_get_aweme_info(item).get("statistics") if isinstance(_get_aweme_info(item).get("statistics"), dict) else {})
+                                (aweme_info.get("statistics") if isinstance(aweme_info.get("statistics"), dict) else {})
                                 .get("share_count", 0)
                             ),
                         }
                     }
-                    for item in items
+                    for aweme_info in aweme_infos
                 ]
             }
         )

+ 40 - 7
examples/content_finder/tools/douyin_user_videos.py

@@ -22,6 +22,25 @@ _LOG_LABEL = "工具调用:douyin_user_videos -> 抖音账号历史作品列
 # API 基础配置
 DOUYIN_BLOGGER_API = "http://crawapi.piaoquantv.com/crawler/dou_yin/blogger"
 DEFAULT_TIMEOUT = 60.0
+MIN_DURATION_MS = 45 * 1000
+
+
+def _safe_int(value: object) -> Optional[int]:
+    if isinstance(value, bool):
+        return None
+    if isinstance(value, int):
+        return value
+    if isinstance(value, float):
+        return int(value)
+    if isinstance(value, str):
+        s = value.strip()
+        if not s:
+            return None
+        try:
+            return int(float(s))
+        except ValueError:
+            return None
+    return None
 
 
 @tool(description="根据账号ID获取抖音历史作品,支持排序与游标")
@@ -96,11 +115,25 @@ async def douyin_user_videos(
         has_more = data_block.get("has_more", False)
         cursor_value = data_block.get("next_cursor", "")
 
-        summary_lines.append(f"找到 {len(items)} 个作品" + (f",还有更多(cursor={cursor_value})" if has_more else ""))
+        filtered_items: list[dict] = []
+        for item in items:
+            video = item.get("video") if isinstance(item, dict) else None
+            video_dict = video if isinstance(video, dict) else {}
+            duration_ms = _safe_int(video_dict.get("duration"))
+            if duration_ms is not None and duration_ms < MIN_DURATION_MS:
+                continue
+            filtered_items.append(item)
+        filtered_out_count = len(items) - len(filtered_items)
+
+        summary_lines.append(
+            f"找到 {len(filtered_items)} 个作品"
+            + (f"(过滤掉 {filtered_out_count} 条 duration<{MIN_DURATION_MS}ms)" if filtered_out_count else "")
+            + (f",还有更多(cursor={cursor_value})" if has_more else "")
+        )
         summary_lines.append("")
 
         # 显示前5条
-        for i, item in enumerate(items[:5], 1):
+        for i, item in enumerate(filtered_items[:5], 1):
             aweme_id = item.get("aweme_id", "unknown")
             desc = (item.get("desc") or item.get("item_title") or "无标题")[:50]
 
@@ -121,15 +154,15 @@ async def douyin_user_videos(
             summary_lines.append(f"   数据: 点赞 {digg_count:,} | 评论 {comment_count:,} | 分享 {share_count:,}")
             summary_lines.append("")
 
-        if len(items) > 5:
-            summary_lines.append(f"... 还有 {len(items) - 5} 条结果")
+        if len(filtered_items) > 5:
+            summary_lines.append(f"... 还有 {len(filtered_items) - 5} 条结果")
 
         duration_ms = int((time.time() - start_time) * 1000)
         logger.info(
             "douyin_user_videos completed",
             extra={
                 "account_id": account_id,
-                "results_count": len(items),
+                "results_count": len(filtered_items),
                 "has_more": has_more,
                 "cursor": cursor_value,
                 "duration_ms": duration_ms
@@ -139,7 +172,7 @@ async def douyin_user_videos(
         out = ToolResult(
             title=f"账号作品: {account_id}",
             output="\n".join(summary_lines),
-            long_term_memory=f"Fetched {len(items)} videos for account '{account_id}'",
+            long_term_memory=f"Fetched {len(filtered_items)} videos for account '{account_id}'",
             metadata={
                 "raw_data": data,
                 "user_videos": [  # 结构化数据,与 search_results 保持一致
@@ -156,7 +189,7 @@ async def douyin_user_videos(
                             "share_count": item.get("statistics", {}).get("share_count", 0),
                         }
                     }
-                    for item in items
+                    for item in filtered_items
                 ]
             }
         )