""" 爬虫服务工具模块 提供 YouTube、X (Twitter) 和微信/通用链接的搜索和详情查询功能。 """ import asyncio import base64 import io import json import math import os import subprocess import tempfile from pathlib import Path from typing import Optional, List, Dict, Any import httpx from PIL import Image, ImageDraw, ImageFont from agent.tools import tool, ToolResult # API 配置 CRAWLER_BASE_URL = "http://crawler.aiddit.com/crawler" AIGC_BASE_URL = "http://aigc-channel.aiddit.com/aigc/channel" DEFAULT_TIMEOUT = 60.0 # 拼接图配置 THUMB_WIDTH = 250 THUMB_HEIGHT = 250 TEXT_HEIGHT = 80 GRID_COLS = 5 PADDING = 12 BG_COLOR = (255, 255, 255) TEXT_COLOR = (30, 30, 30) INDEX_COLOR = (220, 60, 60) # 视频处理相关配置 VIDEO_DOWNLOAD_DIR = Path(tempfile.gettempdir()) / "youtube_videos" VIDEO_DOWNLOAD_DIR.mkdir(exist_ok=True) # ── 辅助函数 ── def _truncate_text(text: str, max_len: int = 14) -> str: """截断文本,超出部分用省略号""" return text[:max_len] + "..." if len(text) > max_len else text async def _download_image(client: httpx.AsyncClient, url: str) -> Optional[Image.Image]: """下载单张图片,失败返回 None""" try: resp = await client.get(url, timeout=15.0) resp.raise_for_status() return Image.open(io.BytesIO(resp.content)).convert("RGB") except Exception: return None async def _build_video_collage(videos: List[Dict[str, Any]]) -> Optional[str]: """ 将视频缩略图+序号+标题拼接成网格图,返回 base64 编码的 PNG。 """ if not videos: return None items = [] for idx, video in enumerate(videos): thumbnail = None if "thumbnails" in video and isinstance(video["thumbnails"], list) and video["thumbnails"]: thumbnail = video["thumbnails"][0].get("url") elif "thumbnail" in video: thumbnail = video.get("thumbnail") elif "cover_url" in video: thumbnail = video.get("cover_url") title = video.get("title", "") or video.get("text", "") if thumbnail: items.append({"url": thumbnail, "title": title, "index": idx + 1}) if not items: return None async with httpx.AsyncClient() as client: tasks = [_download_image(client, item["url"]) for item in items] downloaded = await asyncio.gather(*tasks) valid = [(item, img) for item, img in zip(items, downloaded) if img is not None] if not valid: return None cols = min(GRID_COLS, len(valid)) rows = math.ceil(len(valid) / cols) cell_w = THUMB_WIDTH + PADDING cell_h = THUMB_HEIGHT + TEXT_HEIGHT + PADDING canvas_w = cols * cell_w + PADDING canvas_h = rows * cell_h + PADDING canvas = Image.new("RGB", (canvas_w, canvas_h), BG_COLOR) draw = ImageDraw.Draw(canvas) font_title = None font_index = None font_candidates = [ "msyh.ttc", "simhei.ttf", "simsun.ttc", "/System/Library/Fonts/PingFang.ttc", "/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf", "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc", "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc", ] for font_path in font_candidates: try: font_title = ImageFont.truetype(font_path, 16) font_index = ImageFont.truetype(font_path, 32) break except Exception: continue if not font_title: font_title = ImageFont.load_default() font_index = font_title for item, img in valid: idx = item["index"] col = (idx - 1) % cols row = (idx - 1) // cols x = PADDING + col * cell_w y = PADDING + row * cell_h scale = min(THUMB_WIDTH / img.width, THUMB_HEIGHT / img.height) new_w = int(img.width * scale) new_h = int(img.height * scale) thumb = img.resize((new_w, new_h), Image.LANCZOS) offset_x = x + (THUMB_WIDTH - new_w) // 2 offset_y = y + (THUMB_HEIGHT - new_h) // 2 canvas.paste(thumb, (offset_x, offset_y)) index_text = str(idx) idx_x = offset_x idx_y = offset_y + 4 box_size = 52 draw.rectangle([idx_x, idx_y, idx_x + box_size, idx_y + box_size], fill=INDEX_COLOR) bbox = draw.textbbox((0, 0), index_text, font=font_index) tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] text_x = idx_x + (box_size - tw) // 2 text_y = idx_y + (box_size - th) // 2 draw.text((text_x, text_y), index_text, fill=(255, 255, 255), font=font_index) title = item["title"] or "" if title: words = list(title) lines = [] current_line = "" for ch in words: test_line = current_line + ch bbox_line = draw.textbbox((0, 0), test_line, font=font_title) if bbox_line[2] - bbox_line[0] > THUMB_WIDTH: if current_line: lines.append(current_line) current_line = ch else: current_line = test_line if current_line: lines.append(current_line) for line_i, line in enumerate(lines): draw.text((x, y + THUMB_HEIGHT + 6 + line_i * 22), line, fill=TEXT_COLOR, font=font_title) buf = io.BytesIO() canvas.save(buf, format="PNG") return base64.b64encode(buf.getvalue()).decode("utf-8") def _parse_srt_to_outline(srt_content: str) -> List[Dict[str, str]]: """解析 SRT 字幕,生成带时间戳的大纲""" if not srt_content: return [] outline = [] blocks = srt_content.strip().split('\n\n') for block in blocks: lines = block.strip().split('\n') if len(lines) >= 3: timestamp_line = lines[1] if '-->' in timestamp_line: start_time = timestamp_line.split('-->')[0].strip() text = ' '.join(lines[2:]) outline.append({'timestamp': start_time, 'text': text}) return outline def _download_youtube_video(video_id: str) -> Optional[str]: """使用 yt-dlp 下载 YouTube 视频,返回文件路径""" try: output_path = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4" if output_path.exists(): return str(output_path) cmd = [ 'yt-dlp', '-f', 'best[ext=mp4]', '-o', str(output_path), f'https://www.youtube.com/watch?v={video_id}' ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) if result.returncode == 0 and output_path.exists(): return str(output_path) return None except Exception: return None # ── YouTube 工具 ── @tool() async def youtube_search(keyword: str) -> ToolResult: """ 搜索 YouTube 视频 Args: keyword: 搜索关键词 Returns: 搜索结果列表,包含视频标题、ID、频道等信息 """ try: async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: response = await client.post( f"{CRAWLER_BASE_URL}/youtube/keyword", json={"keyword": keyword} ) response.raise_for_status() data = response.json() if data.get("code") == 0: result_data = data.get("data", {}) videos = result_data.get("data", []) if isinstance(result_data, dict) else [] images = [] collage_b64 = await _build_video_collage(videos) if collage_b64: images.append({ "type": "base64", "media_type": "image/png", "data": collage_b64 }) summary_list = [] for idx, video in enumerate(videos[:20], 1): title = video.get("title", "") author = video.get("author", "") video_id = video.get("video_id", "") summary_list.append(f"{idx}. {title} - {author} (ID: {video_id})") output_data = { "keyword": keyword, "total": len(videos), "summary": summary_list, "data": videos } return ToolResult( title=f"YouTube 搜索: {keyword}", output=json.dumps(output_data, ensure_ascii=False, indent=2), long_term_memory=f"Searched YouTube for '{keyword}', found {len(videos)} videos", images=images ) else: return ToolResult( title="YouTube 搜索失败", output="", error=f"搜索失败: {data.get('msg', '未知错误')}" ) except Exception as e: return ToolResult( title="YouTube 搜索异常", output="", error=str(e) ) @tool() async def youtube_detail( content_id: str, include_captions: bool = True, download_video: bool = False ) -> ToolResult: """ 获取 YouTube 视频详情(可选包含字幕、下载视频并生成大纲) Args: content_id: 视频 ID include_captions: 是否包含字幕,默认 True download_video: 是否下载视频并生成带时间戳的大纲,默认 False。 下载后可使用 extract_video_clip 截取视频片段观看。 Returns: 视频详细信息,包含字幕、视频大纲和本地文件路径 """ try: async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: detail_response = await client.post( f"{CRAWLER_BASE_URL}/youtube/detail", json={"content_id": content_id} ) detail_response.raise_for_status() detail_data = detail_response.json() if detail_data.get("code") != 0: return ToolResult( title="获取详情失败", output="", error=f"获取详情失败: {detail_data.get('msg', '未知错误')}" ) result_data = detail_data.get("data", {}) video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {} # 获取字幕 captions_text = None if include_captions or download_video: try: captions_response = await client.post( f"{CRAWLER_BASE_URL}/youtube/captions", json={"content_id": content_id} ) captions_response.raise_for_status() captions_data = captions_response.json() if captions_data.get("code") == 0: captions_result = captions_data.get("data", {}) if isinstance(captions_result, dict): inner_data = captions_result.get("data", {}) if isinstance(inner_data, dict): captions_text = inner_data.get("content") except Exception: pass # 下载视频并生成大纲 video_path = None video_outline = None if download_video: video_path = await asyncio.to_thread(_download_youtube_video, content_id) if captions_text: video_outline = _parse_srt_to_outline(captions_text) # 合并数据 output_data = { "video_id": content_id, "title": video_info.get("title", ""), "channel": video_info.get("channel_account_name", ""), "description": video_info.get("body_text", ""), "like_count": video_info.get("like_count"), "comment_count": video_info.get("comment_count"), "publish_timestamp": video_info.get("publish_timestamp"), "content_link": video_info.get("content_link", ""), "captions": captions_text, "full_data": video_info } if download_video: output_data["video_path"] = video_path output_data["video_outline"] = video_outline if not video_path: output_data["download_error"] = "视频下载失败,请检查 yt-dlp 是否可用" memory = f"Retrieved YouTube video details for {content_id}" if captions_text: memory += " with captions" if video_path: memory += f", downloaded to {video_path}" return ToolResult( title=f"YouTube 视频详情: {content_id}", output=json.dumps(output_data, ensure_ascii=False, indent=2), long_term_memory=memory ) except Exception as e: return ToolResult( title="YouTube 详情查询异常", output="", error=str(e) ) # ── X (Twitter) 工具 ── @tool() async def x_search(keyword: str) -> ToolResult: """ 搜索 X (Twitter) 内容(数据已结构化,无需访问详情页) Args: keyword: 搜索关键词 Returns: 搜索结果列表,包含推文内容、作者、互动数据等 """ try: async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: response = await client.post( "http://crawler.aiddit.com/crawler/x/keyword", json={"keyword": keyword} ) response.raise_for_status() data = response.json() if data.get("code") == 0: result_data = data.get("data", {}) tweets = result_data.get("data", []) if isinstance(result_data, dict) else [] # 构建拼接图 images = [] tweets_with_images = [] for tweet in tweets: image_list = tweet.get("image_url_list", []) if image_list: tweet["thumbnails"] = [{"url": image_list[0].get("image_url")}] tweets_with_images.append(tweet) collage_b64 = await _build_video_collage(tweets_with_images if tweets_with_images else tweets) if collage_b64: images.append({ "type": "base64", "media_type": "image/png", "data": collage_b64 }) summary_list = [] for idx, tweet in enumerate(tweets[:20], 1): text = tweet.get("body_text", "")[:100] author = tweet.get("channel_account_name", "") summary_list.append(f"{idx}. @{author}: {text}") output_data = { "keyword": keyword, "total": len(tweets), "summary": summary_list, "data": tweets } return ToolResult( title=f"X 搜索: {keyword}", output=json.dumps(output_data, ensure_ascii=False, indent=2), long_term_memory=f"Searched X (Twitter) for '{keyword}', found {len(tweets)} tweets", images=images ) else: return ToolResult( title="X 搜索失败", output="", error=f"搜索失败: {data.get('msg', '未知错误')}" ) except Exception as e: return ToolResult( title="X 搜索异常", output="", error=str(e) ) # ── 内容导入工具 ── @tool() async def import_content(plan_name: str, content_data: List[Dict[str, Any]]) -> ToolResult: """ 导入长文内容(微信公众号、小红书、抖音等通用链接) Args: plan_name: 计划名称 content_data: 内容数据列表,每项包含 channel、content_link、title 等字段 Returns: 导入结果,包含 plan_id """ try: async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: response = await client.post( f"{AIGC_BASE_URL}/weixin/auto_insert", json={"plan_name": plan_name, "data": content_data} ) response.raise_for_status() data = response.json() if data.get("code") == 0: result_data = data.get("data", {}) return ToolResult( title=f"内容导入: {plan_name}", output=json.dumps(result_data, ensure_ascii=False, indent=2), long_term_memory=f"Imported {len(content_data)} items to plan '{plan_name}'" ) else: return ToolResult( title="导入失败", output="", error=f"导入失败: {data.get('msg', '未知错误')}" ) except Exception as e: return ToolResult( title="内容导入异常", output="", error=str(e) ) # ── 视频截取工具 ── @tool() async def extract_video_clip( video_id: str, start_time: str, end_time: str, output_name: Optional[str] = None ) -> ToolResult: """ 从已下载的 YouTube 视频中截取指定时间段的片段 Args: video_id: YouTube 视频 ID(必须先通过 youtube_detail(download_video=True) 下载) start_time: 开始时间,格式: HH:MM:SS 或 MM:SS end_time: 结束时间,格式: HH:MM:SS 或 MM:SS output_name: 输出文件名(可选) Returns: 截取的视频片段路径 Example: extract_video_clip("dQw4w9WgXcQ", "00:00:10", "00:00:30") """ try: source_video = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4" if not source_video.exists(): return ToolResult( title="视频截取失败", output="", error="源视频不存在,请先使用 youtube_detail(download_video=True) 下载视频" ) if not output_name: output_name = f"{video_id}_clip_{start_time.replace(':', '-')}_{end_time.replace(':', '-')}.mp4" output_path = VIDEO_DOWNLOAD_DIR / output_name cmd = [ 'ffmpeg', '-i', str(source_video), '-ss', start_time, '-to', end_time, '-c', 'copy', '-y', str(output_path) ] result = await asyncio.to_thread( subprocess.run, cmd, capture_output=True, text=True, timeout=60 ) if result.returncode == 0 and output_path.exists(): file_size = output_path.stat().st_size / (1024 * 1024) output_data = { "video_id": video_id, "clip_path": str(output_path), "start_time": start_time, "end_time": end_time, "file_size_mb": round(file_size, 2) } return ToolResult( title=f"视频片段截取成功: {start_time} - {end_time}", output=json.dumps(output_data, ensure_ascii=False, indent=2), long_term_memory=f"Extracted video clip from {video_id}: {start_time} to {end_time}" ) else: return ToolResult( title="视频截取失败", output="", error=f"ffmpeg 执行失败: {result.stderr}" ) except subprocess.TimeoutExpired: return ToolResult( title="视频截取超时", output="", error="视频截取超时(60秒)" ) except Exception as e: return ToolResult( title="视频截取异常", output="", error=str(e) )