Browse Source

2024-06-13
异步定时任务

罗俊辉 11 months ago
parent
commit
ac164fd467
3 changed files with 411 additions and 25 deletions
  1. 28 25
      applications/deal/process_deal.py
  2. 298 0
      applications/deal/process_deal_2.py
  3. 85 0
      task2.py

+ 28 - 25
applications/deal/process_deal.py

@@ -34,31 +34,34 @@ class ProcessDeal(object):
         """
         content_ids = await self.mysql_client.async_select(select_sql1)
         content_ids_tuple = tuple([i[0] for i in content_ids])
-        select_sql = f"""
-            SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
-            FROM {db_article} 
-            WHERE content_id in {content_ids_tuple}
-            ORDER BY request_time_stamp
-            ASC
-        """
-        task_list = await self.mysql_client.async_select(sql=select_sql)
-        task_obj_list = [
-            {
-                "trace_id": item[0],
-                "content_id": item[1],
-                "gh_id": item[2],
-                "title": item[3],
-                "text": item[4],
-                "content_status": item[5],
-                "process_times": item[6]
-            } for item in task_list
-        ]
-        logging(
-            code="9001",
-            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
-            data=task_obj_list
-        )
-        return task_obj_list
+        if len(content_ids_tuple) > 0:
+            select_sql = f"""
+                SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
+                FROM {db_article} 
+                WHERE content_id in {content_ids_tuple}
+                ORDER BY request_time_stamp
+                ASC
+            """
+            task_list = await self.mysql_client.async_select(sql=select_sql)
+            task_obj_list = [
+                {
+                    "trace_id": item[0],
+                    "content_id": item[1],
+                    "gh_id": item[2],
+                    "title": item[3],
+                    "text": item[4],
+                    "content_status": item[5],
+                    "process_times": item[6]
+                } for item in task_list
+            ]
+            logging(
+                code="9001",
+                info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
+                data=task_obj_list
+            )
+            return task_obj_list
+        else:
+            return []
 
     async def get_history_contents(self, content_id):
         """

+ 298 - 0
applications/deal/process_deal_2.py

