Browse Source

分表,修改task1, task2, search_schedule.py

罗俊辉 8 months ago
parent
commit
bcf26d6661
3 changed files with 578 additions and 1 deletions
  1. 1 1
      match_server.toml
  2. 153 0
      tasks/chadui.py
  3. 424 0
      tt.py

+ 1 - 1
match_server.toml

@@ -1,6 +1,6 @@
 reload = true
 bind = "0.0.0.0:8111"
-workers = 4
+workers = 10
 keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
 graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间
 loglevel = "debug"  # 日志级别

+ 153 - 0
tasks/chadui.py

@@ -0,0 +1,153 @@
+"""
+@author: luojunhui
+"""
+"""
+@author: luojunhui
+"""
+import asyncio
+
+from static.config import db_article, db_video
+from applications.functions.log import logging
+from static.config import mysql_coroutines
+
+
+class MatchTask5(object):
+    """
+    定时执行任务
+    """
+
+    def __init__(self, mysql_client):
+        """
+        :param mysql_client:
+        """
+        self.mysql_client = mysql_client
+
+    async def get_task(self):
+        """
+        获取任务
+        :return:
+        """
+        select_sql = f"""
+            SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
+            FROM {db_article} 
+            WHERE content_status = 0 and process_times <= 5 and account_name = '万事如意一家子'
+            ORDER BY request_time_stamp
+            DESC
+            LIMIT {mysql_coroutines};
+        """
+        task_list = await self.mysql_client.async_select(sql=select_sql)
+        task_obj_list = [
+            {
+                "trace_id": item[0],
+                "content_id": item[1],
+                "gh_id": item[2],
+                "title": item[3],
+                "text": item[4],
+                "content_status": item[5],
+                "process_times": item[6]
+            } for item in task_list
+        ]
+        print("本次任务获取到 {} 条视频".format(len(task_obj_list)))
+        # logging(
+        #     code="9001",
+        #     info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
+        #     data=task_obj_list
+        # )
+        return task_obj_list
+
+    async def get_history_videos(self, content_id):
+        """
+        check whether the contents videos exists
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+            SELECT video_id
+            FROM {db_video}
+            where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
+        """
+        content_videos = await self.mysql_client.async_select(select_sql)
+        videos = [vid for vid in content_videos]
+        print(len(videos))
+        if len(videos) >= 3:
+            return videos
+        else:
+            return None
+
+    async def use_exists_contents_videos(self, video_id_list, params):
+        """
+        使用已经存在的视频id
+        :return:
+        """
+        trace_id = params['trace_id']
+        content_id = params['content_id']
+        select_sql = f"""
+            SELECT kimi_title
+            FROM {db_article}
+            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
+        """
+        info = await self.mysql_client.async_select(sql=select_sql)
+        kimi_title = info[0]
+        update_sql = f"""
+            UPDATE {db_article}
+            SET 
+                kimi_title=%s,
+                recall_video_id1=%s, 
+                recall_video_id2=%s, 
+                recall_video_id3=%s,
+                content_status=%s,
+                process_times = %s
+            WHERE  trace_id = %s
+        """
+        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                kimi_title,
+                video_id_list[0],
+                "NULL" if vid2 is None else vid2,
+                "NULL" if vid3 is None else vid3,
+                2,
+                int(params['process_times']) + 1,
+                trace_id
+            )
+        )
+        logging(
+            code="9002",
+            info="已从历史文章更新,文章id: {}".format(content_id),
+            trace_id=trace_id
+        )
+
+    async def process_task(self, params):
+        """
+        异步执行
+        :param params:
+        :return:
+        """
+        content_id = params['content_id']
+        print(content_id)
+        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
+        video_id_list = await self.get_history_videos(content_id=content_id)
+        print(video_id_list)
+        if video_id_list:
+            # 说明已经存在了结果, 将该条记录下的video_id拿出来
+            print("存在历史文章")
+            await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
+        else:
+            pass
+
+    async def deal(self):
+        """
+        处理
+        :return:
+        """
+        task_list = await self.get_task()
+        print(len(task_list))
+        if task_list:
+            tasks = [self.process_task(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )

+ 424 - 0
tt.py

@@ -0,0 +1,424 @@
+"""
+@author: luojunhui
+"""
+import time
+
+import aiomysql
+import asyncio
+
+from static.config import db_article, db_video
+from applications.schedule import search_videos
+from applications.functions.log import logging
+from static.config import spider_coroutines
+
+
+class MatchTask1(object):
+    """
+    定时执行任务
+    """
+
+    def __init__(self, mysql_client):
+        """
+        :param mysql_client:
+        """
+        self.mysql_client = mysql_client
+
+    async def get_task(self):
+        """
+        获取任务
+        :return:
+        """
+        select_sql1 = f"""
+            SELECT DISTINCT (content_id)       
+            FROM {db_article} 
+            WHERE content_status = 0 and process_times <= 5 and account_name = '万事如意一家子'
+            ORDER BY request_time_stamp
+            ASC
+            LIMIT {spider_coroutines};
+        """
+        content_ids = await self.mysql_client.async_select(select_sql1)
+        cil = []
+        for content_id in content_ids:
+            cil.append(content_id[0])
+        content_ids_tuple = str(cil).replace("[", "(").replace("]", ")")
+        if len(content_ids_tuple) > 3:
+            select_sql = f"""
+                SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
+                FROM {db_article} 
+                WHERE content_id in {content_ids_tuple} and process_times <= 5 
+                ORDER BY request_time_stamp
+                ASC;
+            """
+            print(select_sql)
+            task_list = await self.mysql_client.async_select(sql=select_sql)
+            task_obj_list = [
+                {
+                    "trace_id": item[0],
+                    "content_id": item[1],
+                    "gh_id": item[2],
+                    "title": item[3],
+                    "text": item[4],
+                    "content_status": item[5],
+                    "process_times": item[6]
+                } for item in task_list
+            ]
+            logging(
+                code="9001",
+                info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
+                data=task_obj_list
+            )
+            return task_obj_list
+        else:
+            return []
+
+    async def get_history_videos(self, content_id):
+        """
+        check whether the contents videos exists
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+            SELECT video_id
+            FROM {db_video}
+            where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
+        """
+        content_videos = await self.mysql_client.async_select(select_sql)
+        videos = [vid for vid in content_videos]
+        if len(videos) >= 3:
+            return videos
+        else:
+            return None
+
+    async def judge_content_processing(self, content_id):
+        """
+        判断该content_id是否在处理中
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+                       SELECT trace_id, content_status
+                       FROM {db_article}
+                       WHERE content_id = '{content_id}'
+                       ORDER BY id DESC;
+                   """
+        result = await self.mysql_client.async_select(select_sql)
+        if result:
+            for item in result:
+                trace_id, content_status = item
+                if content_status == 1:
+                    return False
+            return True
+        else:
+            return True
+
+    async def use_exists_contents_videos(self, video_id_list, params):
+        """
+        使用已经存在的视频id
+        :return:
+        """
+        trace_id = params['trace_id']
+        content_id = params['content_id']
+        select_sql = f"""
+            SELECT kimi_title
+            FROM {db_article}
+            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
+        """
+        info = await self.mysql_client.async_select(sql=select_sql)
+        kimi_title = info[0]
+        update_sql = f"""
+            UPDATE {db_article}
+            SET 
+                kimi_title=%s,
+                recall_video_id1=%s, 
+                recall_video_id2=%s, 
+                recall_video_id3=%s,
+                content_status=%s,
+                process_times = %s
+            WHERE  trace_id = %s
+        """
+        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                kimi_title,
+                video_id_list[0],
+                "NULL" if vid2 is None else vid2,
+                "NULL" if vid3 is None else vid3,
+                2,
+                int(params['process_times']) + 1,
+                trace_id
+            )
+        )
+        logging(
+            code="9002",
+            info="已从历史文章更新,文章id: {}".format(content_id),
+            trace_id=trace_id
+        )
+
+    async def start_process(self, params):
+        """
+        开始处理
+        :param params:
+        :return:
+        """
+        # 更新文章contentId为1, 说明该文章正在处理中
+        update_sql = f"""
+            UPDATE {db_article}
+            SET 
+                content_status = %s
+            WHERE 
+                trace_id = %s;
+        """
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                1, params['trace_id']
+            )
+        )
+        try:
+            video_count = await search_videos(
+                params={
+                    "title": params['title'],
+                    "content": params['text'],
+                    "trace_id": params['trace_id'],
+                    "content_id": params['content_id']
+                },
+                trace_id=params['trace_id'],
+                gh_id=params['gh_id'],
+                mysql_client=self.mysql_client
+            )
+            select_sql = f"""
+                SELECT video_id
+                FROM {db_video}
+                WHERE content_id = '{params['content_id']}'
+            """
+            result = await self.mysql_client.async_select(sql=select_sql)
+            vid1, vid2, vid3 = result[0], result[1], result[2]
+            if vid1 or vid2 or vid3:
+                update_sql2 = f"""
+                    UPDATE {db_article}
+                    SET
+                        recall_video_id1 = %s,
+                        recall_video_id2 = %s,
+                        recall_video_id3 = %s,
+                        content_status = %s,
+                        process_times = %s
+                        WHERE trace_id = %s;
+                """
+                await self.mysql_client.async_insert(
+                    sql=update_sql2,
+                    params=(
+                        vid1 if vid1 else "NULL",
+                        vid2 if vid2 else "NULL",
+                        vid3 if vid3 else "NULL",
+                        2,
+                        {int(params['process_times']) + 1},
+                        params['trace_id']
+                    )
+                )
+                logging(
+                    code="9008",
+                    info="视频搜索成功, 状态修改为2",
+                    trace_id=params['trace_id']
+                )
+            else:
+                if int(params['process_times']) < 5:
+                    update_sql3 = f"""
+                        UPDATE {db_article}
+                        SET 
+                           content_status = %s,
+                           process_times = %s
+                        WHERE trace_id = %s;
+                                    """
+                    await self.mysql_client.async_insert(
+                        sql=update_sql3,
+                        params=(0, int(params['process_times']) + 1, params['trace_id'])
+                    )
+                    logging(
+                        code="9018",
+                        info="视频搜索失败,回退状态为0",
+                        trace_id=params['trace_id']
+                    )
+                else:
+                    update_sql3 = f"""
+                        UPDATE {db_article}
+                        SET 
+                           content_status = %s,
+                           process_times = %s
+                        WHERE trace_id = %s;
+                                    """
+                    await self.mysql_client.async_insert(
+                        sql=update_sql3,
+                        params=(3, int(params['process_times']) + 1, params['trace_id'])
+                    )
+                    logging(
+                        code="9019",
+                        info="视频多次搜索失败,状态修改为3",
+                        trace_id=params['trace_id']
+                    )
+        except Exception as e:
+            if int(params['process_times']) < 5:
+                logging(
+                    code="9018",
+                    info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e),
+                    trace_id=params['trace_id']
+                )
+                update_sql4 = f"""
+                    UPDATE {db_article}
+                    SET
+                       content_status = %s,
+                       process_times = %s
+                    WHERE trace_id = %s;
+                """
+                await self.mysql_client.async_insert(
+                    sql=update_sql4,
+                    params=(0, int(params['process_times']) + 1, params['trace_id'])
+                )
+            else:
+                logging(
+                    code="9019",
+                    info="{}异常错误:{}, 状态修改为3".format(params['trace_id'], e),
+                    trace_id=params['trace_id']
+                )
+                update_sql4 = f"""
+                                    UPDATE {db_article}
+                                    SET
+                                       content_status = %s,
+                                       process_times = %s
+                                    WHERE trace_id = %s;
+                                """
+                await self.mysql_client.async_insert(
+                    sql=update_sql4,
+                    params=(3, int(params['process_times']) + 1, params['trace_id'])
+                )
+
+    async def process_task(self, params):
+        """
+        异步执行
+        :param params:
+        :return:
+        """
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
+        video_id_list = await self.get_history_videos(content_id=content_id)
+        if video_id_list:
+            # 说明已经存在了结果, 将该条记录下的video_id拿出来
+            logging(
+                code="9001",
+                info="存在历史文章",
+                trace_id=trace_id
+            )
+            await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
+        else:
+            flag = await self.judge_content_processing(content_id)
+            if flag:
+                logging(
+                    code="9004",
+                    info="无正在处理的文章ID, 开始处理",
+                    trace_id=trace_id
+                )
+                await self.start_process(params=params)
+            else:
+                logging(
+                    code="9003",
+                    info="该文章ID正在请求--文章ID {}".format(content_id),
+                    trace_id=trace_id
+                )
+
+    async def deal(self):
+        """
+        处理
+        :return:
+        """
+        task_list = await self.get_task()
+        task_dict = {}
+        for task in task_list:
+            key = task['content_id']
+            task_dict[key] = task
+        process_list = []
+        for item in task_dict:
+            process_list.append(task_dict[item])
+        if process_list:
+            # for task in task_list:
+            #     await self.process_task(task)
+            tasks = [self.process_task(params) for params in process_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )
+
+
+class TaskMySQLClient(object):
+    """
+    Async MySQL
+    """
+
+    def __init__(self):
+        self.mysql_pool = None
+
+    async def init_pool(self):
+        """
+        初始化连接
+        :return:
+        """
+        self.mysql_pool = await aiomysql.create_pool(
+            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+            port=3306,
+            user='crawler',
+            password='crawler123456@',
+            db='piaoquan-crawler',
+            charset='utf8mb4',
+            connect_timeout=120,
+        )
+        print("mysql init successfully")
+
+    async def close_pool(self):
+        """
+        关闭 mysql 连接
+        :return:
+        """
+        self.mysql_pool.close()
+        await self.mysql_pool.wait_closed()
+
+    async def async_select(self, sql):
+        """
+        select method
+        :param sql:
+        :return:
+        """
+        async with self.mysql_pool.acquire() as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(sql)
+                result = await cursor.fetchall()
+                return result
+
+    async def async_insert(self, sql, params):
+        """
+        insert and update method
+        :param params:
+        :param sql:
+        :return:
+        """
+        async with self.mysql_pool.acquire() as coon:
+            async with coon.cursor() as cursor:
+                await cursor.execute(sql, params)
+                await coon.commit()
+
+
+async def main():
+    """
+    main job
+    :return:
+    """
+    TMC = TaskMySQLClient()
+    await TMC.init_pool()
+    PD = MatchTask1(TMC)
+    await PD.deal()
+
+if __name__ == '__main__':
+    while True:
+        asyncio.run(main())
+        time.sleep(10)