"""X (Twitter) platform implementation.

Backend: crawler.aiddit.com/crawler/x
"""
import json
from typing import Any, Dict, List, Optional

import httpx

from agent.tools.models import ToolResult
from agent.tools.utils.image import build_image_grid, encode_base64, load_images
from agent.tools.builtin.content.registry import PlatformDef, register_platform

CRAWLER_URL = "http://crawler.aiddit.com/crawler/x/keyword"
DEFAULT_TIMEOUT = 60.0


async def search(
    platform_id: str,
    keyword: str,
    max_count: int = 20,
    cursor: str = "",
    extras: Optional[Dict[str, Any]] = None,
) -> ToolResult:
    """Search X by keyword via the crawler backend.

    Args:
        platform_id: Platform identifier (unused here; part of the shared
            search interface).
        keyword: Search keyword sent to the crawler.
        max_count: Maximum number of tweets to summarize/return.
        cursor: Pagination cursor (accepted for interface compatibility;
            the backend endpoint does not currently take it).
        extras: Optional extra options (unused).

    Returns:
        A ToolResult whose output is a JSON summary of the tweets, with the
        full tweet payloads stashed in ``metadata["posts"]`` and an optional
        thumbnail collage in ``images``.
    """
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            response = await client.post(CRAWLER_URL, json={"keyword": keyword})
            response.raise_for_status()
            data = response.json()
        if data.get("code") != 0:
            return ToolResult(title="X 搜索失败", output="", error=data.get("msg", "未知错误"))
        result_data = data.get("data", {})
        # Backend nests the tweet list one level deeper: data.data.data.
        tweets = result_data.get("data", []) if isinstance(result_data, dict) else []

        # Optional quality-scoring module; degrade gracefully when absent.
        try:
            from examples.process_pipeline.script.evaluate_source_quality import SourceQualityEvaluator
            evaluator = SourceQualityEvaluator()
        except ImportError:
            evaluator = None

        top_tweets = tweets[:max_count]
        summary_list = []
        for idx, tweet in enumerate(top_tweets, 1):
            text = tweet.get("body_text", "")
            score_info = {}
            if evaluator:
                # Best-effort scoring: a failure on one tweet must not
                # break the whole search result.
                try:
                    eval_res = evaluator.evaluate_post(tweet)
                    score_info = {
                        "quality_score": eval_res["total_score"],
                        "quality_grade": eval_res["grade"],
                    }
                    # Stashed on the tweet so the collage can label it.
                    tweet["_quality_score"] = eval_res["total_score"]
                except Exception:
                    pass
            summary_item = {
                "index": idx,
                "author": tweet.get("channel_account_name", ""),
                "body_text": text[:100] + ("..." if len(text) > 100 else ""),
                "like_count": tweet.get("like_count"),
                "comment_count": tweet.get("comment_count"),
                "link": tweet.get("link"),
            }
            summary_item.update(score_info)
            summary_list.append(summary_item)

        # Build one thumbnail collage for all returned tweets.
        images = []
        collage_obj = await _build_tweet_collage(top_tweets)
        if collage_obj:
            images.append(collage_obj)

        return ToolResult(
            title=f"X: {keyword}",
            output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
            long_term_memory=f"Searched X for '{keyword}', {len(tweets)} results.",
            images=images,
            metadata={"posts": top_tweets},
        )
    except Exception as e:
        # Top-level boundary: surface any failure as an error ToolResult.
        return ToolResult(title="X 搜索异常", output="", error=str(e))


# Detail view image limits: when a post has more than MAX_DETAIL_IMAGES
# pictures, keep the first KEEP_INDIVIDUAL as standalone attachments and
# collapse the remainder into one collage.
MAX_DETAIL_IMAGES = 10
KEEP_INDIVIDUAL = 8