@@ -0,0 +1,298 @@
+"""
+Scheduled task processing for the article table.
+
+@author: luojunhui
+"""
+import asyncio
+
+from applications.static.config import db_article
+from applications.schedule import search_videos
+from applications.functions.log import logging
+
+
+class ProcessDeal(object):
+    """
+    定时执行任务
+    """
+
+    def __init__(self, mysql_client):
+        """
+        Store the database client used by every query in this class.
+
+        :param mysql_client: client object; the methods of this class await
+            its ``async_select`` and ``async_insert`` coroutines
+        """
+        self.mysql_client = mysql_client
+
+    async def get_task(self):
+        """
+        Fetch the next batch of pending rows from the article table.
+
+        Selects up to 30 rows with ``content_status = 0`` and
+        ``process_times <= 5``, ordered by ``request_time_stamp`` ascending,
+        and logs the batch under code 9001.
+
+        :return: list of task dicts with the keys trace_id, content_id, gh_id,
+            title, text, content_status and process_times
+        """
+        fetch_sql = f"""
+            SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
+            FROM {db_article} 
+            WHERE content_status = 0 and process_times <= 5
+            ORDER BY request_time_stamp
+            ASC
+            LIMIT 30
+        """
+        rows = await self.mysql_client.async_select(sql=fetch_sql)
+        # Task-dict keys, in the same order as the columns of the SELECT above.
+        field_names = (
+            "trace_id",
+            "content_id",
+            "gh_id",
+            "title",
+            "text",
+            "content_status",
+            "process_times",
+        )
+        pending_tasks = [
+            {name: row[position] for position, name in enumerate(field_names)}
+            for row in rows
+        ]
+        logging(
+            code="9001",
+            info="本次任务获取到 {} 条视频".format(len(pending_tasks)),
+            data=pending_tasks
+        )
+        return pending_tasks
+
+    async def get_history_contents(self, content_id):
+        """
+        Look up a row with the same content_id whose content_status is 2.
+
+        Rows are scanned in ``ORDER BY id DESC`` order and the first match wins.
+
+        :param content_id: content id to look up
+        :return: trace_id of the first row with content_status 2, or None when
+            there is no such row
+        """
+        lookup_sql = f"""
+               SELECT trace_id, content_status
+               FROM {db_article}
+               WHERE content_id = '{content_id}'
+               ORDER BY id DESC;
+           """
+        rows = await self.mysql_client.async_select(lookup_sql)
+        if not rows:
+            return None
+        for row in rows:
+            found_trace_id, status = row
+            if status == 2:
+                return found_trace_id
+        return None
+
+    async def judge_content_processing(self, content_id):
+        """
+        Report whether no row of this content_id currently has content_status 1.
+
+        :param content_id: content id to check
+        :return: False when any row for content_id has content_status 1,
+            True otherwise (including when the query returns no rows)
+        """
+        status_sql = f"""
+                       SELECT trace_id, content_status
+                       FROM {db_article}
+                       WHERE content_id = '{content_id}'
+                       ORDER BY id DESC;
+                   """
+        rows = await self.mysql_client.async_select(status_sql)
+        if not rows:
+            return True
+        # Stops at the first row with status 1, like the explicit loop did.
+        return not any(status == 1 for _trace_id, status in rows)
+
+    async def insert_history_contents_videos(self, history_trace_id, params):
+        """
+        Copy the title and recalled video ids of a historical row onto a task.
+
+        Reads kimi_title and recall_video_id1..3 from the row identified by
+        ``history_trace_id`` and writes them to the row identified by
+        ``params['trace_id']``, setting content_status to 2 and process_times
+        to ``params['process_times'] + 1``. When the historical row cannot be
+        read, the task row is left untouched and the miss is logged.
+
+        :param history_trace_id: trace_id of the row to copy from
+        :param params: task dict; ``trace_id`` and ``process_times`` are read
+        :return: None
+        """
+        select_sql = f"""
+            SELECT kimi_title, recall_video_id1, recall_video_id2, recall_video_id3
+            FROM {db_article}
+            WHERE trace_id = '{history_trace_id}';
+        """
+        info = await self.mysql_client.async_select(sql=select_sql)
+        if not info:
+            # Without this guard info[0] raises IndexError, which would abort
+            # the whole asyncio.gather() batch in deal().
+            logging(
+                code="9003",
+                info="历史文章记录不存在,历史id: {}".format(history_trace_id),
+                trace_id=params['trace_id']
+            )
+            return
+        kimi_title, vid1, vid2, vid3 = info[0]
+
+        def sql_number(value):
+            # None has to be written as SQL NULL, not as the text "None".
+            return "NULL" if value is None else value
+
+        if kimi_title is None:
+            title_sql = "NULL"
+        else:
+            # Double single quotes so a title containing ' cannot end the literal.
+            title_sql = "'{}'".format(str(kimi_title).replace("'", "''"))
+
+        # NOTE(review): the statement is still assembled as text because
+        # async_insert() only accepts a finished SQL string.
+        update_sql = f"""
+        UPDATE {db_article}
+        SET
+            kimi_title={title_sql},
+            recall_video_id1={sql_number(vid1)},
+            recall_video_id2={sql_number(vid2)},
+            recall_video_id3={sql_number(vid3)},
+            content_status=2,
+            process_times = {int(params['process_times']) + 1}
+        WHERE  trace_id = '{params['trace_id']}'
+        """
+        await self.mysql_client.async_insert(update_sql)
+        logging(
+            code="9002",
+            info="已从历史文章更新,历史id: {}".format(history_trace_id),
+            trace_id=params['trace_id']
+        )
+
+    async def process_video_id(self, title, trace_id, process_times):
+        """
+        Handle a title that carries its own video id.
+
+        The text after the last ``video_id=`` in ``title`` (or the whole title
+        when that marker is absent) is stored as recall_video_id1,
+        content_status is set to 2 and process_times to ``process_times + 1``.
+
+        :param title: article title containing ``video_id=<id>``
+        :param trace_id: trace_id of the row to update
+        :param process_times: current attempt counter of the row
+        :return: None
+        """
+        video_id = title.rpartition("video_id=")[2]
+        next_process_times = int(process_times) + 1
+        update_sql = f"""
+            UPDATE  
+                {db_article}
+            SET 
+                recall_video_id1 = '{video_id}',
+                content_status = 2,
+                process_times = {next_process_times}
+            WHERE  
+                trace_id = '{trace_id}';"""
+        await self.mysql_client.async_insert(update_sql)
+
+    async def start_process(self, params):
+        """
+        Process one task row and record the outcome in content_status.
+
+        Steps, as implemented below:
+
+        1. Set content_status to 1 for ``params['trace_id']``.
+        2. If the title contains ``video_id=``, delegate to
+           ``process_video_id``; otherwise await ``search_videos`` and re-read
+           recall_video_id1..3 of the row.
+        3. After a search, a truthy recall_video_id1 sets content_status to 2;
+           otherwise content_status goes back to 0. Both cases write
+           ``process_times + 1``.
+        4. Any exception raised in steps 2-3 is logged under code 9018 and the
+           row is put back to content_status 0 with ``process_times + 1``.
+
+        :param params: task dict; trace_id, title, text, gh_id and
+            process_times are read
+        :return: None
+        """
+        # Set content_status to 1 to mark this row as being processed
+        update_sql = f"""
+            UPDATE {db_article}
+            SET 
+                content_status = 1
+            WHERE 
+                trace_id = '{params["trace_id"]}'
+        """
+        await self.mysql_client.async_insert(sql=update_sql)
+        try:
+            # Check whether the title contains video_id
+            if "video_id=" in params['title']:
+                logging(
+                    code="9006",
+                    info="视频生成文本测试",
+                    trace_id=params['trace_id']
+                )
+                await self.process_video_id(
+                    title=params['title'],
+                    trace_id=params['trace_id'],
+                    process_times=params['process_times']
+                )
+            else:
+                await search_videos(
+                    params={"title": params['title'], "content": params['text'], "trace_id": params['trace_id']},
+                    trace_id=params['trace_id'],
+                    gh_id=params['gh_id'],
+                    mysql_client=self.mysql_client
+                )
+                # After the search finishes, check whether a video id was stored
+                select_sql = f"""
+                    SELECT recall_video_id1, recall_video_id2, recall_video_id3
+                    FROM {db_article}
+                    WHERE trace_id = '{params["trace_id"]}';
+                """
+                result = await self.mysql_client.async_select(sql=select_sql)
+                # An empty result raises IndexError here, which the except
+                # branch below turns into a rollback to status 0.
+                vid1, vid2, vid3 = result[0]
+                if vid1:
+                    update_sql2 = f"""
+                        UPDATE {db_article}
+                        SET 
+                           content_status = 2,
+                           process_times = {int(params['process_times']) + 1}
+                           WHERE trace_id = '{params["trace_id"]}';
+                    """
+                    await self.mysql_client.async_insert(sql=update_sql2)
+                    logging(
+                        code="9008",
+                        info="视频搜索成功, 状态修改为2",
+                        trace_id=params['trace_id']
+                    )
+                else:
+                    # No first video id: put the row back to status 0.
+                    update_sql3 = f"""
+                                        UPDATE {db_article}
+                                        SET 
+                                           content_status = 0,
+                                           process_times = {int(params['process_times']) + 1}
+                                        WHERE trace_id = '{params["trace_id"]}';
+                                    """
+                    await self.mysql_client.async_insert(sql=update_sql3)
+                    logging(
+                        code="9018",
+                        info="视频搜索失败,回退状态为0",
+                        trace_id=params['trace_id']
+                    )
+        except Exception as e:
+            # Any failure above: log it and put the row back to status 0.
+            logging(
+                code="9018",
+                info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e),
+                trace_id=params['trace_id']
+            )
+            update_sql4 = f"""
+                UPDATE {db_article}
+                SET 
+                   content_status = 0,
+                   process_times = {int(params['process_times']) + 1}
+                WHERE trace_id = '{params["trace_id"]}';
+            """
+            await self.mysql_client.async_insert(sql=update_sql4)
+
+    async def process_task(self, params):
+        """
+        Handle one task: reuse the result of a historical row when one exists.
+
+        When ``get_history_contents`` returns a trace_id, its videos are
+        copied onto this task via ``insert_history_contents_videos``.
+        Otherwise only a log entry (code 9003) is written.
+
+        :param params: task dict; ``content_id`` and ``trace_id`` are read
+        :return: None
+        """
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        # Check whether this content already has a generated result.
+        history_trace_id = await self.get_history_contents(content_id)
+        if not history_trace_id:
+            logging(
+                code="9003",
+                info="未找到历史文章",
+                trace_id=trace_id,
+                function="find_history_article"
+            )
+            # NOTE(review): the follow-up that consulted
+            # judge_content_processing() and then called start_process() is
+            # disabled in this revision, so tasks without history stop here.
+            return
+        # A result already exists; copy the video ids from that row.
+        logging(
+            code="9001",
+            info="存在历史文章",
+            trace_id=trace_id
+        )
+        await self.insert_history_contents_videos(history_trace_id, params)
+
+    async def deal(self):
+        """
+        Run one round: fetch the pending tasks and process them concurrently.
+
+        Every task returned by ``get_task`` is passed to ``process_task`` and
+        all of them are awaited together with ``asyncio.gather``. An empty
+        batch is only logged (code 9008).
+
+        :return: None
+        """
+        pending_tasks = await self.get_task()
+        if not pending_tasks:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )
+            return
+        await asyncio.gather(
+            *(self.process_task(params) for params in pending_tasks)
+        )

