Sfoglia il codice sorgente

Merge branch '2024-11-14-luojunhui-request-aigc-system' of Server/title_with_video into 2024-09-23newDbTasks

luojunhui 11 mesi fa
parent
commit
d93cdb0592
3 ha cambiato i file con 52 aggiunte e 0 eliminazioni
  1. 37 0
      applications/functions/aigc.py
  2. 9 0
      tasks/history_task.py
  3. 6 0
      tasks/newContentIdTask.py

+ 37 - 0
applications/functions/aigc.py

@@ -0,0 +1,37 @@
+"""
+@author: luojunhui
+"""
+import asyncio
+
+import aiohttp
+
+TIMEOUT_CODE = 0
+
+
+async def async_record(url, params) -> int:
+    """
+    异步请求函数
+    """
+    try:
+        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
+            async with session.post(url, json=params) as response:
+                return response.status
+    except asyncio.TimeoutError:
+        return TIMEOUT_CODE
+
+
+async def record_trace_id(trace_id, status) -> bool:
+    """
+    记录成功的trace_id
+    """
+    OK_STATUS = 200
+    RETRY_TIMES = 3
+    url = "http://aigc-api.cybertogether.net//aigc/publish/api/notifyMatchMiniprogramResult"
+    params = {"traceId": trace_id, "status": status}
+    response_status_code = await async_record(url, params)
+    for _ in range(RETRY_TIMES):
+        response_status_code = await async_record(url, params)
+        if response_status_code == OK_STATUS:
+            return True
+        await asyncio.sleep(1)
+    return False

+ 9 - 0
tasks/history_task.py

@@ -11,6 +11,7 @@ from applications.config import Config
 from applications.log import logging
 from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
 from applications.functions.common import shuffle_list
+from applications.functions.aigc import record_trace_id
 
 
 class historyContentIdTask(object):
@@ -22,6 +23,8 @@ class historyContentIdTask(object):
     MISMATCH_STATUS = 96
     TASK_INIT_STATUS = 0
     TASK_PUBLISHED_STATUS = 4
+    RECORD_SUCCESS_TRACE_ID_CODE = 2
+    RECORD_FAIL_TRACE_ID_CODE = 3
 
     def __init__(self, mysql_client):
         """
@@ -218,6 +221,7 @@ class historyContentIdTask(object):
             trace_id=trace_id,
             data=L
         )
+        await record_trace_id(trace_id=trace_id, status=self.RECORD_SUCCESS_TRACE_ID_CODE)
 
     async def roll_back_content_status_when_fails(self, process_times, trace_id):
         """
@@ -334,6 +338,8 @@ class historyContentIdTask(object):
                 if affected_rows == 0:
                     print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                     return
+                await record_trace_id(trace_id=trace_id, status=self.RECORD_FAIL_TRACE_ID_CODE)
+                return
             # 校验文章是否晋升 or 退场
             exit_status = await self.check_title_whether_exit(content_id)
             if exit_status:
@@ -356,6 +362,9 @@ class historyContentIdTask(object):
                 if affected_rows == 0:
                     print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                     return
+                await record_trace_id(trace_id=trace_id, status=self.RECORD_FAIL_TRACE_ID_CODE)
+                return
+
         gh_id = params['gh_id']
         process_times = params['process_times']
         download_videos = await self.get_video_list(content_id=content_id)

+ 6 - 0
tasks/newContentIdTask.py

@@ -14,6 +14,7 @@ from applications.functions.kimi import KimiServer
 from applications.spider import search_videos_from_web
 from applications.etl_function import *
 from applications.feishu import bot
+from applications.functions.aigc import record_trace_id
 
 
 class NewContentIdTask(object):
@@ -30,6 +31,7 @@ class NewContentIdTask(object):
     KIMI_ILLEGAL_STATUS = 95
     ARTICLE_TEXT_TABLE_ERROR = 98
     TASK_MAX_PROCESS_TIMES = 3
+    RECORD_SUCCESS_TRACE_ID_CODE = 2
 
     def __init__(self, mysql_client):
         self.mysql_client = mysql_client
@@ -714,6 +716,10 @@ class NewContentIdTask(object):
                             info="publish_success",
                             trace_id=trace_id
                         )
+                        await record_trace_id(
+                            trace_id=trace_id,
+                            status=self.RECORD_SUCCESS_TRACE_ID_CODE
+                        )
                     except Exception as e:
                         logging(
                             code="6004",