瀏覽代碼

2024-06-13
异步定时任务

罗俊辉 1 年之前
父節點
當前提交
5c4a839de9
共有 2 個文件被更改,包括 62 次插入16 次删除
  1. 57 15
      applications/deal/process_deal.py
  2. 5 1
      task.py

+ 57 - 15
applications/deal/process_deal.py

@@ -5,6 +5,7 @@ import asyncio
 
 
 from applications.static.config import db_article
 from applications.static.config import db_article
 from applications.schedule import search_videos
 from applications.schedule import search_videos
+from applications.functions.log import logging
 
 
 
 
 class ProcessDeal(object):
 class ProcessDeal(object):
@@ -31,7 +32,6 @@ class ProcessDeal(object):
             ASC
             ASC
             LIMIT 10;
             LIMIT 10;
         """
         """
-        print(select_sql)
         task_list = await self.mysql_client.async_select(sql=select_sql)
         task_list = await self.mysql_client.async_select(sql=select_sql)
         task_obj_list = [
         task_obj_list = [
             {
             {
@@ -44,6 +44,11 @@ class ProcessDeal(object):
                 "process_times": item[6]
                 "process_times": item[6]
             } for item in task_list
             } for item in task_list
         ]
         ]
+        logging(
+            code="9001",
+            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
+            data=task_obj_list
+        )
         return task_obj_list
         return task_obj_list
 
 
     async def get_history_contents(self, content_id):
     async def get_history_contents(self, content_id):
@@ -115,6 +120,11 @@ class ProcessDeal(object):
         WHERE  trace_id = '{params['trace_id']}'
         WHERE  trace_id = '{params['trace_id']}'
         """
         """
         await self.mysql_client.async_insert(update_sql)
         await self.mysql_client.async_insert(update_sql)
+        logging(
+            code="9002",
+            info="已从历史文章更新,历史id: {}".format(history_trace_id),
+            trace_id=params['trace_id']
+        )
 
 
     async def process_video_id(self, title, trace_id, process_times):
     async def process_video_id(self, title, trace_id, process_times):
         """
         """
@@ -151,13 +161,17 @@ class ProcessDeal(object):
         try:
         try:
             # 判断标题中是否包含video_id
             # 判断标题中是否包含video_id
             if "video_id=" in params['title']:
             if "video_id=" in params['title']:
+                logging(
+                    code="9006",
+                    info="视频生成文本测试",
+                    trace_id=params['trace_id']
+                )
                 await self.process_video_id(
                 await self.process_video_id(
                     title=params['title'],
                     title=params['title'],
                     trace_id=params['trace_id'],
                     trace_id=params['trace_id'],
                     process_times=params['process_times']
                     process_times=params['process_times']
                 )
                 )
             else:
             else:
-                print("开始搜索视频")
                 await search_videos(
                 await search_videos(
                     params={"title": params['title'], "content": params['text'], "trace_id": params['trace_id']},
                     params={"title": params['title'], "content": params['text'], "trace_id": params['trace_id']},
                     trace_id=params['trace_id'],
                     trace_id=params['trace_id'],
@@ -181,9 +195,12 @@ class ProcessDeal(object):
                            WHERE trace_id = '{params["trace_id"]}';
                            WHERE trace_id = '{params["trace_id"]}';
                     """
                     """
                     await self.mysql_client.async_insert(sql=update_sql2)
                     await self.mysql_client.async_insert(sql=update_sql2)
-                    print("搜索视频成功")
+                    logging(
+                        code="9008",
+                        info="视频搜索成功, 状态修改为2",
+                        trace_id=params['trace_id']
+                    )
                 else:
                 else:
-                    print("搜索视频失败")
                     update_sql3 = f"""
                     update_sql3 = f"""
                                         UPDATE {db_article}
                                         UPDATE {db_article}
                                         SET 
                                         SET 
@@ -192,15 +209,24 @@ class ProcessDeal(object):
                                         WHERE trace_id = '{params["trace_id"]}';
                                         WHERE trace_id = '{params["trace_id"]}';
                                     """
                                     """
                     await self.mysql_client.async_insert(sql=update_sql3)
                     await self.mysql_client.async_insert(sql=update_sql3)
+                    logging(
+                        code="9018",
+                        info="视频搜索失败,回退状态为0",
+                        trace_id=params['trace_id']
+                    )
         except Exception as e:
         except Exception as e:
-            print("{}异常错误:{}".format(params['trace_id'], e))
+            logging(
+                code="9018",
+                info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e),
+                trace_id=params['trace_id']
+            )
             update_sql4 = f"""
             update_sql4 = f"""
-                                                    UPDATE {db_article}
-                                                    SET 
-                                                       content_status = 0,
-                                                       process_times = {int(params['process_times']) + 1}
-                                                    WHERE trace_id = '{params["trace_id"]}';
-                                                """
+                UPDATE {db_article}
+                SET 
+                   content_status = 0,
+                   process_times = {int(params['process_times']) + 1}
+                WHERE trace_id = '{params["trace_id"]}';
+            """
             await self.mysql_client.async_insert(sql=update_sql4)
             await self.mysql_client.async_insert(sql=update_sql4)
 
 
     async def process_task(self, params):
     async def process_task(self, params):
@@ -210,19 +236,32 @@ class ProcessDeal(object):
         :return:
         :return:
         """
         """
         content_id = params['content_id']
         content_id = params['content_id']
+        trace_id = params['trace_id']
         # 判断该文章是否已经生成了
         # 判断该文章是否已经生成了
         history_trace_id = await self.get_history_contents(content_id)
         history_trace_id = await self.get_history_contents(content_id)
         if history_trace_id:
         if history_trace_id:
             # 说明已经存在了结果, 将该条记录下的video_id拿出来
             # 说明已经存在了结果, 将该条记录下的video_id拿出来
-            print("该文章已经成功请求")
+            logging(
+                code="9001",
+                info="存在历史文章",
+                trace_id=trace_id
+            )
             await self.insert_history_contents_videos(history_trace_id, params)
             await self.insert_history_contents_videos(history_trace_id, params)
         else:
         else:
             flag = await self.judge_content_processing(content_id)
             flag = await self.judge_content_processing(content_id)
             if flag:
             if flag:
-                print("开始处理这条视频")
+                logging(
+                    code="9004",
+                    info="无正在处理的文章ID, 开始处理",
+                    trace_id=trace_id
+                )
                 await self.start_process(params=params)
                 await self.start_process(params=params)
             else:
             else:
-                print("该文章id正在处理中")
+                logging(
+                    code="9003",
+                    info="该文章ID正在请求--文章ID {}".format(content_id),
+                    trace_id=trace_id
+                )
 
 
     async def deal(self):
     async def deal(self):
         """
         """
@@ -234,4 +273,7 @@ class ProcessDeal(object):
             tasks = [self.process_task(params) for params in task_list]
             tasks = [self.process_task(params) for params in task_list]
             await asyncio.gather(*tasks)
             await asyncio.gather(*tasks)
         else:
         else:
-            print("没有要处理的视频")
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )

+ 5 - 1
task.py

@@ -1,6 +1,7 @@
 """
 """
 @author: luojunhui
 @author: luojunhui
 """
 """
+import time
 import asyncio
 import asyncio
 
 
 import aiomysql
 import aiomysql
@@ -76,4 +77,7 @@ async def main():
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
-    asyncio.run(main())
+    while True:
+        asyncio.run(main())
+        print("请求执行完成, 等待120s")
+        time.sleep(120)