Browse Source

2024-10-14
新 content-id 抓取任务卡死 bug fix

处理失败的文章 code = 99, 会命中这个判断
if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
导致 return True

罗俊辉 6 months ago
parent
commit
9937d736ce
1 changed files with 134 additions and 155 deletions
  1. 134 155
      tasks/newContentIdTask.py

+ 134 - 155
tasks/newContentIdTask.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+
 import json
 import time
 
@@ -19,6 +20,7 @@ class NewContentIdTask(object):
     """
     不存在历史已经发布的文章的匹配流程
     """
+
     TASK_INIT_STATUS = 0
     TASK_KIMI_FINISHED_STATUS = 1
     TASK_SPIDER_FINISHED_STATUS = 2
@@ -54,22 +56,23 @@ class NewContentIdTask(object):
                     content_status = {self.TASK_PROCESSING_STATUS} 
                 and process_times <= {self.TASK_MAX_PROCESS_TIMES}; 
         """
-        processing_articles = await self.mysql_client.async_select(select_processing_sql)
+        processing_articles = await self.mysql_client.async_select(
+            select_processing_sql
+        )
         if processing_articles:
             processing_list = [
                 {
                     "trace_id": item[0],
                     "content_status_update_time": item[1],
-                    "process_times": item[2]
+                    "process_times": item[2],
                 }
                 for item in processing_articles
             ]
             for obj in processing_list:
-                if int(time.time()) - obj['content_status_update_time'] >= 3600:
+                if int(time.time()) - obj["content_status_update_time"] >= 3600:
                     # 认为该任务失败
                     await self.roll_back_content_status_when_fails(
-                        process_times=obj['process_times'] + 1,
-                        trace_id=obj['trace_id']
+                        process_times=obj["process_times"] + 1, trace_id=obj["trace_id"]
                     )
         # 将  process_times > 3 且状态不为 4 的任务的状态修改为失败,
         update_status_sql = f"""
@@ -85,8 +88,8 @@ class NewContentIdTask(object):
             params=(
                 self.TASK_FAIL_STATUS,
                 self.TASK_MAX_PROCESS_TIMES,
-                self.TASK_PUBLISHED_STATUS
-            )
+                self.TASK_PUBLISHED_STATUS,
+            ),
         )
         # 获取  process_times <= 3 且  content_status = 0 的任务
         select_sql = f"""
@@ -107,7 +110,7 @@ class NewContentIdTask(object):
                     "content_id": i[1],
                     "flow_pool_level": i[2],
                     "gh_id": i[3],
-                    "process_times": i[4]
+                    "process_times": i[4],
                 }
                 for i in tasks
             ]
@@ -131,7 +134,9 @@ class NewContentIdTask(object):
         else:
             return False
 
-    async def update_content_status(self, new_content_status, trace_id, ori_content_status):
+    async def update_content_status(
+        self, new_content_status, trace_id, ori_content_status
+    ):
         """
         :param new_content_status:
         :param trace_id:
@@ -145,12 +150,7 @@ class NewContentIdTask(object):
                     """
         row_counts = await self.mysql_client.async_insert(
             sql=update_sql,
-            params=(
-                new_content_status,
-                int(time.time()),
-                trace_id,
-                ori_content_status
-            )
+            params=(new_content_status, int(time.time()), trace_id, ori_content_status),
         )
         return row_counts
 
@@ -176,8 +176,8 @@ class NewContentIdTask(object):
                 int(time.time()),
                 process_times + 1,
                 trace_id,
-                self.TASK_PROCESSING_STATUS
-            )
+                self.TASK_PROCESSING_STATUS,
+            ),
         )
 
     async def judge_whether_same_content_id_is_processing(self, content_id):
@@ -185,6 +185,9 @@ class NewContentIdTask(object):
         同一个 content_id只需要处理一次
         :param content_id:
         :return:
+        success: 4
+        init: 0
+        fail: 99
         """
         select_sql = f"""
                    SELECT distinct content_status
@@ -195,7 +198,13 @@ class NewContentIdTask(object):
         if result:
             for item in result:
                 content_status = item[0]
-                if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
+                # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
+                if content_status in {
+                    self.TASK_KIMI_FINISHED_STATUS,
+                    self.TASK_SPIDER_FINISHED_STATUS,
+                    self.TASK_ETL_FINISHED_STATUS,
+                    self.TASK_PROCESSING_STATUS,
+                }:
                     return True
             return False
         else:
@@ -220,7 +229,7 @@ class NewContentIdTask(object):
                 "like_count": i[2],
                 "video_oss_path": i[3],
                 "cover_oss_path": i[4],
-                "uid": i[5]
+                "uid": i[5],
             }
             for i in res_tuple
         ]
@@ -249,21 +258,21 @@ class NewContentIdTask(object):
         """
         KIMI_SUCCESS_STATUS = 1
         KIMI_FAIL_STATUS = 2
