""" 热点宝画像数据工具(示例) 调用内部爬虫服务获取账号/内容的粉丝画像。 """ import asyncio import json from typing import Optional import httpx from agent.tools import tool, ToolResult ACCOUNT_FANS_PORTRAIT_API = "http://crawapi.piaoquantv.com/crawler/dou_yin/re_dian_bao/account_fans_portrait" CONTENT_FANS_PORTRAIT_API = "http://crawapi.piaoquantv.com/crawler/dou_yin/re_dian_bao/video_like_portrait" DEFAULT_TIMEOUT = 60.0 @tool(description="获取抖音账号粉丝画像(热点宝),支持选择画像维度") async def get_account_fans_portrait( account_id: str, need_province: bool = False, need_city: bool = False, need_city_level: bool = False, need_gender: bool = False, need_age: bool = True, need_phone_brand: bool = False, need_phone_price: bool = False, timeout: Optional[float] = None, ) -> ToolResult: """ 获取抖音账号粉丝画像 """ try: payload = { "account_id": account_id, "need_province": need_province, "need_city": need_city, "need_city_level": need_city_level, "need_gender": need_gender, "need_age": need_age, "need_phone_brand": need_phone_brand, "need_phone_price": need_phone_price, } request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT async with httpx.AsyncClient(timeout=request_timeout) as client: response = await client.post( ACCOUNT_FANS_PORTRAIT_API, json=payload, headers={"Content-Type": "application/json"}, ) response.raise_for_status() data = response.json() return ToolResult( title=f"账号粉丝画像: {account_id}", output=json.dumps(data, ensure_ascii=False, indent=2), long_term_memory=f"Fetched fans portrait for account '{account_id}'", ) except httpx.HTTPStatusError as e: return ToolResult( title="账号粉丝画像获取失败", output="", error=f"HTTP error {e.response.status_code}: {e.response.text}", ) except Exception as e: return ToolResult( title="账号粉丝画像获取失败", output="", error=str(e), ) @tool(description="获取抖音内容粉丝画像(热点宝),支持选择画像维度") async def get_content_fans_portrait( content_id: str, need_province: bool = False, need_city: bool = False, need_city_level: bool = False, need_gender: bool = False, need_age: bool = True, need_phone_brand: bool = False, need_phone_price: bool = False, timeout: Optional[float] = None, ) -> ToolResult: """ 获取抖音内容粉丝画像 """ try: payload = { "content_id": content_id, "need_province": need_province, "need_city": need_city, "need_city_level": need_city_level, "need_gender": need_gender, "need_age": need_age, "need_phone_brand": need_phone_brand, "need_phone_price": need_phone_price, } request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT async with httpx.AsyncClient(timeout=request_timeout) as client: response = await client.post( CONTENT_FANS_PORTRAIT_API, json=payload, headers={"Content-Type": "application/json"}, ) response.raise_for_status() data = response.json() return ToolResult( title=f"内容粉丝画像: {content_id}", output=json.dumps(data, ensure_ascii=False, indent=2), long_term_memory=f"Fetched fans portrait for content '{content_id}'", ) except httpx.HTTPStatusError as e: return ToolResult( title="内容粉丝画像获取失败", output="", error=f"HTTP error {e.response.status_code}: {e.response.text}", ) except Exception as e: return ToolResult( title="内容粉丝画像获取失败", output="", error=str(e), ) # async def main(): # # result = await get_account_fans_portrait( # # account_id="MS4wLjABAAAAXvRdWJsdPKkh9Ja3ZirxoB8pAaxNXUXs1KUe14gW0IoqDz-D-fG0xZ8c5kSfTPXx" # # ) # # print(result.output) # result = await get_content_fans_portrait( # content_id="7495776724350405928" # ) # print(result.output) # # if __name__ == "__main__": # asyncio.run(main())