| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595 |
- """
- 爬虫服务工具模块
- 提供 YouTube、X (Twitter) 和微信/通用链接的搜索和详情查询功能。
- """
- import asyncio
- import base64
- import io
- import json
- import math
- import os
- import subprocess
- import tempfile
- from pathlib import Path
- from typing import Optional, List, Dict, Any
- import httpx
- from PIL import Image, ImageDraw, ImageFont
- from agent.tools import tool, ToolResult
# API configuration: base URLs of the crawler and AIGC channel services and
# the timeout passed to every httpx.AsyncClient created by the tools below.
CRAWLER_BASE_URL = "http://crawler.aiddit.com/crawler"
AIGC_BASE_URL = "http://aigc-channel.aiddit.com/aigc/channel"
DEFAULT_TIMEOUT = 60.0

# Collage configuration (sizes are in pixels, colors are RGB tuples).
THUMB_WIDTH = 250  # box each thumbnail is scaled to fit into
THUMB_HEIGHT = 250
TEXT_HEIGHT = 80  # vertical space reserved below a thumbnail for its title
GRID_COLS = 5  # maximum number of cells per collage row
PADDING = 12  # gap between cells and around the canvas border
BG_COLOR = (255, 255, 255)
TEXT_COLOR = (30, 30, 30)
INDEX_COLOR = (220, 60, 60)  # fill color of the index badge on each thumbnail

# Video processing configuration: downloaded videos and extracted clips are
# stored in this directory, which is created at import time if missing.
VIDEO_DOWNLOAD_DIR = Path(tempfile.gettempdir()) / "youtube_videos"
VIDEO_DOWNLOAD_DIR.mkdir(exist_ok=True)
- # ── 辅助函数 ──
def _truncate_text(text: str, max_len: int = 14) -> str:
    """Return *text* cut to ``max_len`` characters, with "..." appended if it was cut."""
    if len(text) <= max_len:
        return text
    return f"{text[:max_len]}..."
async def _download_image(client: httpx.AsyncClient, url: str) -> Optional[Image.Image]:
    """Fetch a single image and decode it as RGB.

    Any failure (network error, non-2xx status, undecodable payload) is
    treated as "no image" and reported by returning None.
    """
    try:
        response = await client.get(url, timeout=15.0)
        response.raise_for_status()
        payload = io.BytesIO(response.content)
        decoded = Image.open(payload)
        return decoded.convert("RGB")
    except Exception:
        # Best-effort download: callers simply skip images that are None.
        return None