-        content_id = params['content_id']
-        trace_id = params['trace_id']
-        process_times = params['process_times']
+        content_id = params["content_id"]
+        trace_id = params["trace_id"]
+        process_times = params["process_times"]
         kimi_status_code = await self.get_kimi_status(content_id=content_id)
 
         if kimi_status_code == KIMI_SUCCESS_STATUS:
             affected_rows = await self.update_content_status(
                 new_content_status=self.TASK_KIMI_FINISHED_STATUS,
                 trace_id=trace_id,
-                ori_content_status=self.TASK_INIT_STATUS
+                ori_content_status=self.TASK_INIT_STATUS,
             )
             if affected_rows == 0:
                 logging(
                     code="6000",
-                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return",
                 )
                 return
             get_kimi_sql = f"""
@@ -276,24 +285,21 @@ class NewContentIdTask(object):
                 "kimi_title": kimi_info[0][1],
                 "ori_title": kimi_info[0][0],
                 "kimi_summary": kimi_info[0][2],
-                "kimi_keys": json.loads(kimi_info[0][3])
+                "kimi_keys": json.loads(kimi_info[0][3]),
             }
         elif kimi_status_code == self.ARTICLE_TEXT_TABLE_ERROR:
-            logging(
-                code="4000",
-                info="long_articles_text表中未找到 content_id"
-            )
+            logging(code="4000", info="long_articles_text表中未找到 content_id")
         else:
             # 开始处理,讲 content_status 从 0  改为  101
             affected_rows = await self.update_content_status(
                 new_content_status=self.TASK_PROCESSING_STATUS,
                 trace_id=trace_id,
-                ori_content_status=self.TASK_INIT_STATUS
+                ori_content_status=self.TASK_INIT_STATUS,
             )
             if affected_rows == 0:
                 logging(
                     code="6000",
-                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return",
                 )
                 return
             K = KimiServer()
@@ -307,12 +313,14 @@ class NewContentIdTask(object):
                 article_obj = {
                     "article_title": res[0][0],
                     "article_text": res[0][1],
-                    "content_id": content_id
+                    "content_id": content_id,
                 }
                 kimi_info = await K.search_kimi_schedule(params=article_obj)
-                kimi_title = kimi_info['k_title']
-                content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
-                content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
+                kimi_title = kimi_info["k_title"]
+                content_title = (
+                    kimi_info["content_title"].replace("'", "").replace('"', "")
+                )
+                content_keys = json.dumps(kimi_info["content_keys"], ensure_ascii=False)
                 update_kimi_sql = f"""
                         UPDATE {self.article_text_table} 
                         SET
