crawler.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. """
  2. 爬虫服务工具模块
  3. 提供 YouTube、X (Twitter) 和微信/通用链接的搜索和详情查询功能。
  4. """
import asyncio
import json
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx

from agent.tools import tool, ToolResult
from agent.tools.utils.image import build_image_grid, encode_base64, load_images
  14. # API 配置
  15. CRAWLER_BASE_URL = "http://crawler.aiddit.com/crawler"
  16. AIGC_BASE_URL = "http://aigc-channel.aiddit.com/aigc/channel"
  17. DEFAULT_TIMEOUT = 60.0
  18. # 视频处理相关配置
  19. VIDEO_DOWNLOAD_DIR = Path(tempfile.gettempdir()) / "youtube_videos"
  20. VIDEO_DOWNLOAD_DIR.mkdir(exist_ok=True)
  21. async def _build_video_collage(videos: List[Dict[str, Any]]) -> Optional[str]:
  22. """
  23. 将视频缩略图+序号+标题拼接成网格图,返回 base64 编码的 PNG。
  24. 复用 agent.tools.utils.image 中的共享拼图逻辑。
  25. """
  26. if not videos:
  27. return None
  28. urls: List[str] = []
  29. titles: List[str] = []
  30. for video in videos:
  31. thumbnail = None
  32. if "thumbnails" in video and isinstance(video["thumbnails"], list) and video["thumbnails"]:
  33. thumbnail = video["thumbnails"][0].get("url")
  34. elif "thumbnail" in video:
  35. thumbnail = video.get("thumbnail")
  36. elif "cover_url" in video:
  37. thumbnail = video.get("cover_url")
  38. title = video.get("title", "") or video.get("text", "")
  39. if thumbnail:
  40. urls.append(thumbnail)
  41. titles.append(title)
  42. if not urls:
  43. return None
  44. loaded = await load_images(urls)
  45. valid_images = []
  46. valid_labels = []
  47. for (_, img), title in zip(loaded, titles):
  48. if img is not None:
  49. valid_images.append(img)
  50. valid_labels.append(title)
  51. if not valid_images:
  52. return None
  53. grid = build_image_grid(images=valid_images, labels=valid_labels)
  54. b64, _ = encode_base64(grid, format="PNG")
  55. return b64
  56. def _parse_srt_to_outline(srt_content: str) -> List[Dict[str, str]]:
  57. """解析 SRT 字幕,生成带时间戳的大纲"""
  58. if not srt_content:
  59. return []
  60. outline = []
  61. blocks = srt_content.strip().split('\n\n')
  62. for block in blocks:
  63. lines = block.strip().split('\n')
  64. if len(lines) >= 3:
  65. timestamp_line = lines[1]
  66. if '-->' in timestamp_line:
  67. start_time = timestamp_line.split('-->')[0].strip()
  68. text = ' '.join(lines[2:])
  69. outline.append({'timestamp': start_time, 'text': text})
  70. return outline
  71. def _download_youtube_video(video_id: str) -> Optional[str]:
  72. """使用 yt-dlp 下载 YouTube 视频,返回文件路径"""
  73. try:
  74. output_path = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
  75. if output_path.exists():
  76. return str(output_path)
  77. cmd = [
  78. 'yt-dlp',
  79. '-f', 'best[ext=mp4]',
  80. '-o', str(output_path),
  81. f'https://www.youtube.com/watch?v={video_id}'
  82. ]
  83. result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
  84. if result.returncode == 0 and output_path.exists():
  85. return str(output_path)
  86. return None
  87. except Exception:
  88. return None