def _collect_collage_items(videos: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return url/title/index records for every entry that exposes a thumbnail URL."""
    items = []
    for position, video in enumerate(videos, 1):
        thumbnail = None
        if "thumbnails" in video and isinstance(video["thumbnails"], list) and video["thumbnails"]:
            first = video["thumbnails"][0]
            # Tolerate malformed entries instead of raising AttributeError.
            thumbnail = first.get("url") if isinstance(first, dict) else None
        elif "thumbnail" in video:
            thumbnail = video.get("thumbnail")
        elif "cover_url" in video:
            thumbnail = video.get("cover_url")
        if thumbnail:
            title = video.get("title", "") or video.get("text", "")
            # "index" is the 1-based position in the input list; it is the
            # number painted on the thumbnail.
            items.append({"url": thumbnail, "title": title, "index": position})
    return items


def _load_collage_fonts():
    """Return ``(title_font, index_font)``, preferring a CJK-capable TrueType font."""
    font_candidates = [
        "msyh.ttc", "simhei.ttf", "simsun.ttc",
        "/System/Library/Fonts/PingFang.ttc",
        "/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf",
        "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
    ]
    for font_path in font_candidates:
        try:
            # Load both sizes together so the pair always comes from one font.
            return ImageFont.truetype(font_path, 16), ImageFont.truetype(font_path, 32)
        except (OSError, ImportError):
            continue
    fallback = ImageFont.load_default()
    return fallback, fallback


def _wrap_title(draw, title: str, font, max_width: int, max_lines: int) -> List[str]:
    """Wrap *title* character by character into at most *max_lines* lines of *max_width* pixels."""
    lines: List[str] = []
    current = ""
    for ch in title:
        candidate = current + ch
        box = draw.textbbox((0, 0), candidate, font=font)
        if box[2] - box[0] > max_width and current:
            lines.append(current)
            if len(lines) == max_lines:
                # Remaining text would spill into the next grid row; drop it.
                return lines
            current = ch
        else:
            current = candidate
    if current:
        lines.append(current)
    return lines


async def _build_video_collage(videos: List[Dict[str, Any]]) -> Optional[str]:
    """
    Compose thumbnails, index badges and titles into a grid image.

    Each entry may provide its thumbnail through ``thumbnails[0]["url"]``,
    ``thumbnail`` or ``cover_url``; the title comes from ``title`` or ``text``.
    Entries without a thumbnail, or whose image fails to download, are left
    out. The remaining cells are packed contiguously, and every cell is
    labelled with the entry's 1-based position in ``videos``.

    Args:
        videos: Result entries (dicts) to render.

    Returns:
        The PNG collage as a base64 string, or None when there is nothing
        to draw.
    """
    if not videos:
        return None

    items = _collect_collage_items(videos)
    if not items:
        return None

    async with httpx.AsyncClient() as client:
        tasks = [_download_image(client, item["url"]) for item in items]
        downloaded = await asyncio.gather(*tasks)

    valid = [(item, img) for item, img in zip(items, downloaded) if img is not None]
    if not valid:
        return None

    cols = min(GRID_COLS, len(valid))
    rows = math.ceil(len(valid) / cols)
    cell_w = THUMB_WIDTH + PADDING
    cell_h = THUMB_HEIGHT + TEXT_HEIGHT + PADDING
    canvas_w = cols * cell_w + PADDING
    canvas_h = rows * cell_h + PADDING

    canvas = Image.new("RGB", (canvas_w, canvas_h), BG_COLOR)
    draw = ImageDraw.Draw(canvas)
    font_title, font_index = _load_collage_fonts()

    box_size = 52
    title_gap = 6
    title_line_height = 22
    # Number of title lines that fit in the strip reserved under a thumbnail.
    max_title_lines = max(1, (TEXT_HEIGHT - title_gap) // title_line_height)

    # Place cells by their position among the successfully downloaded images.
    # The canvas is sized from len(valid), so placing by the original index
    # would push cells outside the canvas whenever an entry was skipped.
    for slot, (item, img) in enumerate(valid):
        row, col = divmod(slot, cols)
        x = PADDING + col * cell_w
        y = PADDING + row * cell_h

        # Scale to fit inside the thumbnail box while keeping the aspect
        # ratio; clamp to 1px so extreme ratios never resize to zero.
        scale = min(THUMB_WIDTH / img.width, THUMB_HEIGHT / img.height)
        new_w = max(1, int(img.width * scale))
        new_h = max(1, int(img.height * scale))
        thumb = img.resize((new_w, new_h), Image.LANCZOS)
        offset_x = x + (THUMB_WIDTH - new_w) // 2
        offset_y = y + (THUMB_HEIGHT - new_h) // 2
        canvas.paste(thumb, (offset_x, offset_y))

        # Index badge in the thumbnail's top-left corner.
        index_text = str(item["index"])
        idx_x = offset_x
        idx_y = offset_y + 4
        draw.rectangle([idx_x, idx_y, idx_x + box_size, idx_y + box_size], fill=INDEX_COLOR)
        bbox = draw.textbbox((0, 0), index_text, font=font_index)
        tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
        text_x = idx_x + (box_size - tw) // 2
        text_y = idx_y + (box_size - th) // 2
        draw.text((text_x, text_y), index_text, fill=(255, 255, 255), font=font_index)

        # Collapse newlines/tabs so each wrapped line is drawn as one line.
        title = " ".join(str(item["title"] or "").split())
        if title:
            wrapped = _wrap_title(draw, title, font_title, THUMB_WIDTH, max_title_lines)
            for line_i, line in enumerate(wrapped):
                line_y = y + THUMB_HEIGHT + title_gap + line_i * title_line_height
                draw.text((x, line_y), line, fill=TEXT_COLOR, font=font_title)

    buf = io.BytesIO()
    canvas.save(buf, format="PNG")
    return base64.b64encode(buf.getvalue()).decode("utf-8")
def _parse_srt_to_outline(srt_content: str) -> List[Dict[str, str]]:
    """Parse SRT subtitles into a timestamped outline.

    Cues are separated by blank (or whitespace-only) lines. Within a cue, the
    first line containing ``-->`` is the timing line and every following line
    is caption text, so cues are accepted with or without a leading cue
    number. Unix, Windows (CRLF) and old-Mac (CR) line endings are all
    handled.

    Args:
        srt_content: Raw SRT text; may be empty or None.

    Returns:
        One ``{"timestamp": <cue start time>, "text": <caption text joined
        with spaces>}`` dict per cue, in file order. Cues without a timing
        line or without text are skipped.
    """
    if not srt_content:
        return []

    # Normalize line endings first: splitting on a literal "\n\n" never
    # matches CRLF input, which would collapse the whole file into one cue.
    normalized = srt_content.replace('\r\n', '\n').replace('\r', '\n')

    # Group non-empty lines into cues; a blank line closes the current cue.
    blocks: List[List[str]] = [[]]
    for raw_line in normalized.split('\n'):
        line = raw_line.strip()
        if line:
            blocks[-1].append(line)
        elif blocks[-1]:
            blocks.append([])

    outline = []
    for lines in blocks:
        timing_at = next((i for i, line in enumerate(lines) if '-->' in line), None)
        if timing_at is None:
            continue
        text = ' '.join(lines[timing_at + 1:])
        if not text:
            continue
        start_time = lines[timing_at].split('-->')[0].strip()
        outline.append({'timestamp': start_time, 'text': text})
    return outline
def _download_youtube_video(video_id: str) -> Optional[str]:
    """Download a YouTube video with yt-dlp and return the local file path.

    The file is stored as ``<video_id>.mp4`` in ``VIDEO_DOWNLOAD_DIR``; an
    already existing file is reused without downloading again.

    Args:
        video_id: YouTube video ID. Only ASCII letters, digits, ``-`` and
            ``_`` are accepted, because the ID becomes part of a filesystem
            path and anything else could point outside the download directory.

    Returns:
        The path of the downloaded file, or None if the ID is invalid, yt-dlp
        is missing, fails or times out (300 seconds).
    """
    allowed_chars = frozenset(
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"
    )
    if not isinstance(video_id, str) or not video_id or not allowed_chars.issuperset(video_id):
        return None

    try:
        output_path = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
        if output_path.exists():
            return str(output_path)

        cmd = [
            'yt-dlp',
            '-f', 'best[ext=mp4]',
            '-o', str(output_path),
            f'https://www.youtube.com/watch?v={video_id}'
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        if result.returncode == 0 and output_path.exists():
            return str(output_path)
        return None
    except (subprocess.SubprocessError, OSError, ValueError):
        # Best-effort: timeout, missing yt-dlp binary, filesystem errors or
        # undecodable process output all mean "download failed" to the caller.
        return None
- # ── YouTube 工具 ──
@tool()
async def youtube_search(keyword: str) -> ToolResult:
    """
    Search YouTube videos by keyword.

    Args:
        keyword: Search keyword.

    Returns:
        A ToolResult whose output is JSON holding the keyword, the total
        number of videos, a numbered summary (title, author, video ID) of up
        to the first 20 videos, and the raw result list. A collage of the
        thumbnails is attached as an image when one could be built.
    """
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as http:
            resp = await http.post(
                f"{CRAWLER_BASE_URL}/youtube/keyword",
                json={"keyword": keyword},
            )
            resp.raise_for_status()
            payload = resp.json()

            # Guard clause: the service reports failures through a non-zero code.
            if payload.get("code") != 0:
                return ToolResult(
                    title="YouTube 搜索失败",
                    output="",
                    error=f"搜索失败: {payload.get('msg', '未知错误')}",
                )

            wrapper = payload.get("data", {})
            videos = wrapper.get("data", []) if isinstance(wrapper, dict) else []

            collage = await _build_video_collage(videos)
            images = (
                [{"type": "base64", "media_type": "image/png", "data": collage}]
                if collage
                else []
            )

            summary = [
                f"{rank}. {entry.get('title', '')} - {entry.get('author', '')} "
                f"(ID: {entry.get('video_id', '')})"
                for rank, entry in enumerate(videos[:20], 1)
            ]

            body = json.dumps(
                {
                    "keyword": keyword,
                    "total": len(videos),
                    "summary": summary,
                    "data": videos,
                },
                ensure_ascii=False,
                indent=2,
            )
            return ToolResult(
                title=f"YouTube 搜索: {keyword}",
                output=body,
                long_term_memory=f"Searched YouTube for '{keyword}', found {len(videos)} videos",
                images=images,
            )
    except Exception as exc:
        # Tool boundary: report any failure as a ToolResult error.
        return ToolResult(
            title="YouTube 搜索异常",
            output="",
            error=str(exc),
        )
async def _fetch_youtube_captions(client: "httpx.AsyncClient", content_id: str) -> tuple:
    """Fetch caption text for a video; return ``(captions, error)``, either of which may be None."""
    try:
        response = await client.post(
            f"{CRAWLER_BASE_URL}/youtube/captions",
            json={"content_id": content_id}
        )
        response.raise_for_status()
        payload = response.json()
        if payload.get("code") != 0:
            return None, str(payload.get("msg", "未知错误"))
        result = payload.get("data", {})
        inner = result.get("data", {}) if isinstance(result, dict) else None
        if isinstance(inner, dict):
            return inner.get("content"), None
        return None, None
    except Exception as exc:
        # Captions are optional: report the failure to the caller instead of
        # failing the whole detail lookup.
        return None, f"{type(exc).__name__}: {exc}"


@tool()
async def youtube_detail(
    content_id: str,
    include_captions: bool = True,
    download_video: bool = False
) -> ToolResult:
    """
    Get YouTube video details, optionally with captions, a local download and an outline.

    Args:
        content_id: Video ID.
        include_captions: Whether to fetch captions. Defaults to True.
        download_video: Whether to download the video and build a timestamped
            outline from its captions. Defaults to False. Captions are also
            fetched when this is True. After downloading, use
            extract_video_clip to cut out a segment to watch.

    Returns:
        A ToolResult whose output is JSON with the video metadata, captions
        and the raw detail record. When ``download_video`` is True it also
        contains ``video_path`` and ``video_outline`` (plus ``download_error``
        if the download failed). ``captions_error`` is present when captions
        were requested but could not be fetched.
    """
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            detail_response = await client.post(
                f"{CRAWLER_BASE_URL}/youtube/detail",
                json={"content_id": content_id}
            )
            detail_response.raise_for_status()
            detail_data = detail_response.json()

            if detail_data.get("code") != 0:
                return ToolResult(
                    title="获取详情失败",
                    output="",
                    error=f"获取详情失败: {detail_data.get('msg', '未知错误')}"
                )

            result_data = detail_data.get("data", {})
            video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {}

            # Captions are needed both for direct output and for the outline.
            captions_text = None
            captions_error = None
            if include_captions or download_video:
                captions_text, captions_error = await _fetch_youtube_captions(client, content_id)

            # Download the video (blocking yt-dlp call, run in a worker
            # thread) and derive the outline from the captions.
            video_path = None
            video_outline = None
            if download_video:
                video_path = await asyncio.to_thread(_download_youtube_video, content_id)
                if captions_text:
                    video_outline = _parse_srt_to_outline(captions_text)

            output_data = {
                "video_id": content_id,
                "title": video_info.get("title", ""),
                "channel": video_info.get("channel_account_name", ""),
                "description": video_info.get("body_text", ""),
                "like_count": video_info.get("like_count"),
                "comment_count": video_info.get("comment_count"),
                "publish_timestamp": video_info.get("publish_timestamp"),
                "content_link": video_info.get("content_link", ""),
                "captions": captions_text,
                "full_data": video_info
            }

            if captions_error:
                # Tell the caller why captions are missing instead of
                # silently returning None.
                output_data["captions_error"] = captions_error

            if download_video:
                output_data["video_path"] = video_path
                output_data["video_outline"] = video_outline
                if not video_path:
                    output_data["download_error"] = "视频下载失败,请检查 yt-dlp 是否可用"

            memory = f"Retrieved YouTube video details for {content_id}"
            if captions_text:
                memory += " with captions"
            if video_path:
                memory += f", downloaded to {video_path}"

            return ToolResult(
                title=f"YouTube 视频详情: {content_id}",
                output=json.dumps(output_data, ensure_ascii=False, indent=2),
                long_term_memory=memory
            )
    except Exception as e:
        # Tool boundary: report any failure as a ToolResult error.
        return ToolResult(
            title="YouTube 详情查询异常",
            output="",
            error=str(e)
        )
- # ── X (Twitter) 工具 ──
@tool()
async def x_search(keyword: str) -> ToolResult:
    """
    Search X (Twitter) content. Results are already structured, so no detail page lookup is needed.

    Args:
        keyword: Search keyword.

    Returns:
        A ToolResult whose output is JSON holding the keyword, the total
        number of tweets, a numbered summary (author and the first 100
        characters of the text) of up to the first 20 tweets, and the raw
        tweet list. A collage of tweet images is attached when one could be
        built.
    """
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            response = await client.post(
                # Use the shared base URL like the other crawler endpoints.
                f"{CRAWLER_BASE_URL}/x/keyword",
                json={"keyword": keyword}
            )
            response.raise_for_status()
            data = response.json()

            if data.get("code") != 0:
                return ToolResult(
                    title="X 搜索失败",
                    output="",
                    error=f"搜索失败: {data.get('msg', '未知错误')}"
                )

            result_data = data.get("data", {})
            tweets = result_data.get("data", []) if isinstance(result_data, dict) else []

            # Build the collage from tweets that carry at least one image.
            # NOTE: the "thumbnails" key is written onto the tweet dicts
            # themselves and therefore also appears in the returned "data".
            tweets_with_images = []
            for tweet in tweets:
                # The field may be missing, null or hold non-dict entries.
                image_list = tweet.get("image_url_list") or []
                first_image = image_list[0] if image_list else None
                image_url = first_image.get("image_url") if isinstance(first_image, dict) else None
                if image_url:
                    tweet["thumbnails"] = [{"url": image_url}]
                    tweets_with_images.append(tweet)

            images = []
            collage_b64 = await _build_video_collage(tweets_with_images if tweets_with_images else tweets)
            if collage_b64:
                images.append({
                    "type": "base64",
                    "media_type": "image/png",
                    "data": collage_b64
                })

            summary_list = []
            for idx, tweet in enumerate(tweets[:20], 1):
                # body_text can be null in the payload; slicing None would raise.
                text = (tweet.get("body_text") or "")[:100]
                author = tweet.get("channel_account_name", "")
                summary_list.append(f"{idx}. @{author}: {text}")

            output_data = {
                "keyword": keyword,
                "total": len(tweets),
                "summary": summary_list,
                "data": tweets
            }

            return ToolResult(
                title=f"X 搜索: {keyword}",
                output=json.dumps(output_data, ensure_ascii=False, indent=2),
                long_term_memory=f"Searched X (Twitter) for '{keyword}', found {len(tweets)} tweets",
                images=images
            )
    except Exception as e:
        # Tool boundary: report any failure as a ToolResult error.
        return ToolResult(
            title="X 搜索异常",
            output="",
            error=str(e)
        )
- # ── 内容导入工具 ──
@tool()
async def import_content(plan_name: str, content_data: List[Dict[str, Any]]) -> ToolResult:
    """
    Import long-form content (WeChat official accounts, Xiaohongshu, Douyin and other generic links).

    Args:
        plan_name: Name of the plan to import into.
        content_data: Content items; each contains fields such as channel,
            content_link and title.

    Returns:
        A ToolResult whose output is the service's response data as JSON
        (expected to include the plan_id).
    """
    request_body = {"plan_name": plan_name, "data": content_data}
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as http:
            resp = await http.post(f"{AIGC_BASE_URL}/weixin/auto_insert", json=request_body)
            resp.raise_for_status()
            payload = resp.json()

            # Guard clause: the service reports failures through a non-zero code.
            if payload.get("code") != 0:
                return ToolResult(
                    title="导入失败",
                    output="",
                    error=f"导入失败: {payload.get('msg', '未知错误')}",
                )

            imported = payload.get("data", {})
            return ToolResult(
                title=f"内容导入: {plan_name}",
                output=json.dumps(imported, ensure_ascii=False, indent=2),
                long_term_memory=f"Imported {len(content_data)} items to plan '{plan_name}'",
            )
    except Exception as exc:
        # Tool boundary: report any failure as a ToolResult error.
        return ToolResult(
            title="内容导入异常",
            output="",
            error=str(exc),
        )
- # ── 视频截取工具 ──
@tool()
async def extract_video_clip(
    video_id: str,
    start_time: str,
    end_time: str,
    output_name: Optional[str] = None
) -> ToolResult:
    """
    Cut a time range out of an already downloaded YouTube video.

    Args:
        video_id: YouTube video ID (the video must first be downloaded with
            youtube_detail(download_video=True)).
        start_time: Start time, formatted as HH:MM:SS or MM:SS.
        end_time: End time, formatted as HH:MM:SS or MM:SS.
        output_name: Output file name (optional). It must resolve to a file
            inside the video download directory and must differ from the
            source video.

    Returns:
        A ToolResult whose output is JSON with the clip path, the requested
        time range and the clip size in MB; an error result if the source is
        missing, a path is rejected, or ffmpeg fails or times out.

    Example:
        extract_video_clip("dQw4w9WgXcQ", "00:00:10", "00:00:30")
    """
    try:
        download_root = VIDEO_DOWNLOAD_DIR.resolve()

        # video_id becomes part of a path; refuse IDs that would make the
        # source file resolve outside the download directory.
        source_video = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
        if not source_video.resolve().is_relative_to(download_root):
            return ToolResult(
                title="视频截取失败",
                output="",
                error="无效的 video_id"
            )

        if not source_video.exists():
            return ToolResult(
                title="视频截取失败",
                output="",
                error="源视频不存在,请先使用 youtube_detail(download_video=True) 下载视频"
            )

        if not output_name:
            output_name = f"{video_id}_clip_{start_time.replace(':', '-')}_{end_time.replace(':', '-')}.mp4"

        # ffmpeg runs with -y (overwrite), so the output must stay inside the
        # download directory and must not be the input file itself.
        output_path = VIDEO_DOWNLOAD_DIR / output_name
        resolved_output = output_path.resolve()
        if not resolved_output.is_relative_to(download_root):
            return ToolResult(
                title="视频截取失败",
                output="",
                error="输出文件必须位于视频下载目录内"
            )
        if resolved_output == source_video.resolve():
            return ToolResult(
                title="视频截取失败",
                output="",
                error="输出文件不能与源视频相同"
            )

        cmd = [
            'ffmpeg',
            '-i', str(source_video),
            '-ss', start_time,
            '-to', end_time,
            '-c', 'copy',  # stream copy: no re-encoding
            '-y',
            str(output_path)
        ]

        # subprocess.run blocks, so run it in a worker thread.
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=60
        )

        if result.returncode == 0 and output_path.exists():
            file_size = output_path.stat().st_size / (1024 * 1024)
            output_data = {
                "video_id": video_id,
                "clip_path": str(output_path),
                "start_time": start_time,
                "end_time": end_time,
                "file_size_mb": round(file_size, 2)
            }
            return ToolResult(
                title=f"视频片段截取成功: {start_time} - {end_time}",
                output=json.dumps(output_data, ensure_ascii=False, indent=2),
                long_term_memory=f"Extracted video clip from {video_id}: {start_time} to {end_time}"
            )
        else:
            return ToolResult(
                title="视频截取失败",
                output="",
                error=f"ffmpeg 执行失败: {result.stderr}"
            )
    except subprocess.TimeoutExpired:
        return ToolResult(
            title="视频截取超时",
            output="",
            error="视频截取超时(60秒)"
        )
    except Exception as e:
        # Tool boundary: report any other failure as a ToolResult error.
        return ToolResult(
            title="视频截取异常",
            output="",
            error=str(e)
        )
|