@@ -323,18 +331,24 @@ class NewContentIdTask(object):
                         WHERE content_id = %s;"""
                 await self.mysql_client.async_insert(
                     sql=update_kimi_sql,
-                    params=(kimi_title, content_title, content_keys, KIMI_SUCCESS_STATUS, params['content_id'])
+                    params=(
+                        kimi_title,
+                        content_title,
+                        content_keys,
+                        KIMI_SUCCESS_STATUS,
+                        params["content_id"],
+                    ),
                 )
                 await self.update_content_status(
                     new_content_status=self.TASK_KIMI_FINISHED_STATUS,
                     trace_id=trace_id,
-                    ori_content_status=self.TASK_PROCESSING_STATUS
+                    ori_content_status=self.TASK_PROCESSING_STATUS,
                 )
                 return {
                     "kimi_title": kimi_title,
-                    "ori_title": article_obj['article_title'],
+                    "ori_title": article_obj["article_title"],
                     "kimi_summary": content_title,
-                    "kimi_keys": kimi_info['content_keys']
+                    "kimi_keys": kimi_info["content_keys"],
                 }
             except Exception as e:
                 # kimi 任务处理失败
@@ -345,16 +359,11 @@ class NewContentIdTask(object):
                         WHERE content_id = %s
                         """
                 await self.mysql_client.async_insert(
-                    sql=update_kimi_sql,
-                    params=(
-                        KIMI_FAIL_STATUS,
-                        content_id
-                    )
+                    sql=update_kimi_sql, params=(KIMI_FAIL_STATUS, content_id)
                 )
                 # 将状态由 101  回退为  0
                 await self.roll_back_content_status_when_fails(
-                    process_times=process_times,
-                    trace_id=trace_id
+                    process_times=process_times, trace_id=trace_id
                 )
                 return {}
 
@@ -364,10 +373,10 @@ class NewContentIdTask(object):
         :return:
         """
         SPIDER_INIT_STATUS = 1
-        trace_id = params['trace_id']
-        content_id = params['content_id']
-        process_times = params['process_times']
-        gh_id = params['gh_id']
+        trace_id = params["trace_id"]
+        content_id = params["content_id"]
+        process_times = params["process_times"]
+        gh_id = params["gh_id"]
         select_sql = f"""
         select count(id) from {self.article_crawler_video_table} where content_id = '{content_id}';
         """
@@ -377,53 +386,50 @@ class NewContentIdTask(object):
             await self.update_content_status(
                 new_content_status=self.TASK_SPIDER_FINISHED_STATUS,
                 trace_id=trace_id,
-                ori_content_status=SPIDER_INIT_STATUS
+                ori_content_status=SPIDER_INIT_STATUS,
             )
             return True
         # 开始处理,将状态由 1 改成  101
         affected_rows = await self.update_content_status(
             new_content_status=self.TASK_PROCESSING_STATUS,
             ori_content_status=SPIDER_INIT_STATUS,
-            trace_id=trace_id
+            trace_id=trace_id,
         )
         if affected_rows == 0:
             logging(
-                code="6000",
-                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
             )
             return False
         try:
             search_videos_count = await search_videos_from_web(
                 info={
-                    "ori_title": kimi_result['ori_title'],
-                    "kimi_summary": kimi_result['kimi_summary'],
-                    "kimi_keys": kimi_result['kimi_keys'],
+                    "ori_title": kimi_result["ori_title"],
+                    "kimi_summary": kimi_result["kimi_summary"],
+                    "kimi_keys": kimi_result["kimi_keys"],
                     "trace_id": trace_id,
                     "gh_id": gh_id,
                     "content_id": content_id,
-                    "crawler_video_table": self.article_crawler_video_table
+                    "crawler_video_table": self.article_crawler_video_table,
                 },
                 gh_id_map=self.account_map,
-                db_client=self.mysql_client
+                db_client=self.mysql_client,
             )
             if search_videos_count >= 3:
                 # 表示爬虫任务执行成功, 将状态从 101  改为 2
                 await self.update_content_status(
                     new_content_status=self.TASK_SPIDER_FINISHED_STATUS,
                     trace_id=trace_id,
-                    ori_content_status=self.TASK_PROCESSING_STATUS
+                    ori_content_status=self.TASK_PROCESSING_STATUS,
                 )
                 return True
             else:
                 await self.roll_back_content_status_when_fails(
-                    process_times=process_times + 1,
-                    trace_id=trace_id
+                    process_times=process_times + 1, trace_id=trace_id
                 )
                 return False
         except Exception as e:
             await self.roll_back_content_status_when_fails(
-                process_times=process_times + 1,
-                trace_id=trace_id
+                process_times=process_times + 1, trace_id=trace_id
             )
             print("爬虫处理失败: {}".format(e))
             return False
@@ -437,8 +443,8 @@ class NewContentIdTask(object):
         VIDEO_DOWNLOAD_SUCCESS_STATUS = 2
         VIDEO_DOWNLOAD_FAIL_STATUS = 3
         ETL_TASK_INIT_STATUS = 2
-        trace_id = params['trace_id']
-        content_id = params['content_id']
+        trace_id = params["trace_id"]
+        content_id = params["content_id"]
         # 判断是否有三条已经下载完成的视频
         select_sql = f"""
             select count(id) 