# ── YouTube tools ──
  90. @tool()
  91. async def youtube_search(keyword: str) -> ToolResult:
  92. """
  93. 搜索 YouTube 视频
  94. Args:
  95. keyword: 搜索关键词
  96. Returns:
  97. 搜索结果列表,包含视频标题、ID、频道等信息
  98. """
  99. try:
  100. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  101. response = await client.post(
  102. f"{CRAWLER_BASE_URL}/youtube/keyword",
  103. json={"keyword": keyword}
  104. )
  105. response.raise_for_status()
  106. data = response.json()
  107. if data.get("code") == 0:
  108. result_data = data.get("data", {})
  109. videos = result_data.get("data", []) if isinstance(result_data, dict) else []
  110. images = []
  111. collage_b64 = await _build_video_collage(videos)
  112. if collage_b64:
  113. images.append({
  114. "type": "base64",
  115. "media_type": "image/png",
  116. "data": collage_b64
  117. })
  118. summary_list = []
  119. for idx, video in enumerate(videos[:20], 1):
  120. title = video.get("title", "")
  121. author = video.get("author", "")
  122. video_id = video.get("video_id", "")
  123. summary_list.append(f"{idx}. {title} - {author} (ID: {video_id})")
  124. output_data = {
  125. "keyword": keyword,
  126. "total": len(videos),
  127. "summary": summary_list,
  128. "data": videos
  129. }
  130. return ToolResult(
  131. title=f"YouTube 搜索: {keyword}",
  132. output=json.dumps(output_data, ensure_ascii=False, indent=2),
  133. long_term_memory=f"Searched YouTube for '{keyword}', found {len(videos)} videos",
  134. images=images
  135. )
  136. else:
  137. return ToolResult(
  138. title="YouTube 搜索失败",
  139. output="",
  140. error=f"搜索失败: {data.get('msg', '未知错误')}"
  141. )
  142. except Exception as e:
  143. return ToolResult(
  144. title="YouTube 搜索异常",
  145. output="",
  146. error=str(e)
  147. )
  148. @tool()
  149. async def youtube_detail(
  150. content_id: str,
  151. include_captions: bool = True,
  152. download_video: bool = False
  153. ) -> ToolResult:
  154. """
  155. 获取 YouTube 视频详情(可选包含字幕、下载视频并生成大纲)
  156. Args:
  157. content_id: 视频 ID
  158. include_captions: 是否包含字幕,默认 True
  159. download_video: 是否下载视频并生成带时间戳的大纲,默认 False。
  160. 下载后可使用 extract_video_clip 截取视频片段观看。
  161. Returns:
  162. 视频详细信息,包含字幕、视频大纲和本地文件路径
  163. """
  164. try:
  165. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  166. detail_response = await client.post(
  167. f"{CRAWLER_BASE_URL}/youtube/detail",
  168. json={"content_id": content_id}
  169. )
  170. detail_response.raise_for_status()
  171. detail_data = detail_response.json()
  172. if detail_data.get("code") != 0:
  173. return ToolResult(
  174. title="获取详情失败",
  175. output="",
  176. error=f"获取详情失败: {detail_data.get('msg', '未知错误')}"
  177. )
  178. result_data = detail_data.get("data", {})
  179. video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {}
  180. # 获取字幕
  181. captions_text = None
  182. if include_captions or download_video:
  183. try:
  184. captions_response = await client.post(
  185. f"{CRAWLER_BASE_URL}/youtube/captions",
  186. json={"content_id": content_id}
  187. )
  188. captions_response.raise_for_status()
  189. captions_data = captions_response.json()
  190. if captions_data.get("code") == 0:
  191. captions_result = captions_data.get("data", {})
  192. if isinstance(captions_result, dict):
  193. inner_data = captions_result.get("data", {})
  194. if isinstance(inner_data, dict):
  195. captions_text = inner_data.get("content")
  196. except Exception:
  197. pass
  198. # 下载视频并生成大纲
  199. video_path = None
  200. video_outline = None
  201. if download_video:
  202. video_path = await asyncio.to_thread(_download_youtube_video, content_id)
  203. if captions_text:
  204. video_outline = _parse_srt_to_outline(captions_text)
  205. # 合并数据
  206. output_data = {
  207. "video_id": content_id,
  208. "title": video_info.get("title", ""),
  209. "channel": video_info.get("channel_account_name", ""),
  210. "description": video_info.get("body_text", ""),
  211. "like_count": video_info.get("like_count"),
  212. "comment_count": video_info.get("comment_count"),
  213. "publish_timestamp": video_info.get("publish_timestamp"),
  214. "content_link": video_info.get("content_link", ""),
  215. "captions": captions_text,
  216. "full_data": video_info
  217. }
  218. if download_video:
  219. output_data["video_path"] = video_path
  220. output_data["video_outline"] = video_outline
  221. if not video_path:
  222. output_data["download_error"] = "视频下载失败,请检查 yt-dlp 是否可用"
  223. memory = f"Retrieved YouTube video details for {content_id}"
  224. if captions_text:
  225. memory += " with captions"
  226. if video_path:
  227. memory += f", downloaded to {video_path}"
  228. return ToolResult(
  229. title=f"YouTube 视频详情: {content_id}",
  230. output=json.dumps(output_data, ensure_ascii=False, indent=2),
  231. long_term_memory=memory
  232. )
  233. except Exception as e:
  234. return ToolResult(
  235. title="YouTube 详情查询异常",
  236. output="",
  237. error=str(e)
  238. )
