from typing import Optional, Dict, List

from app.infra.shared import AsyncHttpClient


class DecodeServer:
    """Thin async client for the content-deconstruction ("decode") HTTP API.

    Each method opens its own ``AsyncHttpClient`` context, issues a single
    request against ``base_url`` and returns whatever the client returns.
    """

    base_url: str = "http://supply-content-deconstruction-api.piaoquantv.com"

    # Headers sent with every request. Kept in one place so the two endpoints
    # cannot drift apart; handed out as a copy (see _build_headers) so a
    # request can never mutate the shared template.
    _DEFAULT_HEADERS: Dict[str, str] = {
        "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
        "Content-Type": "application/json",
    }

    def _build_headers(self) -> Dict[str, str]:
        """Return a fresh copy of the default request headers."""
        return dict(self._DEFAULT_HEADERS)

    # Create a deconstruction (decode) task
    async def create_decode_task(self, data: Dict) -> Dict:
        """Create a decode task by POSTing ``data`` as the JSON body.

        Expected layout of ``data`` (carried over from the original notes;
        the exact nesting is reconstructed -- confirm against the API)::

            scene: business scene -- 0 topic selection; 1 creation; 2 production
            content_type: content type -- 1 long article; 2 image-text; 3 video
            content:
                channel_content_id:
                video_url:
                images: [image_url_1, image_url_2, ...]
                body_text: long-article body
                title: article title
                channel_account_id: author id
                channel_account_name: author name
            output_type: {}

        Args:
            data: Request payload, sent unchanged as the JSON body.

        Returns:
            The value returned by ``AsyncHttpClient.post`` (annotated as a
            dict; presumably the parsed JSON response -- verify against the
            client implementation).
        """
        url = f"{self.base_url}/api/v1/content/tasks/decode"
        async with AsyncHttpClient() as client:
            response = await client.post(
                url, json=data, headers=self._build_headers()
            )

        return response

    # Fetch the deconstruction result
    async def fetch_result(self, task_id: str) -> Dict:
        """Fetch the state/result of a task by its id.

        Response layout as described by the original notes::

            {
                code: 0 | 404 (task does not exist),
                msg: 'ok',
                data: {
                    "taskId": "123",
                    "status": 3,      # 0 PENDING, 1 RUNNING, 2 SUCCESS, 3 FAILED
                    "result": None | JSON,
                    "reason": "" | failure reason,
                    "url": {
                        "pointUrl": "",    # topic-point deconstruction result
                                           # page (decode tasks only)
                        "weightUrl": "",   # weight page (both decode and
                                           # clustering tasks)
                        "patternUrl": ""   # topic-point clustering result
                                           # page (clustering tasks only)
                    }
                },
            }

        Args:
            task_id: Task identifier, interpolated into the URL path as-is.

        Returns:
            The value returned by ``AsyncHttpClient.get`` (annotated as a
            dict; presumably the parsed JSON response -- verify against the
            client implementation).
        """
        url = f"{self.base_url}/api/v1/content/tasks/{task_id}"
        # Content-Type is sent on this GET as well, matching the headers the
        # endpoint has always been called with.
        async with AsyncHttpClient() as client:
            response = await client.get(url, headers=self._build_headers())

        return response