| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- """
- X (Twitter) 平台实现
- 后端:crawler.aiddit.com/crawler/x
- """
- import json
- from typing import Any, Dict, List, Optional
- import httpx
- from agent.tools.models import ToolResult
- from agent.tools.utils.image import build_image_grid, encode_base64, load_images
- from agent.tools.builtin.content.registry import PlatformDef, register_platform
- CRAWLER_URL = "http://crawler.aiddit.com/crawler/x/keyword"
- DEFAULT_TIMEOUT = 60.0
async def search(
    platform_id: str,
    keyword: str,
    max_count: int = 20,
    cursor: str = "",
    extras: Optional[Dict[str, Any]] = None,
) -> ToolResult:
    """Search X (Twitter) posts by keyword via the crawler backend.

    Args:
        platform_id: Platform identifier; unused here but part of the shared
            platform-search interface.
        keyword: Search keyword forwarded to the crawler service.
        max_count: Maximum number of tweets to summarize and return.
        cursor: Pagination cursor (currently not forwarded to the backend).
        extras: Extra platform-specific options (currently unused).

    Returns:
        ToolResult whose output is a JSON summary of up to ``max_count``
        tweets, optionally with a thumbnail collage image, and with the raw
        tweet dicts cached in ``metadata["posts"]`` for later detail lookup.
    """
    try:
        # Fetch and parse inside the client context; everything after works on
        # plain dicts, so the connection can be released early.
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            response = await client.post(CRAWLER_URL, json={"keyword": keyword})
            response.raise_for_status()
            data = response.json()

        if data.get("code") != 0:
            return ToolResult(title="X 搜索失败", output="", error=data.get("msg", "未知错误"))

        result_data = data.get("data", {})
        tweets = result_data.get("data", []) if isinstance(result_data, dict) else []
        # Slice once so summary, collage, and cached metadata always agree
        # (the original sliced tweets[:max_count] in three separate places).
        selected = tweets[:max_count]

        # Optional quality scoring — best-effort; search works without it.
        try:
            from examples.process_pipeline.script.evaluate_source_quality import SourceQualityEvaluator
            evaluator = SourceQualityEvaluator()
        except ImportError:
            evaluator = None

        summary_list = []
        for idx, tweet in enumerate(selected, 1):
            text = tweet.get("body_text", "")
            score_info = {}
            if evaluator:
                try:
                    eval_res = evaluator.evaluate_post(tweet)
                    score_info = {
                        "quality_score": eval_res["total_score"],
                        "quality_grade": eval_res["grade"],
                    }
                    # Stash the score on the tweet so the collage can label it.
                    tweet["_quality_score"] = eval_res["total_score"]
                except Exception:
                    # Scoring is non-critical; a failure must not break search.
                    pass
            summary_item = {
                "index": idx,
                "author": tweet.get("channel_account_name", ""),
                "body_text": text[:100] + ("..." if len(text) > 100 else ""),
                "like_count": tweet.get("like_count"),
                "comment_count": tweet.get("comment_count"),
                "link": tweet.get("link"),
            }
            summary_item.update(score_info)
            summary_list.append(summary_item)

        # Build a single grid collage of tweet thumbnails, if any loaded.
        images = []
        collage_obj = await _build_tweet_collage(selected)
        if collage_obj:
            images.append(collage_obj)

        return ToolResult(
            title=f"X: {keyword}",
            output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
            long_term_memory=f"Searched X for '{keyword}', {len(tweets)} results.",
            images=images,
            metadata={"posts": selected},
        )
    except Exception as e:
        # Top-level boundary: surface any unexpected failure as a tool error.
        return ToolResult(title="X 搜索异常", output="", error=str(e))
