Parcourir la source

db/__init__.py
async_insert方法增加返回 affected_rows

historyTask,
newContentIdTask.py
增加了分布式锁,通过 affected_rows 来控制
常量大写

罗俊辉 il y a 6 mois
Parent
commit
ef7773840c
4 fichiers modifiés avec 220 ajouts et 118 suppressions
  1. +2 −0  applications/db/__init__.py
  2. +5 −5  historyTask.py
  3. +48 −19  tasks/history_task.py
  4. +165 −94  tasks/newContentIdTask.py

+ 2 - 0
applications/db/__init__.py

@@ -62,7 +62,9 @@ class AsyncMySQLClient(object):
             async with coon.cursor() as cursor:
                 try:
                     await cursor.execute(sql, params)
+                    affected_rows = cursor.rowcount
                     await coon.commit()
+                    return affected_rows
                 except Exception as e:
                     await coon.rollback()
                     raise

+ 5 - 5
historyTask.py

@@ -5,7 +5,7 @@ import time
 import asyncio
 import datetime
 from tasks.history_task import historyContentIdTask
-from applications.db import TaskMySQLClient
+from applications.db import AsyncMySQLClient
 
 
 async def main():
@@ -13,10 +13,10 @@ async def main():
     main job
     :return:
     """
-    TMC = TaskMySQLClient()
-    await TMC.init_pool()
-    PD = historyContentIdTask(TMC)
-    await PD.deal()
+    async_mysql_pool = AsyncMySQLClient()
+    await async_mysql_pool.init_pool()
+    history_content_id_task = historyContentIdTask(async_mysql_pool)
+    await history_content_id_task.deal()
 
 
 if __name__ == '__main__':

+ 48 - 19
tasks/history_task.py

@@ -15,6 +15,9 @@ class historyContentIdTask(object):
     """
     处理已经匹配过小程序的文章
     """
+    TASK_PROCESSING_STATUS = 101
+    TASK_INIT_STATUS = 0
+    TASK_PUBLISHED_STATUS = 4
 
     def __init__(self, mysql_client):
         """
@@ -76,7 +79,8 @@ class historyContentIdTask(object):
         sql = f"""
         SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
         FROM {self.article_crawler_video_table}
-        WHERE content_id = '{content_id}' and download_status = 2;
+        WHERE content_id = '{content_id}' and download_status = 2
+        ORDER BY score DESC;
         """
         res_tuple = await self.mysql_client.async_select(sql)
         if len(res_tuple) >= 3:
@@ -89,7 +93,8 @@ class historyContentIdTask(object):
                     "cover_oss_path": i[4],
                     "uid": i[5]
                 }
-                for i in res_tuple]
+                for i in res_tuple
+            ]
         else:
             return []
 
@@ -108,6 +113,29 @@ class historyContentIdTask(object):
         else:
             return False
 
+    async def update_content_status(self, new_content_status, trace_id, ori_content_status):
+        """
+        :param new_content_status:
+        :param trace_id:
+        :param ori_content_status:
+        :return:
+        """
+        update_sql = f"""
+                    UPDATE {self.article_match_video_table}
+                    SET content_status = %s, content_status_update_time = %s
+                    WHERE trace_id = %s and content_status = %s;
+                    """
+        row_counts = await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                new_content_status,
+                int(time.time()),
+                trace_id,
+                ori_content_status
+            )
+        )
+        return row_counts
+
     async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
         """
         发布至 pq
@@ -119,7 +147,6 @@ class historyContentIdTask(object):
         :param flow_pool_level: 流量池层级 ---> str
         :return:
         """
-        published_status = 4
         match flow_pool_level:
             case "autoArticlePoolLevel4":
                 # 冷启层, 全量做