# ── X (Twitter) tools ──
  240. @tool()
  241. async def x_search(keyword: str) -> ToolResult:
  242. """
  243. 搜索 X (Twitter) 内容(数据已结构化,无需访问详情页)
  244. Args:
  245. keyword: 搜索关键词
  246. Returns:
  247. 搜索结果列表,包含推文内容、作者、互动数据等
  248. """
  249. try:
  250. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  251. response = await client.post(
  252. "http://crawler.aiddit.com/crawler/x/keyword",
  253. json={"keyword": keyword}
  254. )
  255. response.raise_for_status()
  256. data = response.json()
  257. if data.get("code") == 0:
  258. result_data = data.get("data", {})
  259. tweets = result_data.get("data", []) if isinstance(result_data, dict) else []
  260. # 构建拼接图
  261. images = []
  262. tweets_with_images = []
  263. for tweet in tweets:
  264. image_list = tweet.get("image_url_list", [])
  265. if image_list:
  266. tweet["thumbnails"] = [{"url": image_list[0].get("image_url")}]
  267. tweets_with_images.append(tweet)
  268. collage_b64 = await _build_video_collage(tweets_with_images if tweets_with_images else tweets)
  269. if collage_b64:
  270. images.append({
  271. "type": "base64",
  272. "media_type": "image/png",
  273. "data": collage_b64
  274. })
  275. summary_list = []
  276. for idx, tweet in enumerate(tweets[:20], 1):
  277. text = tweet.get("body_text", "")[:100]
  278. author = tweet.get("channel_account_name", "")
  279. summary_list.append(f"{idx}. @{author}: {text}")
  280. output_data = {
  281. "keyword": keyword,
  282. "total": len(tweets),
  283. "summary": summary_list,
  284. "data": tweets
  285. }
  286. return ToolResult(
  287. title=f"X 搜索: {keyword}",
  288. output=json.dumps(output_data, ensure_ascii=False, indent=2),
  289. long_term_memory=f"Searched X (Twitter) for '{keyword}', found {len(tweets)} tweets",
  290. images=images
  291. )
  292. else:
  293. return ToolResult(
  294. title="X 搜索失败",
  295. output="",
  296. error=f"搜索失败: {data.get('msg', '未知错误')}"
  297. )
  298. except Exception as e:
  299. return ToolResult(
  300. title="X 搜索异常",
  301. output="",
  302. error=str(e)
  303. )
