Browse Source

多线程启动historyTask

luojunhui 5 months ago
parent
commit
21b444dd00
2 changed files with 0 additions and 144 deletions
  1. 0 72
      new_history.py
  2. 0 72
      tasks/new_history_task.py

+ 0 - 72
new_history.py

@@ -1,72 +0,0 @@
-"""
-@author: luojunhui
-"""
-import asyncio
-import time
-import traceback
-
-from tasks.new_history_task import NewHistoryTask
-from applications.db import AsyncMySQLClient
-from applications.log import logging
-from applications.feishu import bot
-
-
-async def main():
-    """
-    main job
-    :return:
-    """
-    async_mysql_pool = AsyncMySQLClient()
-    try:
-        await async_mysql_pool.init_pool()
-        logging(
-            code="newHistory0001",
-            info="Init MySQL pool successfully",
-            alg="historyContentIdTask",
-            function="main"
-        )
-    except Exception as e:
-        logging(
-            code="newHistory0002",
-            info="Init MySQL pool failed",
-            alg="historyContentIdTask",
-            function="main",
-            data={
-                "error": str(e),
-                "traceback": traceback.format_exc()
-            }
-        )
-        bot(
-            title="newHistoryContentIdTask INIT MYSQL FAILS",
-            detail={
-                "error": str(e),
-                "traceback": traceback.format_exc()
-            }
-        )
-    try:
-        history_content_id_task = NewHistoryTask(async_mysql_pool)
-    except Exception as e:
-        logging(
-            code="newHistory0003",
-            info="Init newHistory0003 failed",
-            alg="historyContentIdTask",
-            function="main",
-            data={
-                "error": str(e),
-                "traceback": traceback.format_exc()
-            }
-        )
-        bot(
-            title="newHistoryContentIdTask INIT TASK FAILS",
-            detail={
-                "error": str(e),
-                "traceback": traceback.format_exc()
-            }
-        )
-        return
-    await history_content_id_task.deal_new()
-
-if __name__ == '__main__':
-    while True:
-        asyncio.run(main())
-        time.sleep(5)

+ 0 - 72
tasks/new_history_task.py

@@ -1,72 +0,0 @@
-"""
-@author: luojunhui
-"""
-import json
-import time
-import asyncio
-
-from applications.log import logging
-from .history_task import historyContentIdTask
-
-
-class NewHistoryTask(historyContentIdTask):
-    """
-    新的历史任务
-    """
-
-    async def get_tasks_new(self):
-        """
-        获取任务
-        :return:
-        """
-        select_sql1 = f"""
-            SELECT 
-                ART.trace_id, 
-                ART.content_id, 
-                ART.flow_pool_level, 
-                ART.gh_id,
-                ART.process_times,
-                ART.publish_flag
-            FROM {self.article_match_video_table} ART
-            JOIN (
-                select content_id, count(1) as cnt 
-                from {self.article_crawler_video_table}
-                where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
-                group by content_id
-            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
-            WHERE ART.content_status = {self.const.TASK_INIT_STATUS} and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
-            AND ART.publish_flag = {self.const.DO_NOT_NEED_PUBLISH}
-            ORDER BY ART.flow_pool_level, ART.request_timestamp
-            LIMIT {self.history_coroutines};
-        """
-        tasks = await self.mysql_client.async_select(sql=select_sql1)
-        task_obj_list = [
-            {
-                "trace_id": item[0],
-                "content_id": item[1],
-                "flow_pool_level": item[2],
-                "gh_id": item[3],
-                "process_times": item[4],
-                "publish_flag": item[5]
-            } for item in tasks
-        ]
-        return task_obj_list
-
-    async def deal_new(self):
-        """
-        处理publish_flag=2的任务
-        """
-        task_list = await self.get_tasks_new()
-        logging(
-            code="newHistory1001",
-            info="History content_task Task Got {} this time".format(len(task_list)),
-            function="History Contents Task"
-        )
-        if task_list:
-            a = time.time()
-            tasks = [self.process_task(params) for params in task_list]
-            await asyncio.gather(*tasks)
-            b = time.time()
-            print("{} s 内处理了{}个任务".format(b - a, len(task_list)))
-        else:
-            print("暂时未获得历史已存在文章")