@@ -160,11 +187,17 @@ class historyContentIdTask(object):
         update_sql = f"""
            UPDATE {self.article_match_video_table}
            SET content_status = %s, response = %s, process_times = %s
-           WHERE trace_id = %s;
+           WHERE trace_id = %s and content_status = %s;
            """
         await self.mysql_client.async_insert(
             sql=update_sql,
-            params=(published_status, json.dumps(L, ensure_ascii=False), process_times + 1, trace_id)
+            params=(
+                self.TASK_PUBLISHED_STATUS,
+                json.dumps(L, ensure_ascii=False),
+                process_times + 1,
+                trace_id,
+                self.TASK_PROCESSING_STATUS
+            )
         )
         logging(
             code="9002",
@@ -184,21 +217,17 @@ class historyContentIdTask(object):
         gh_id = params['gh_id']
         process_times = params['process_times']
         download_videos = await self.get_video_list(content_id=content_id)
+        # time.sleep(3)
         if download_videos:
             # 把状态修改为 4
-            """
-            todo: 加上状态锁,防止多个进程同时处理一个视频, 通过 update_time && content_id来判断
-            """
-            # update_sql = f"""
-            # UPDATE {self.article_crawler_video_table}
-            # SET content_status = %s
-            # WHERE trace_id = %s;
-            # """
-            # await self.mysql_client.asyncInsert(
-            #     sql=update_sql,
-            #     params=(4, trace_id)
-            # )
-
+            affected_rows = await self.update_content_status(
+                trace_id=trace_id,
+                new_content_status=self.TASK_PROCESSING_STATUS,
+                ori_content_status=self.TASK_INIT_STATUS
+            )
+            if affected_rows == 0:
+                print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
+                return
             kimi_title = await self.get_kimi_title(content_id)
             await self.publish_videos_to_pq(
                 flow_pool_level=flow_pool_level,
@@ -209,7 +238,7 @@ class historyContentIdTask(object):
                 process_times=process_times
             )
         else:
-            pass
+            return
 
     async def deal(self):
         """

+ 165 - 94
tasks/newContentIdTask.py

@@ -19,6 +19,15 @@ class NewContentIdTask(object):
     """
     不存在历史已经发布的文章的匹配流程
     """
+    TASK_INIT_STATUS = 0
+    TASK_KIMI_FINISHED_STATUS = 1
+    TASK_SPIDER_FINISHED_STATUS = 2
+    TASK_ETL_FINISHED_STATUS = 3
+    TASK_PUBLISHED_STATUS = 4
+    TASK_PROCESSING_STATUS = 101
+    TASK_FAIL_STATUS = 99
+    ARTICLE_TEXT_TABLE_ERROR = 98
+    TASK_MAX_PROCESS_TIMES = 3
 
     def __init__(self, mysql_client):
         self.mysql_client = mysql_client
@@ -29,11 +38,6 @@ class NewContentIdTask(object):
         self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
         self.account_map = json.loads(self.config.get_config_value("accountMap"))
         self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
-        self.default_status = 0
-        self.task_processing_status = 101
-        self.task_defeat_status = 99
-        self.article_text_table_error = 4
-        self.max_process_times = 3
 
     async def get_tasks(self):
         """
@@ -42,9 +46,13 @@ class NewContentIdTask(object):
         """
         # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
         select_processing_sql = f"""
-            SELECT trace_id, content_status_update_time, process_times
-            FROM {self.article_match_video_table}
-            WHERE content_status = {self.task_processing_status} and process_times <= {self.max_process_times}; 
+            SELECT 
+                trace_id, content_status_update_time, process_times
+            FROM 
+                {self.article_match_video_table}
+            WHERE 
+                    content_status = {self.TASK_PROCESSING_STATUS} 
+                and process_times <= {self.TASK_MAX_PROCESS_TIMES}; 
         """
         processing_articles = await self.mysql_client.async_select(select_processing_sql)
         if processing_articles:
@@ -65,20 +73,30 @@ class NewContentIdTask(object):
                     )
         # 将  process_times > 3 的任务的状态修改为失败
         update_status_sql = f"""
-            UPDATE {self.article_match_video_table}
-            SET content_status = %s
-            WHERE process_times > %s;
+            UPDATE 
+                {self.article_match_video_table}
+            SET 
+                content_status = %s
+            WHERE 
+                process_times > %s;
         """
         await self.mysql_client.async_insert(
             update_status_sql,
-            params=(self.task_defeat_status, self.max_process_times)
+            params=(
+                self.TASK_FAIL_STATUS,
+                self.TASK_MAX_PROCESS_TIMES
+            )
         )
         # 获取  process_times <= 3 且  content_status = 0 的任务
         select_sql = f"""
-            SELECT trace_id, content_id, flow_pool_level, gh_id, process_times
-            FROM {self.article_match_video_table} 
-            WHERE content_status = {self.default_status} and process_times <= {self.max_process_times}
-            limit {self.spider_coroutines};
+            SELECT 
+                trace_id, content_id, flow_pool_level, gh_id, process_times
+            FROM 
+                {self.article_match_video_table} 
+            WHERE 
+                    content_status = {self.TASK_INIT_STATUS} 
+                and process_times <= {self.TASK_MAX_PROCESS_TIMES}
+            LIMIT {self.spider_coroutines};
         """
         tasks = await self.mysql_client.async_select(select_sql)
         if tasks:
@@ -124,7 +142,7 @@ class NewContentIdTask(object):
                     SET content_status = %s, content_status_update_time = %s
                     WHERE trace_id = %s and content_status = %s;
                     """
-        await self.mysql_client.async_insert(
+        row_counts = await self.mysql_client.async_insert(
             sql=update_sql,
             params=(
                 new_content_status,
@@ -133,6 +151,7 @@ class NewContentIdTask(object):
                 ori_content_status
             )
         )
+        return row_counts
 
     async def roll_back_content_status_when_fails(self, process_times, trace_id):
         """
@@ -152,11 +171,11 @@ class NewContentIdTask(object):
         await self.mysql_client.async_insert(
             sql=update_article_sql,
             params=(
-                self.default_status,
+                self.TASK_INIT_STATUS,
                 int(time.time()),
                 process_times + 1,
                 trace_id,
-                self.task_processing_status
+                self.TASK_PROCESSING_STATUS
             )
         )
 
@@ -175,7 +194,7 @@ class NewContentIdTask(object):
         if result:
             for item in result:
                 content_status = item[0]
-                if content_status != self.default_status:
+                if content_status != self.TASK_INIT_STATUS:
                     return True
             return False
         else:
@@ -189,7 +208,8 @@ class NewContentIdTask(object):
         sql = f"""
                 SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
                 FROM {self.article_crawler_video_table}
-                WHERE content_id = '{content_id}' and download_status = 2;
+                WHERE content_id = '{content_id}' and download_status = 2
+                ORDER BY score DESC;
                 """
         res_tuple = await self.mysql_client.async_select(sql)
         return [
@@ -201,7 +221,8 @@ class NewContentIdTask(object):
                 "cover_oss_path": i[4],
                 "uid": i[5]
             }
-            for i in res_tuple]
+            for i in res_tuple
+        ]
 
     async def get_kimi_status(self, content_id):
         """
@@ -218,33 +239,32 @@ class NewContentIdTask(object):
             kimi_status = response[0][0]
             return kimi_status
         else:
-            return self.article_text_table_error
+            return self.ARTICLE_TEXT_TABLE_ERROR
 
     async def kimi_task(self, params):
         """
         执行 kimi 任务
         :return:
         """
-        kimi_success_status = 1
-        kimi_fail_status = 2
+        KIMI_SUCCESS_STATUS = 1
+        KIMI_FAIL_STATUS = 2
         content_id = params['content_id']
         trace_id = params['trace_id']
         process_times = params['process_times']
         kimi_status_code = await self.get_kimi_status(content_id=content_id)
-        if kimi_status_code == kimi_success_status:
-            await self.update_content_status(
-                new_content_status=kimi_success_status,
+
+        if kimi_status_code == KIMI_SUCCESS_STATUS:
+            affected_rows = await self.update_content_status(
+                new_content_status=self.TASK_KIMI_FINISHED_STATUS,
                 trace_id=trace_id,
-                ori_content_status=self.default_status
+                ori_content_status=self.TASK_INIT_STATUS
             )
-            """
-            {
-                    "kimi_title": kimi_title,
-                    "ori_title": article_obj['article_title'],
-                    "kimi_summary": content_title,
-                    "kimi_keys": kimi_info['content_keys']
-                }
-            """
+            if affected_rows == 0:
+                logging(
+                    code="6000",
+                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                )
+                return
             get_kimi_sql = f"""
             SELECT article_title, kimi_title, kimi_summary, kimi_keys
             FROM {self.article_text_table}
@@ -257,18 +277,24 @@ class NewContentIdTask(object):
                 "kimi_summary": kimi_info[0][2],
                 "kimi_keys": json.loads(kimi_info[0][3])
             }
-        elif kimi_status_code == self.article_text_table_error:
-            """
-            todo: 文章表和匹配表没有同步更新,暂时不处理此次任务
-            """
-            print("article_text表还没有更新")
+        elif kimi_status_code == self.ARTICLE_TEXT_TABLE_ERROR:
+            logging(
+                code="4000",
+                info="long_articles_text表中未找到 content_id"
+            )
         else:
             # 开始处理,讲 content_status 从 0  改为  101
-            await self.update_content_status(
-                new_content_status=self.task_processing_status,
+            affected_rows = await self.update_content_status(
+                new_content_status=self.TASK_PROCESSING_STATUS,
                 trace_id=trace_id,
-                ori_content_status=self.default_status
+                ori_content_status=self.TASK_INIT_STATUS
             )
+            if affected_rows == 0:
+                logging(
+                    code="6000",
+                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                )
+                return
             K = KimiServer()
             try:
                 select_sql = f"""
@@ -296,12 +322,12 @@ class NewContentIdTask(object):
                         WHERE content_id = %s;"""
                 await self.mysql_client.async_insert(
                     sql=update_kimi_sql,
-                    params=(kimi_title, content_title, content_keys, kimi_success_status, params['content_id'])
+                    params=(kimi_title, content_title, content_keys, KIMI_SUCCESS_STATUS, params['content_id'])
                 )
                 await self.update_content_status(
-                    new_content_status=kimi_success_status,
+                    new_content_status=self.TASK_KIMI_FINISHED_STATUS,
                     trace_id=trace_id,
-                    ori_content_status=self.task_processing_status
+                    ori_content_status=self.TASK_PROCESSING_STATUS
                 )
                 return {
                     "kimi_title": kimi_title,
@@ -319,7 +345,10 @@ class NewContentIdTask(object):
                         """
                 await self.mysql_client.async_insert(
                     sql=update_kimi_sql,
-                    params=(kimi_fail_status, content_id)
+                    params=(
+                        KIMI_FAIL_STATUS,
+                        content_id
+                    )
                 )
                 # 将状态由 101  回退为  0
                 await self.roll_back_content_status_when_fails(
@@ -332,10 +361,8 @@ class NewContentIdTask(object):
         """
         爬虫任务
         :return:
-        todo: 任务执行之前加一个判断,判断是存在 3 条以上的视频已经被抓取
         """
-        spider_default_status = 1
-        spider_success_status = 2
+        SPIDER_INIT_STATUS = 1
         trace_id = params['trace_id']
         content_id = params['content_id']
         process_times = params['process_times']
@@ -347,18 +374,24 @@ class NewContentIdTask(object):
         counts = count_tuple[0][0]
         if counts >= 3:
             await self.update_content_status(
-                new_content_status=spider_success_status,
+                new_content_status=self.TASK_SPIDER_FINISHED_STATUS,
                 trace_id=trace_id,
-                ori_content_status=spider_default_status
+                ori_content_status=SPIDER_INIT_STATUS
             )
             return True
-        try:
-            # 开始处理,将状态由 1 改成  101
-            await self.update_content_status(
-                new_content_status=self.task_processing_status,
-                ori_content_status=spider_default_status,
-                trace_id=trace_id
+        # 开始处理,将状态由 1 改成  101
+        affected_rows = await self.update_content_status(
+            new_content_status=self.TASK_PROCESSING_STATUS,
+            ori_content_status=SPIDER_INIT_STATUS,
+            trace_id=trace_id
+        )
+        if affected_rows == 0:
+            logging(
+                code="6000",
+                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
             )
+            return False
+        try:
             search_videos_count = await search_videos_from_web(
                 info={
                     "ori_title": kimi_result['ori_title'],
@@ -373,11 +406,11 @@ class NewContentIdTask(object):
                 db_client=self.mysql_client
             )
             if search_videos_count >= 3:
-                # 表示爬虫任务执行成功, 将状态从 101  改 2
+                # 表示爬虫任务执行成功, 将状态从 101  改 2
                 await self.update_content_status(
-                    new_content_status=spider_success_status,
+                    new_content_status=self.TASK_SPIDER_FINISHED_STATUS,
                     trace_id=trace_id,
-                    ori_content_status=self.task_processing_status
+                    ori_content_status=self.TASK_PROCESSING_STATUS
                 )
                 return True
             else:
@@ -400,38 +433,49 @@ class NewContentIdTask(object):
         :param params:
         :return:
         """
-        video_download_success_status = 2
-        video_download_fail_status = 3
-        etl_task_default_status = 2
-        etl_task_success_status = 3
+        VIDEO_DOWNLOAD_SUCCESS_STATUS = 2
+        VIDEO_DOWNLOAD_FAIL_STATUS = 3
+        ETL_TASK_INIT_STATUS = 2
         trace_id = params['trace_id']
         content_id = params['content_id']
         # 判断是否有三条已经下载完成的视频
         select_sql = f"""
-        select count(id) 
-        from {self.article_crawler_video_table} 
-        where content_id = '{content_id}' and download_status = {video_download_success_status};
+            select count(id) 
+            from {self.article_crawler_video_table} 
+            where content_id = '{content_id}' and download_status = {VIDEO_DOWNLOAD_SUCCESS_STATUS};
         """
         video_count_tuple = await self.mysql_client.async_select(select_sql)
         video_count = video_count_tuple[0][0]
-        if video_count > 3:
-            await self.update_content_status(
-                ori_content_status=etl_task_default_status,
+        if video_count >= 3:
+            affect_rows = await self.update_content_status(
+                ori_content_status=ETL_TASK_INIT_STATUS,
                 trace_id=trace_id,
-                new_content_status=etl_task_success_status
+                new_content_status=self.TASK_ETL_FINISHED_STATUS
             )
+            if affect_rows == 0:
+                logging(
+                    code="6000",
+                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                )
+                return False
             return True
         else:
             # 开始处理, 将文章状态修改为处理状态
-            await self.update_content_status(
-                ori_content_status=etl_task_default_status,
+            affected_rows = await self.update_content_status(
+                ori_content_status=ETL_TASK_INIT_STATUS,
                 trace_id=trace_id,
-                new_content_status=self.task_processing_status
+                new_content_status=self.TASK_PROCESSING_STATUS
             )
+            if affected_rows == 0:
+                logging(
+                    code="6000",
+                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                )
+                return False
             select_sql = f"""
                 SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
                 FROM {self.article_crawler_video_table}
-                WHERE content_id = '{content_id}' and download_status != {video_download_success_status}
+                WHERE content_id = '{content_id}' and download_status != {VIDEO_DOWNLOAD_SUCCESS_STATUS}
                 ORDER BY score DESC;
             """
             videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
@@ -482,16 +526,16 @@ class NewContentIdTask(object):
                         params=(
                             oss_video,
                             oss_cover,
-                            video_download_success_status,
+                            VIDEO_DOWNLOAD_SUCCESS_STATUS,
                             params['id']
                         )
                     )
                     downloaded_count += 1
                     if downloaded_count > 3:
                         await self.update_content_status(
-                            ori_content_status=self.task_processing_status,
+                            ori_content_status=self.TASK_PROCESSING_STATUS,
                             trace_id=trace_id,
-                            new_content_status=etl_task_success_status
+                            new_content_status=self.TASK_ETL_FINISHED_STATUS
                         )
                         return True
                 except Exception as e:
@@ -502,13 +546,13 @@ class NewContentIdTask(object):
                     """
                     await self.mysql_client.async_insert(
                         sql=update_sql,
-                        params=(video_download_fail_status, params['id'])
+                        params=(VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
                     )
             if downloaded_count >= 3:
                 await self.update_content_status(
-                    ori_content_status=self.task_processing_status,
+                    ori_content_status=self.TASK_PROCESSING_STATUS,
                     trace_id=trace_id,
-                    new_content_status=etl_task_success_status
+                    new_content_status=self.TASK_ETL_FINISHED_STATUS
                 )
                 return True
             else:
@@ -525,19 +569,24 @@ class NewContentIdTask(object):
         :param params:
         :return:
         """
-        publish_default_status = 3
-        publish_success_status = 4
+        PUBLISH_DEFAULT_STATUS = 3
         gh_id = params['gh_id']
         flow_pool_level = params['flow_pool_level']
         content_id = params['content_id']
         trace_id = params['trace_id']
         process_times = params['process_times']
         # 开始处理,将状态修改为操作状态
-        await self.update_content_status(
-            ori_content_status=publish_default_status,
+        affected_rows = await self.update_content_status(
+            ori_content_status=PUBLISH_DEFAULT_STATUS,
             trace_id=trace_id,
-            new_content_status=self.task_processing_status
+            new_content_status=self.TASK_PROCESSING_STATUS
         )
+        if affected_rows == 0:
+            logging(
+                code="6000",
+                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+            )
+            return False
         try:
             download_videos = await self.get_downloaded_videos(content_id)
             match flow_pool_level:
@@ -585,11 +634,11 @@ class NewContentIdTask(object):
             await self.mysql_client.async_insert(
                 sql=update_sql,
                 params=(
-                    publish_success_status,
+                    self.TASK_PUBLISHED_STATUS,
                     json.dumps(L, ensure_ascii=False),
                     process_times + 1,
                     trace_id,
-                    self.task_processing_status
+                    self.TASK_PROCESSING_STATUS
                 )
             )
         except Exception as e:
@@ -606,35 +655,57 @@ class NewContentIdTask(object):
         :return:
         """
         # step1: 执行 kimi 操作
+        # time.sleep(5) # 测试多个进程操作同一个 task 的等待时间
         kimi_result = await self.kimi_task(params)
+        trace_id = params['trace_id']
         if kimi_result:
             # 等待 kimi 操作执行完成之后,开始执行 spider_task
             print("kimi success")
+            logging(
+                code=3001,
+                info="kimi success",
+                trace_id=trace_id
+            )
             spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
             if spider_flag:
                 # 等待爬虫执行完成后,开始执行 etl_task
                 print("spider success")
+                logging(
+                    code=3002,
+                    info="spider_success",
+                    trace_id=trace_id
+                )
                 etl_flag = await self.etl_task(params)
                 if etl_flag:
                     # 等待下载上传完成,执行发布任务
                     print("etl success")
+                    logging(
+                        code="3003",
+                        info="etl_success",
+                        trace_id=trace_id
+                    )
                     try:
                         await self.publish_task(params, kimi_result['kimi_title'])
+                        logging(
+                            code="3004",
+                            info="publish_success",
+                            trace_id=trace_id
+                        )
                     except Exception as e:
                         logging(
-                            code="9001",
+                            code="6004",
                             info="publish 失败--{}".format(e),
                             trace_id=params['trace_id']
                         )
                 else:
                     logging(
-                        code="8001",
+                        code="6003",
                         info="ETL 处理失败",
                         trace_id=params['trace_id']
                     )
             else:
                 logging(
-                    code="7002",
+                    code="6002",
                     info="爬虫处理失败",
                     trace_id=params['trace_id']
                 )