# ── Content import tools ──
  305. @tool()
  306. async def import_content(plan_name: str, content_data: List[Dict[str, Any]]) -> ToolResult:
  307. """
  308. 导入长文内容(微信公众号、小红书、抖音等通用链接)
  309. Args:
  310. plan_name: 计划名称
  311. content_data: 内容数据列表,每项包含 channel、content_link、title 等字段
  312. Returns:
  313. 导入结果,包含 plan_id
  314. """
  315. try:
  316. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  317. response = await client.post(
  318. f"{AIGC_BASE_URL}/weixin/auto_insert",
  319. json={"plan_name": plan_name, "data": content_data}
  320. )
  321. response.raise_for_status()
  322. data = response.json()
  323. if data.get("code") == 0:
  324. result_data = data.get("data", {})
  325. return ToolResult(
  326. title=f"内容导入: {plan_name}",
  327. output=json.dumps(result_data, ensure_ascii=False, indent=2),
  328. long_term_memory=f"Imported {len(content_data)} items to plan '{plan_name}'"
  329. )
  330. else:
  331. return ToolResult(
  332. title="导入失败",
  333. output="",
  334. error=f"导入失败: {data.get('msg', '未知错误')}"
  335. )
  336. except Exception as e:
  337. return ToolResult(
  338. title="内容导入异常",
  339. output="",
  340. error=str(e)
  341. )
# ── Video clip extraction tools ──
  343. @tool()
  344. async def extract_video_clip(
  345. video_id: str,
  346. start_time: str,
  347. end_time: str,
  348. output_name: Optional[str] = None
  349. ) -> ToolResult:
  350. """
  351. 从已下载的 YouTube 视频中截取指定时间段的片段
  352. Args:
  353. video_id: YouTube 视频 ID(必须先通过 youtube_detail(download_video=True) 下载)
  354. start_time: 开始时间,格式: HH:MM:SS 或 MM:SS
  355. end_time: 结束时间,格式: HH:MM:SS 或 MM:SS
  356. output_name: 输出文件名(可选)
  357. Returns:
  358. 截取的视频片段路径
  359. Example:
  360. extract_video_clip("dQw4w9WgXcQ", "00:00:10", "00:00:30")
  361. """
  362. try:
  363. source_video = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
  364. if not source_video.exists():
  365. return ToolResult(
  366. title="视频截取失败",
  367. output="",
  368. error="源视频不存在,请先使用 youtube_detail(download_video=True) 下载视频"
  369. )
  370. if not output_name:
  371. output_name = f"{video_id}_clip_{start_time.replace(':', '-')}_{end_time.replace(':', '-')}.mp4"
  372. output_path = VIDEO_DOWNLOAD_DIR / output_name
  373. cmd = [
  374. 'ffmpeg',
  375. '-i', str(source_video),
  376. '-ss', start_time,
  377. '-to', end_time,
  378. '-c', 'copy',
  379. '-y',
  380. str(output_path)
  381. ]
  382. result = await asyncio.to_thread(
  383. subprocess.run, cmd, capture_output=True, text=True, timeout=60
  384. )
  385. if result.returncode == 0 and output_path.exists():
  386. file_size = output_path.stat().st_size / (1024 * 1024)
  387. output_data = {
  388. "video_id": video_id,
  389. "clip_path": str(output_path),
  390. "start_time": start_time,
  391. "end_time": end_time,
  392. "file_size_mb": round(file_size, 2)
  393. }
  394. return ToolResult(
  395. title=f"视频片段截取成功: {start_time} - {end_time}",
  396. output=json.dumps(output_data, ensure_ascii=False, indent=2),
  397. long_term_memory=f"Extracted video clip from {video_id}: {start_time} to {end_time}"
  398. )
  399. else:
  400. return ToolResult(
  401. title="视频截取失败",
  402. output="",
  403. error=f"ffmpeg 执行失败: {result.stderr}"
  404. )
  405. except subprocess.TimeoutExpired:
  406. return ToolResult(
  407. title="视频截取超时",
  408. output="",
  409. error="视频截取超时(60秒)"
  410. )
  411. except Exception as e:
  412. return ToolResult(
  413. title="视频截取异常",
  414. output="",
  415. error=str(e)
  416. )