# Maximum number of image attachments shown for a single post detail view.
MAX_DETAIL_IMAGES = 10
# When a post exceeds the limit, keep this many images individually and
# collapse the remainder into one collage image.
KEEP_INDIVIDUAL = 8
- async def _build_images_collage(urls: List[str]) -> Optional[Dict[str, Any]]:
- """将一组图片 URL 拼成单张网格图"""
- if not urls:
- return None
- loaded = await load_images(urls)
- valid_images = [img for (_, img) in loaded if img is not None]
- if not valid_images:
- return None
- grid = build_image_grid(images=valid_images, labels=None)
- import io
- buf = io.BytesIO()
- grid.save(buf, format="PNG")
- img_bytes = buf.getvalue()
- try:
- from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss
- import hashlib
- md5_hash = hashlib.md5(img_bytes).hexdigest()[:12]
- filename = f"x_detail_collage_{md5_hash}.png"
- cdn_url = await _upload_bytes_to_oss(img_bytes, filename)
- return {"type": "url", "url": cdn_url}
- except Exception as e:
- import logging
- logging.getLogger(__name__).warning("Failed to upload x detail collage to CDN: %s", e)
- b64, _ = encode_base64(grid, format="PNG")
- return {"type": "base64", "media_type": "image/png", "data": b64}
async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
    """Return the full details of an X post from the cached search result.

    X has no separate detail endpoint here: the complete post dict cached by
    ``search`` (in ``metadata["posts"]``) is re-serialized as the output.

    Args:
        post: Raw tweet dict as cached by ``search``.
        extras: Extra platform-specific options (currently unused).

    Returns:
        ToolResult with the full post JSON (plus an evaluation prompt) and
        its images, large image sets partially collapsed into a collage.
    """
    author = post.get("channel_account_name", "")
    # `or ""` guards against an explicitly-None body_text, which .get's
    # default would not catch and which would crash the slice.
    text = (post.get("body_text") or "")[:30]

    # Entries may be dicts ({"image_url": ...}) or plain URL strings; an
    # explicitly-None image_url_list is treated the same as a missing one.
    img_urls = []
    for img_item in post.get("image_url_list") or []:
        url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
        if url:
            img_urls.append(url)

    # Cap the attachment count: keep the first few individually, collage the rest.
    all_images: List[Dict[str, Any]] = []
    if len(img_urls) > MAX_DETAIL_IMAGES:
        for u in img_urls[:KEEP_INDIVIDUAL]:
            all_images.append({"type": "url", "url": u})
        collage = await _build_images_collage(img_urls[KEEP_INDIVIDUAL:])
        if collage:
            all_images.append(collage)
    else:
        all_images = [{"type": "url", "url": u} for u in img_urls]

    output_json = json.dumps(post, ensure_ascii=False, indent=2)
    output_text = (
        output_json
        + "\n\n---\n请基于以上内容,从信息完整度、内容质量和实用价值三个角度,给出一句简短的内容评价。"
    )
    return ToolResult(
        title=f"X 详情: @{author}",
        output=output_text,
        long_term_memory=f"Viewed X post by @{author}: {text}",
        images=all_images,
    )
- async def _build_tweet_collage(tweets: List[Dict[str, Any]]) -> Optional[str]:
- urls, titles = [], []
- for tweet in tweets:
- thumb = None
- for img_item in tweet.get("image_url_list", []):
- url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
- if url:
- thumb = url
- break
- if not thumb:
- thumb = tweet.get("cover_url")
- if thumb:
- urls.append(thumb)
- base_title = f"@{tweet.get('channel_account_name', '')}"
- score = tweet.get("_quality_score")
- if score is not None:
- title_with_score = f"[{score}分] {base_title}"
- else:
- title_with_score = base_title
- titles.append(title_with_score)
- if not urls:
- return None
- loaded = await load_images(urls)
- valid_images, valid_labels = [], []
- for (_, img), title in zip(loaded, titles):
- if img is not None:
- valid_images.append(img)
- valid_labels.append(title)
- if not valid_images:
- return None
- grid = build_image_grid(images=valid_images, labels=valid_labels)
- import io
- buf = io.BytesIO()
- grid.save(buf, format="PNG")
- img_bytes = buf.getvalue()
-
- try:
- from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss
- import hashlib
-
- md5_hash = hashlib.md5(img_bytes).hexdigest()[:12]
- filename = f"x_collage_{md5_hash}.png"
- cdn_url = await _upload_bytes_to_oss(img_bytes, filename)
- return {"type": "url", "url": cdn_url}
- except Exception as e:
- import logging
- logging.getLogger(__name__).warning("Failed to upload x collage to CDN: %s", e)
- b64, _ = encode_base64(grid, format="PNG")
- return {"type": "base64", "media_type": "image/png", "data": b64}
# ── Registration ──
# Register the X platform with the shared content-platform registry so the
# generic content tools can dispatch search/detail calls to this module.
_X = PlatformDef(
    id="x",
    name="X (Twitter)",
    aliases=["twitter", "推特"],
)
_X.search_impl = search
_X.detail_impl = detail
register_platform(_X)
|