@@ -451,12 +457,12 @@ class NewContentIdTask(object):
             affect_rows = await self.update_content_status(
                 ori_content_status=ETL_TASK_INIT_STATUS,
                 trace_id=trace_id,
-                new_content_status=self.TASK_ETL_FINISHED_STATUS
+                new_content_status=self.TASK_ETL_FINISHED_STATUS,
             )
             if affect_rows == 0:
                 logging(
                     code="6000",
-                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return",
                 )
                 return False
             return True
@@ -465,12 +471,12 @@ class NewContentIdTask(object):
             affected_rows = await self.update_content_status(
                 ori_content_status=ETL_TASK_INIT_STATUS,
                 trace_id=trace_id,
-                new_content_status=self.TASK_PROCESSING_STATUS
+                new_content_status=self.TASK_PROCESSING_STATUS,
             )
             if affected_rows == 0:
                 logging(
                     code="6000",
-                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return",
                 )
                 return False
             select_sql = f"""
@@ -479,7 +485,9 @@ class NewContentIdTask(object):
                 WHERE content_id = '{content_id}' and download_status != {VIDEO_DOWNLOAD_SUCCESS_STATUS}
                 ORDER BY score DESC;
             """
-            videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
+            videos_need_to_download_tuple = await self.mysql_client.async_select(
+                select_sql
+            )
             downloaded_count = 0
             for line in videos_need_to_download_tuple:
                 params = {
@@ -490,30 +498,30 @@ class NewContentIdTask(object):
                     "video_url": line[4],
                     "cover_url": line[5],
                     "user_id": line[6],
-                    "trace_id": line[7]
+                    "trace_id": line[7],
                 }
                 try:
-                    local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
+                    local_video_path, local_cover_path = generate_video_path(
+                        params["platform"], params["video_id"]
+                    )
                     # download videos
                     file_path = await download_video(
                         file_path=local_video_path,
-                        platform=params['platform'],
-                        video_url=params['video_url']
+                        platform=params["platform"],
+                        video_url=params["video_url"],
                     )
                     # download cover
                     cover_path = await download_cover(
                         file_path=local_cover_path,
-                        platform=params['platform'],
-                        cover_url=params['cover_url']
+                        platform=params["platform"],
+                        cover_url=params["cover_url"],
                     )
                     oss_video = await upload_to_oss(
-                        local_video_path=file_path,
-                        download_type="video"
+                        local_video_path=file_path, download_type="video"
                     )
                     if cover_path:
                         oss_cover = await upload_to_oss(
-                            local_video_path=cover_path,
-                            download_type="image"
+                            local_video_path=cover_path, download_type="image"
                         )
                     else:
                         oss_cover = None
@@ -528,15 +536,15 @@ class NewContentIdTask(object):
                             oss_video,
                             oss_cover,
                             VIDEO_DOWNLOAD_SUCCESS_STATUS,
