| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- from typing import Optional, Dict, List
- from app.infra.shared import AsyncHttpClient
class DecodeServer:
    """Async client for the content-deconstruction ("decode") HTTP API.

    Each call opens a short-lived ``AsyncHttpClient`` and returns whatever
    that client's ``post`` / ``get`` returns; the response is not inspected
    or validated here.
    """

    base_url: str = "http://supply-content-deconstruction-api.piaoquantv.com"

    # Headers sent with every request. Kept in one place so both endpoints
    # stay in sync; callers receive a copy so this template is never mutated.
    _DEFAULT_HEADERS: Dict[str, str] = {
        "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
        "Content-Type": "application/json",
    }

    def _task_url(self, task_id: str) -> str:
        """Return the URL of a single task, with ``task_id`` percent-encoded."""
        # Local import: keeps this class self-contained without touching the
        # file-level import block.
        from urllib.parse import quote

        # safe="" also encodes "/", so an id can never add or change path
        # segments, query strings or fragments in the request URL.
        encoded_id = quote(str(task_id), safe="")
        return f"{self.base_url}/api/v1/content/tasks/{encoded_id}"

    # Create a decode (deconstruction) task.
    async def create_decode_task(self, data: Dict) -> Dict:
        """Submit a new decode task.

        ``data`` is sent unchanged as the JSON request body. Documented
        fields (nesting presumed from the field list; confirm against the
        API specification):

            scene: business scenario: 0 topic selection; 1 creation;
                2 production
            content_type: content type: 1 long-form article; 2 image-text;
                3 video
            content:
                channel_content_id:
                video_url:
                images: [image_url_1, image_url_2, ...]
                body_text: long-form article body
                title: article title
                channel_account_id: author id
                channel_account_name: author name
            output_type: {}

        Returns:
            The value returned by ``AsyncHttpClient.post`` (annotated as a
            dict; presumably the decoded JSON response).
        """
        url = f"{self.base_url}/api/v1/content/tasks/decode"
        async with AsyncHttpClient() as client:
            return await client.post(
                url, json=data, headers=dict(self._DEFAULT_HEADERS)
            )

    # Fetch the decode result.
    async def fetch_result(self, task_id: str) -> Dict:
        """Fetch the status and result of a previously created task.

        Args:
            task_id: Identifier of the task. It is percent-encoded before
                being placed in the URL path.

        Returns:
            The value returned by ``AsyncHttpClient.get``. The documented
            response shape is:

                {
                    code: 0 | 404 (task does not exist),
                    msg: 'ok',
                    data: {
                        "taskId": "123",
                        "status": 3,   # 0 PENDING, 1 RUNNING, 2 SUCCESS,
                                       # 3 FAILED
                        "result": None | JSON,
                        "reason": "" | failure reason,
                        "url": {
                            "pointUrl": "",    # topic-point deconstruction
                                               # result page (decode tasks
                                               # only)
                            "weightUrl": "",   # weight page (both decode
                                               # and clustering tasks)
                            "patternUrl": ""   # topic-point clustering
                                               # result page (clustering
                                               # tasks only)
                        }
                    },
                }
        """
        url = self._task_url(task_id)
        async with AsyncHttpClient() as client:
            return await client.get(url, headers=dict(self._DEFAULT_HEADERS))
|