浏览代码

Merge branch '2024-11-25-luojunhui-new-method-match-improve' of Server/title_with_video into 2024-09-23newDbTasks

luojunhui 5 月之前
父节点
当前提交
e567fde7ce
共有 3 个文件被更改,包括 53 次插入11 次删除
  1. 5 1
      applications/config/const.py
  2. 44 8
      historyTask.py
  3. 4 2
      tasks/history_task.py

+ 5 - 1
applications/config/const.py

@@ -36,6 +36,10 @@ class HistoryContentIdTaskConst:
     UP_LEVEL_STATUS = 1
     TITLE_EXIT_STATUS = -1
 
+    # 等待时间
+    NEED_PUBLISH_WAIT_TIME = 60
+    DO_NOT_NEED_PUBLISH_WAIT_TIME = 10
+
 
 class NewContentIdTaskConst(HistoryContentIdTaskConst):
     """
@@ -65,4 +69,4 @@ class NewContentIdTaskConst(HistoryContentIdTaskConst):
     KIMI_FAIL_STATUS = 2
 
     # 视频下载失败状态
-    VIDEO_DOWNLOAD_FAIL_STATUS = 3
+    VIDEO_DOWNLOAD_FAIL_STATUS = 3

+ 44 - 8
historyTask.py

@@ -6,13 +6,17 @@ import asyncio
 import datetime
 import traceback
 
+import threading
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
 from tasks.history_task import historyContentIdTask
 from applications.db import AsyncMySQLClient
 from applications.log import logging
 from applications.feishu import bot
+from applications.config.const import HistoryContentIdTaskConst
 
 
-async def main():
+async def do_job(publish_flag):
     """
     main job
     :return:
@@ -45,7 +49,7 @@ async def main():
             }
         )
     try:
-        history_content_id_task = historyContentIdTask(async_mysql_pool)
+        history_content_id_task = historyContentIdTask(async_mysql_pool, publish_flag)
     except Exception as e:
         logging(
             code="history0003",
@@ -61,13 +65,45 @@ async def main():
     await history_content_id_task.deal()
 
 
-if __name__ == '__main__':
-    while True:
-        asyncio.run(main())
+def run_thread(publish_flag, sleep_seconds, stop_flag):
+    """
+    执行进程
+    """
+    while not stop_flag.is_set():
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        loop.run_until_complete(do_job(publish_flag=publish_flag))
+        loop.close()
         now_str = datetime.datetime.now().__str__()
-        print("{}    请求执行完成, 等待60s".format(now_str))
         logging(
             code="history0004",
-            info="History task finished"
+            info="History task finished",
+            function="run_thread"
         )
-        time.sleep(60)
+        print("Task{}--{}    请求执行完成, 等待{}s".format(publish_flag, now_str, sleep_seconds))
+        time.sleep(sleep_seconds)
+
+
+def main():
+    """
+    main function
+    """
+    const = HistoryContentIdTaskConst()
+    stop_event = threading.Event()
+
+    # 启动两个线程,分别执行两个函数
+    with ThreadPoolExecutor(max_workers=2) as executor:
+        futures = [
+            executor.submit(run_thread, const.NEED_PUBLISH, const.NEED_PUBLISH_WAIT_TIME, stop_event),
+            executor.submit(run_thread, const.DO_NOT_NEED_PUBLISH, const.DO_NOT_NEED_PUBLISH_WAIT_TIME, stop_event)
+        ]
+        try:
+            for future in as_completed(futures):
+                future.result()
+        except KeyboardInterrupt:
+            print("Stopping all threads...")
+            stop_event.set()
+
+
+if __name__ == '__main__':
+    main()

+ 4 - 2
tasks/history_task.py

@@ -19,7 +19,7 @@ class historyContentIdTask(object):
     处理已经匹配过小程序的文章
     """
 
-    def __init__(self, mysql_client):
+    def __init__(self, mysql_client, publish_flag):
         """
         :param mysql_client:
         """
@@ -32,6 +32,7 @@ class historyContentIdTask(object):
         self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
         self.history_coroutines = self.config.get_config_value("historyArticleCoroutines")
         self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category"))
+        self.publish_flag = publish_flag
 
     async def get_tasks(self):
         """
@@ -54,6 +55,7 @@ class historyContentIdTask(object):
                 group by content_id
             ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
             WHERE ART.content_status = {self.const.TASK_INIT_STATUS} and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
+                AND ART.publish_flag = {self.publish_flag}
             ORDER BY ART.flow_pool_level, ART.request_timestamp
             LIMIT {self.history_coroutines};
         """
@@ -431,7 +433,7 @@ class historyContentIdTask(object):
         logging(
             code="history1001",
             info="History content_task Task Got {} this time".format(len(task_list)),
-            function="History Contents Task"
+            function="History Contents Task {}".format(self.publish_flag)
         )
         if task_list:
             a = time.time()