-                            params['id']
-                        )
+                            params["id"],
+                        ),
                     )
                     downloaded_count += 1
                     if downloaded_count > 3:
                         await self.update_content_status(
                             ori_content_status=self.TASK_PROCESSING_STATUS,
                             trace_id=trace_id,
-                            new_content_status=self.TASK_ETL_FINISHED_STATUS
+                            new_content_status=self.TASK_ETL_FINISHED_STATUS,
                         )
                         return True
                 except Exception as e:
@@ -547,19 +555,19 @@ class NewContentIdTask(object):
                     """
                     await self.mysql_client.async_insert(
                         sql=update_sql,
-                        params=(VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
+                        params=(VIDEO_DOWNLOAD_FAIL_STATUS, params["id"]),
                     )
             if downloaded_count >= 3:
                 await self.update_content_status(
                     ori_content_status=self.TASK_PROCESSING_STATUS,
                     trace_id=trace_id,
-                    new_content_status=self.TASK_ETL_FINISHED_STATUS
+                    new_content_status=self.TASK_ETL_FINISHED_STATUS,
                 )
                 return True
             else:
                 await self.roll_back_content_status_when_fails(
-                    process_times=params['process_times'] + 1,
-                    trace_id=params['trace_id']
+                    process_times=params["process_times"] + 1,
+                    trace_id=params["trace_id"],
                 )
                 return False
 
@@ -571,21 +579,20 @@ class NewContentIdTask(object):
         :return:
         """
         PUBLISH_DEFAULT_STATUS = 3
-        gh_id = params['gh_id']
-        flow_pool_level = params['flow_pool_level']
-        content_id = params['content_id']
-        trace_id = params['trace_id']
-        process_times = params['process_times']
+        gh_id = params["gh_id"]
+        flow_pool_level = params["flow_pool_level"]
+        content_id = params["content_id"]
+        trace_id = params["trace_id"]
+        process_times = params["process_times"]
         # 开始处理,将状态修改为操作状态
         affected_rows = await self.update_content_status(
             ori_content_status=PUBLISH_DEFAULT_STATUS,
             trace_id=trace_id,
-            new_content_status=self.TASK_PROCESSING_STATUS
+            new_content_status=self.TASK_PROCESSING_STATUS,
         )
         if affected_rows == 0:
             logging(
-                code="6000",
-                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
             )
             return False
         try:
@@ -610,21 +617,21 @@ class NewContentIdTask(object):
             L = []
             for video_obj in video_list:
                 params = {
-                    "videoPath": video_obj['video_oss_path'],
-                    "uid": video_obj['uid'],
-                    "title": kimi_title
+                    "videoPath": video_obj["video_oss_path"],
+                    "uid": video_obj["uid"],
+                    "title": kimi_title,
                 }
                 publish_response = await publish_to_pq(params)
