Browse Source

新模式,小程序匹配加速

luojunhui 5 months ago
parent
commit
a32fb47000
2 changed files with 144 additions and 0 deletions
  1. 72 0
      new_history.py
  2. 72 0
      tasks/new_history_task.py

+ 72 - 0
new_history.py

@@ -0,0 +1,72 @@
+"""
+@author: luojunhui
+"""
+import asyncio
+import time
+import traceback
+
+from tasks.new_history_task import NewHistoryTask
+from applications.db import AsyncMySQLClient
+from applications.log import logging
+from applications.feishu import bot
+
+
+async def main():
+    """
+    main job
+    :return:
+    """
+    async_mysql_pool = AsyncMySQLClient()
+    try:
+        await async_mysql_pool.init_pool()
+        logging(
+            code="newHistory0001",
+            info="Init MySQL pool successfully",
+            alg="historyContentIdTask",
+            function="main"
+        )
+    except Exception as e:
+        logging(
+            code="newHistory0002",
+            info="Init MySQL pool failed",
+            alg="historyContentIdTask",
+            function="main",
+            data={
+                "error": str(e),
+                "traceback": traceback.format_exc()
+            }
+        )
+        bot(
+            title="newHistoryContentIdTask INIT MYSQL FAILS",
+            detail={
+                "error": str(e),
+                "traceback": traceback.format_exc()
+            }
+        )
+    try:
+        history_content_id_task = NewHistoryTask(async_mysql_pool)
+    except Exception as e:
+        logging(
+            code="newHistory0003",
+            info="Init newHistory0003 failed",
+            alg="historyContentIdTask",
+            function="main",
+            data={
+                "error": str(e),
+                "traceback": traceback.format_exc()
+            }
+        )
+        bot(
+            title="newHistoryContentIdTask INIT TASK FAILS",
+            detail={
+                "error": str(e),
+                "traceback": traceback.format_exc()
+            }
+        )
+        return
+    await history_content_id_task.deal_new()
+
+if __name__ == '__main__':
+    while True:
+        asyncio.run(main())
+        time.sleep(5)

+ 72 - 0
tasks/new_history_task.py

@@ -0,0 +1,72 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+import asyncio
+
+from applications.log import logging
+from .history_task import historyContentIdTask
+
+
+class NewHistoryTask(historyContentIdTask):
+    """
+    新的历史任务
+    """
+
+    async def get_tasks_new(self):
+        """
+        获取任务
+        :return:
+        """
+        select_sql1 = f"""
+            SELECT 
+                ART.trace_id, 
+                ART.content_id, 
+                ART.flow_pool_level, 
+                ART.gh_id,
+                ART.process_times,
+                ART.publish_flag
+            FROM {self.article_match_video_table} ART
+            JOIN (
+                select content_id, count(1) as cnt 
+                from {self.article_crawler_video_table}
+                where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+                group by content_id
+            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
+            WHERE ART.content_status = {self.const.TASK_INIT_STATUS} and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
+            AND ART.publish_flag = {self.const.DO_NOT_NEED_PUBLISH}
+            ORDER BY ART.flow_pool_level, ART.request_timestamp
+            LIMIT {self.history_coroutines};
+        """
+        tasks = await self.mysql_client.async_select(sql=select_sql1)
+        task_obj_list = [
+            {
+                "trace_id": item[0],
+                "content_id": item[1],
+                "flow_pool_level": item[2],
+                "gh_id": item[3],
+                "process_times": item[4],
+                "publish_flag": item[5]
+            } for item in tasks
+        ]
+        return task_obj_list
+
+    async def deal_new(self):
+        """
+        处理publish_flag=2的任务
+        """
+        task_list = await self.get_tasks_new()
+        logging(
+            code="newHistory1001",
+            info="History content_task Task Got {} this time".format(len(task_list)),
+            function="History Contents Task"
+        )
+        if task_list:
+            a = time.time()
+            tasks = [self.process_task(params) for params in task_list]
+            await asyncio.gather(*tasks)
+            b = time.time()
+            print("{} s 内处理了{}个任务".format(b - a, len(task_list)))
+        else:
+            print("暂时未获得历史已存在文章")