Browse Source

多线程启动historyTask

luojunhui 5 months ago
parent
commit
94084516ab
3 changed files with 44 additions and 12 deletions
  1. 5 1
      applications/config/const.py
  2. 33 7
      historyTask.py
  3. 6 4
      tasks/history_task.py

+ 5 - 1
applications/config/const.py

@@ -36,6 +36,10 @@ class HistoryContentIdTaskConst:
     UP_LEVEL_STATUS = 1
     TITLE_EXIT_STATUS = -1
 
+    # 等待时间
+    NEED_PUBLISH_WAIT_TIME = 60
+    DO_NOT_NEED_PUBLISH_WAIT_TIME = 10
+
 
 class NewContentIdTaskConst(HistoryContentIdTaskConst):
     """
@@ -65,4 +69,4 @@ class NewContentIdTaskConst(HistoryContentIdTaskConst):
     KIMI_FAIL_STATUS = 2
 
     # 视频下载失败状态
-    VIDEO_DOWNLOAD_FAIL_STATUS = 3
+    VIDEO_DOWNLOAD_FAIL_STATUS = 3

+ 33 - 7
historyTask.py

@@ -6,13 +6,16 @@ import asyncio
 import datetime
 import traceback
 
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
 from tasks.history_task import historyContentIdTask
 from applications.db import AsyncMySQLClient
 from applications.log import logging
 from applications.feishu import bot
+from applications.config.const import HistoryContentIdTaskConst
 
 
-async def main():
+async def main(publish_flag):
     """
     main job
     :return:
@@ -58,16 +61,39 @@ async def main():
             }
         )
         return
-    await history_content_id_task.deal()
+    await history_content_id_task.deal(publish_flag)
 
 
-if __name__ == '__main__':
+def run_thread(publish_flag, sleep_seconds):
+    """
+    执行进程
+    """
     while True:
-        asyncio.run(main())
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        loop.run_until_complete(main(publish_flag=publish_flag))
+        loop.close()
         now_str = datetime.datetime.now().__str__()
-        print("{}    请求执行完成, 等待60s".format(now_str))
         logging(
             code="history0004",
-            info="History task finished"
+            info="History task finished",
+            alg="run_thread2"
         )
-        time.sleep(60)
+        print("{}    请求执行完成, 等待{}s".format(now_str, sleep_seconds))
+        time.sleep(sleep_seconds)
+
+
+if __name__ == '__main__':
+    const = HistoryContentIdTaskConst()
+
+    with ThreadPoolExecutor(max_workers=2) as executor:
+        futures = [
+            executor.submit(run_thread, const.NEED_PUBLISH, const.NEED_PUBLISH_WAIT_TIME),
+            executor.submit(run_thread, const.DO_NOT_NEED_PUBLISH, const.DO_NOT_NEED_PUBLISH_WAIT_TIME)
+        ]
+
+        try:
+            for future in as_completed(futures):
+                future.result()
+        except KeyboardInterrupt:
+            print("Stopping all threads...")

+ 6 - 4
tasks/history_task.py

@@ -33,7 +33,7 @@ class historyContentIdTask(object):
         self.history_coroutines = self.config.get_config_value("historyArticleCoroutines")
         self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category"))
 
-    async def get_tasks(self):
+    async def get_tasks(self, publish_flag):
         """
         获取任务
         :return:
@@ -54,9 +54,11 @@ class historyContentIdTask(object):
                 group by content_id
             ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
             WHERE ART.content_status = {self.const.TASK_INIT_STATUS} and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
+                AND publish_flag = {publish_flag}
             ORDER BY ART.flow_pool_level, ART.request_timestamp
             LIMIT {self.history_coroutines};
         """
+        print(select_sql1)
         tasks = await self.mysql_client.async_select(sql=select_sql1)
         task_obj_list = [
             {
@@ -422,16 +424,16 @@ class historyContentIdTask(object):
         else:
             return
 
-    async def deal(self):
+    async def deal(self, publish_flag):
         """
         处理
         :return:
         """
-        task_list = await self.get_tasks()
+        task_list = await self.get_tasks(publish_flag=publish_flag)
         logging(
             code="history1001",
             info="History content_task Task Got {} this time".format(len(task_list)),
-            function="History Contents Task"
+            function="History Contents Task {}".format(publish_flag)
         )
         if task_list:
             a = time.time()