-                video_id = publish_response['data']['id']
+                video_id = publish_response["data"]["id"]
                 response = await get_pq_video_detail(video_id)
                 obj = {
-                    "uid": video_obj['uid'],
-                    "source": video_obj['platform'],
+                    "uid": video_obj["uid"],
+                    "source": video_obj["platform"],
                     "kimiTitle": kimi_title,
-                    "videoId": response['data'][0]['id'],
-                    "videoCover": response['data'][0]['shareImgPath'],
-                    "videoPath": response['data'][0]['videoPath'],
-                    "videoOss": video_obj['video_oss_path']
+                    "videoId": response["data"][0]["id"],
+                    "videoCover": response["data"][0]["shareImgPath"],
+                    "videoPath": response["data"][0]["videoPath"],
+                    "videoOss": video_obj["video_oss_path"],
                 }
                 L.append(obj)
             update_sql = f"""
@@ -640,13 +647,12 @@ class NewContentIdTask(object):
                     json.dumps(L, ensure_ascii=False),
                     process_times + 1,
                     trace_id,
-                    self.TASK_PROCESSING_STATUS
-                )
+                    self.TASK_PROCESSING_STATUS,
+                ),
             )
         except Exception as e:
             await self.roll_back_content_status_when_fails(
-                process_times=params['process_times'] + 1,
-                trace_id=params['trace_id']
+                process_times=params["process_times"] + 1, trace_id=params["trace_id"]
             )
             print(e)
 
@@ -659,79 +665,55 @@ class NewContentIdTask(object):
         # step1: 执行 kimi 操作
         # time.sleep(5) # 测试多个进程操作同一个 task 的等待时间
         kimi_result = await self.kimi_task(params)
-        trace_id = params['trace_id']
+        trace_id = params["trace_id"]
         if kimi_result:
             # 等待 kimi 操作执行完成之后,开始执行 spider_task
             print("kimi success")
-            logging(
-                code=3001,
-                info="kimi success",
-                trace_id=trace_id
-            )
+            logging(code=3001, info="kimi success", trace_id=trace_id)
             spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
             if spider_flag:
                 # 等待爬虫执行完成后,开始执行 etl_task
                 print("spider success")
-                logging(
-                    code=3002,
-                    info="spider_success",
-                    trace_id=trace_id
-                )
+                logging(code=3002, info="spider_success", trace_id=trace_id)
                 etl_flag = await self.etl_task(params)
                 if etl_flag:
                     # 等待下载上传完成,执行发布任务
                     print("etl success")
-                    logging(
-                        code="3003",
-                        info="etl_success",
-                        trace_id=trace_id
-                    )
+                    logging(code="3003", info="etl_success", trace_id=trace_id)
                     try:
-                        await self.publish_task(params, kimi_result['kimi_title'])
-                        logging(
-                            code="3004",
-                            info="publish_success",
-                            trace_id=trace_id
-                        )
+                        await self.publish_task(params, kimi_result["kimi_title"])
+                        logging(code="3004", info="publish_success", trace_id=trace_id)
                     except Exception as e:
                         logging(
                             code="6004",
                             info="publish 失败--{}".format(e),
-                            trace_id=params['trace_id']
+                            trace_id=params["trace_id"],
                         )
                 else:
                     logging(
-                        code="6003",
-                        info="ETL 处理失败",
-                        trace_id=params['trace_id']
+                        code="6003", info="ETL 处理失败", trace_id=params["trace_id"]
                     )
             else:
-                logging(
-                    code="6002",
-                    info="爬虫处理失败",
-                    trace_id=params['trace_id']
-                )
+                logging(code="6002", info="爬虫处理失败", trace_id=params["trace_id"])
         else:
-            logging(
-                code="6001",
-                info="kimi 处理失败",
-                trace_id=params['trace_id']
-            )
+            logging(code="6001", info="kimi 处理失败", trace_id=params["trace_id"])
 
     async def process_task(self, params):
         """
         处理任务
         :return:
         """
-        content_id = params['content_id']
+        content_id = params["content_id"]
         download_videos = await self.get_video_list(content_id)
         if not download_videos:
             # 开始处理, 判断是否有相同的文章 id 正在处理
-            processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
+            processing_flag = await self.judge_whether_same_content_id_is_processing(
+                content_id
+            )
             if processing_flag:
                 logging(
                     code="9001",
-                    info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
+                    info="该 content id 正在处理中, 跳过此任务--{}".format(content_id),
                 )
             else:
                 await self.start_process(params=params)
@@ -748,7 +730,7 @@ class NewContentIdTask(object):
         task_dict = {}
         # 对 content_id去重
         for task in task_list:
-            key = task['content_id']
+            key = task["content_id"]
             task_dict[key] = task
         process_list = []
         for item in task_dict:
@@ -756,7 +738,7 @@ class NewContentIdTask(object):
         logging(
             code="5001",
             info="Match Task Got {} this time".format(len(process_list)),
-            function="Publish Task"
+            function="Publish Task",
         )
         if task_list:
             total_task = len(process_list)
@@ -767,7 +749,4 @@ class NewContentIdTask(object):
             b = time.time()
             print("处理时间: {} s".format(b - a))
         else:
-            logging(
-                code="9008",
-                info="没有要处理的请求"
-            )
+            logging(code="9008", info="没有要处理的请求")