+ 85 - 0
task2.py

@@ -0,0 +1,85 @@
+"""
+@author: luojunhui
+"""
+import time
+import datetime
+import asyncio
+
+import aiomysql
+
+from applications.deal.process_deal_2 import ProcessDeal
+
+
+class TaskMySQLClient(object):
+    """
+    Small async MySQL client backed by an aiomysql connection pool.
+
+    Connection settings default to the values this script has always used and
+    can be overridden per instance.
+    """
+
+    # NOTE(review): real credentials are committed below as defaults; they
+    # should be supplied by the caller or the deployment environment instead.
+    def __init__(
+        self,
+        host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+        port=3306,
+        user='crawler',
+        password='crawler123456@',
+        db='piaoquan-crawler',
+    ):
+        """
+        :param host: MySQL server host name
+        :param port: MySQL server port
+        :param user: login user
+        :param password: login password
+        :param db: database (schema) name
+        """
+        self._host = host
+        self._port = port
+        self._user = user
+        self._password = password
+        self._db = db
+        # Created by init_pool(); None until then and again after close_pool().
+        self.mysql_pool = None
+
+    async def init_pool(self):
+        """
+        Create the connection pool.
+
+        :return: None
+        """
+        self.mysql_pool = await aiomysql.create_pool(
+            host=self._host,
+            port=self._port,
+            user=self._user,
+            password=self._password,
+            db=self._db,
+            charset='utf8mb4',
+            connect_timeout=120,
+        )
+        print("mysql init successfully")
+
+    async def close_pool(self):
+        """
+        Close the connection pool and wait until it is fully closed.
+
+        Does nothing when the pool was never opened or is already closed.
+
+        :return: None
+        """
+        if self.mysql_pool is None:
+            return
+        self.mysql_pool.close()
+        await self.mysql_pool.wait_closed()
+        self.mysql_pool = None
+
+    async def async_select(self, sql):
+        """
+        Execute a query and return all of its rows.
+
+        :param sql: complete SQL statement
+        :return: result of ``cursor.fetchall()``
+        """
+        async with self.mysql_pool.acquire() as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(sql)
+                return await cursor.fetchall()
+
+    async def async_insert(self, sql):
+        """
+        Execute an INSERT or UPDATE statement and commit it.
+
+        :param sql: complete SQL statement
+        :return: None
+        """
+        async with self.mysql_pool.acquire() as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(sql)
+                await conn.commit()
+
+
+async def main():
+    """
+    Run one polling round: open the pool, process pending tasks, close the pool.
+
+    :return: None
+    """
+    mysql_client = TaskMySQLClient()
+    await mysql_client.init_pool()
+    try:
+        await ProcessDeal(mysql_client).deal()
+    finally:
+        # Every round opens a fresh pool, so it has to be closed here;
+        # otherwise its connections stay open after asyncio.run() returns.
+        await mysql_client.close_pool()
+
+
+if __name__ == '__main__':
+    # Poll forever: run one round, report the finish time, wait 120 seconds.
+    while True:
+        asyncio.run(main())
+        finished_at = str(datetime.datetime.now())
+        print("{}    请求执行完成, 等待120s".format(finished_at))
+        time.sleep(120)