async def _grid_to_image_obj(grid, filename_prefix: str) -> Dict[str, Any]:
    """Encode a PIL grid image as PNG and publish it.

    Tries a CDN upload first (filename derived from an MD5 digest of the
    bytes, so identical collages deduplicate); on any failure falls back to
    an inline base64 image object so the caller always gets something usable.

    Args:
        grid: PIL image produced by ``build_image_grid``.
        filename_prefix: Prefix for the uploaded filename, e.g. "x_collage".

    Returns:
        ``{"type": "url", "url": ...}`` on successful upload, otherwise
        ``{"type": "base64", "media_type": "image/png", "data": ...}``.
    """
    import io

    buf = io.BytesIO()
    grid.save(buf, format="PNG")
    img_bytes = buf.getvalue()
    try:
        from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss
        import hashlib

        md5_hash = hashlib.md5(img_bytes).hexdigest()[:12]
        filename = f"{filename_prefix}_{md5_hash}.png"
        cdn_url = await _upload_bytes_to_oss(img_bytes, filename)
        return {"type": "url", "url": cdn_url}
    except Exception as e:
        import logging

        logging.getLogger(__name__).warning(
            "Failed to upload %s to CDN: %s", filename_prefix, e
        )
        b64, _ = encode_base64(grid, format="PNG")
        return {"type": "base64", "media_type": "image/png", "data": b64}


async def _build_images_collage(urls: List[str]) -> Optional[Dict[str, Any]]:
    """Combine a list of image URLs into a single grid image object.

    Returns None when no URL loads successfully.
    """
    if not urls:
        return None
    loaded = await load_images(urls)
    valid_images = [img for (_, img) in loaded if img is not None]
    if not valid_images:
        return None
    grid = build_image_grid(images=valid_images, labels=None)
    return await _grid_to_image_obj(grid, "x_detail_collage")


async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
    """Return the detail view of an X post.

    X details come straight from the full post dict cached by ``search``
    (no extra network call). Attaches the post's images, collaging the
    overflow when there are many.

    Args:
        post: Full tweet payload cached from a previous search.
        extras: Optional extra options (unused).
    """
    author = post.get("channel_account_name", "")
    text = post.get("body_text", "")[:30]

    img_urls = []
    for img_item in post.get("image_url_list", []):
        # Entries may be dicts ({"image_url": ...}) or plain URL strings.
        url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
        if url:
            img_urls.append(url)

    all_images: List[Dict[str, Any]] = []
    if len(img_urls) > MAX_DETAIL_IMAGES:
        # Too many pictures: first few stay individual, the rest collage.
        for u in img_urls[:KEEP_INDIVIDUAL]:
            all_images.append({"type": "url", "url": u})
        collage = await _build_images_collage(img_urls[KEEP_INDIVIDUAL:])
        if collage:
            all_images.append(collage)
    else:
        for u in img_urls:
            all_images.append({"type": "url", "url": u})

    output_json = json.dumps(post, ensure_ascii=False, indent=2)
    output_text = (
        output_json
        + "\n\n---\n请基于以上内容,从信息完整度、内容质量和实用价值三个角度,给出一句简短的内容评价。"
    )
    return ToolResult(
        title=f"X 详情: @{author}",
        output=output_text,
        long_term_memory=f"Viewed X post by @{author}: {text}",
        images=all_images,
    )


async def _build_tweet_collage(tweets: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Build a labeled thumbnail collage for a list of tweets.

    One thumbnail per tweet (first image, else cover), labeled with the
    author handle and, when present, the ``_quality_score`` set by
    ``search``. Returns None when no thumbnail loads.

    Note: the original annotated the return as ``Optional[str]``, but the
    function returns an image object dict — annotation fixed here.
    """
    urls, titles = [], []
    for tweet in tweets:
        thumb = None
        for img_item in tweet.get("image_url_list", []):
            # Entries may be dicts ({"image_url": ...}) or plain URL strings.
            url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
            if url:
                thumb = url
                break
        if not thumb:
            thumb = tweet.get("cover_url")
        if thumb:
            urls.append(thumb)
            base_title = f"@{tweet.get('channel_account_name', '')}"
            score = tweet.get("_quality_score")
            if score is not None:
                title_with_score = f"[{score}分] {base_title}"
            else:
                title_with_score = base_title
            titles.append(title_with_score)
    if not urls:
        return None

    loaded = await load_images(urls)
    valid_images, valid_labels = [], []
    for (_, img), title in zip(loaded, titles):
        if img is not None:
            valid_images.append(img)
            valid_labels.append(title)
    if not valid_images:
        return None

    grid = build_image_grid(images=valid_images, labels=valid_labels)
    return await _grid_to_image_obj(grid, "x_collage")


# ── Registration ──
_X = PlatformDef(
    id="x",
    name="X (Twitter)",
    aliases=["twitter", "推特"],
)
_X.search_impl = search
_X.detail_impl = detail
register_platform(_X)