Преглед изворни кода

2024-06-13
异步定时任务

罗俊辉 пре 11 месеци
родитељ
комит
52dab77670
1 измењених фајлова са 17 додато и 7 уклоњено
  1. 17 7
      applications/deal/process_deal.py

+ 17 - 7
applications/deal/process_deal.py

@@ -24,13 +24,22 @@ class ProcessDeal(object):
         获取任务
         获取任务
         :return:
         :return:
         """
         """
+        select_sql1 = f"""
+            SELECT DISTINCT (content_id)       
+            FROM {db_article} 
+            WHERE content_status = 0 and process_times <= 5
+            ORDER BY request_time_stamp
+            ASC
+            LIMIT 4;
+        """
+        content_ids = await self.mysql_client.async_select(select_sql1)
+        content_ids_tuple = tuple([i[0] for i in content_ids])
         select_sql = f"""
         select_sql = f"""
             SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
             SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
             FROM {db_article} 
             FROM {db_article} 
-            WHERE content_status = 0 and process_times <= 5
+            WHERE content_id in {content_ids_tuple}
             ORDER BY request_time_stamp
             ORDER BY request_time_stamp
             ASC
             ASC
-            LIMIT 20;
         """
         """
         task_list = await self.mysql_client.async_select(sql=select_sql)
         task_list = await self.mysql_client.async_select(sql=select_sql)
         task_obj_list = [
         task_obj_list = [
@@ -269,6 +278,7 @@ class ProcessDeal(object):
         :return:
         :return:
         """
         """
         task_list = await self.get_task()
         task_list = await self.get_task()
+        # print(task_list)
         task_dict = {}
         task_dict = {}
         for task in task_list:
         for task in task_list:
             key = task['content_id']
             key = task['content_id']
@@ -276,11 +286,11 @@ class ProcessDeal(object):
         process_list = []
         process_list = []
         for item in task_dict:
         for item in task_dict:
             process_list.append(task_dict[item])
             process_list.append(task_dict[item])
-        if task_list:
-            for task in task_list:
-                await self.process_task(task)
-            # tasks = [self.process_task(params) for params in process_list]
-            # await asyncio.gather(*tasks)
+        if process_list:
+            # for task in task_list:
+            #     await self.process_task(task)
+            tasks = [self.process_task(params) for params in process_list]
+            await asyncio.gather(*tasks)
         else:
         else:
             logging(
             logging(
                 code="9008",
                 code="9008",