Sfoglia il codice sorgente

Merge branch '2024-12-23-bugfix' of Server/title_with_video into 2024-09-23newDbTasks

luojunhui 4 mesi fa
parent
commit
32b9d04989
3 file modificati con 67 aggiunte e 51 eliminazioni
  1. 44 35
      tasks/new_contentId_task.py
  2. 10 6
      tasks/utils/kimi_task.py
  3. 13 10
      tasks/utils/spider_task.py

+ 44 - 35
tasks/new_contentId_task.py

@@ -261,25 +261,30 @@ class NewContentIdTask(object):
         trace_id = params['trace_id']
         if params.get("root_content_id"):
             kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
-            affected_rows = await self.update_content_status(
-                new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
-                trace_id=trace_id,
-                ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
-            )
-            if affected_rows == 0:
+            if kimi_result:
+                affected_rows = await self.update_content_status(
+                    new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
+                    trace_id=trace_id,
+                    ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
+                )
+                if affected_rows == 0:
+                    logging(
+                        code="6000",
+                        info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                    )
+                    return
                 logging(
-                    code="6000",
-                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                    code="8023",
+                    function="kimi_task",
+                    trace_id=trace_id,
+                    info="从root_content_id获取结果",
+                    data=params
                 )
-                return
-            logging(
-                code="8023",
-                function="kimi_task",
-                trace_id=trace_id,
-                info="从root_content_id获取结果",
-                data=params
-            )
-            return kimi_result
+                return kimi_result
+            else:
+                params.pop('root_content_id', None)
+                print(params)
+                return await self.kimi_task(params)
 
         # 处理content_id
         content_id = params['content_id']
@@ -321,7 +326,7 @@ class NewContentIdTask(object):
                 )
                 return
             try:
-                kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
+                kimi_result = await generate_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
                 await self.update_content_status(
                     new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                     trace_id=trace_id,
@@ -361,28 +366,32 @@ class NewContentIdTask(object):
         gh_id = params['gh_id']
         if params.get("root_content_id"):
             # 从爬虫表获取root_content_id的视频,并且写入爬虫表,将记录状态由1 --> 2
-            await update_crawler_table_with_exist_content_id(
+            update_rows = await update_crawler_table_with_exist_content_id(
                 content_id=content_id,
                 trace_id=trace_id,
                 article_crawler_video_table=self.article_crawler_video_table,
                 db_client=self.long_articles_client,
                 root_content_id=params['root_content_id']
             )
-            affected_rows = await self.update_content_status(
-                new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
-                trace_id=trace_id,
-                ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
-            )
-            if affected_rows == 0:
-                return
-            logging(
-                code="8024",
-                function="spider_task",
-                trace_id=trace_id,
-                info="从root_content_id获取结果",
-                data=params
-            )
-            return True
+            if update_rows:
+                affected_rows = await self.update_content_status(
+                    new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
+                    trace_id=trace_id,
+                    ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
+                )
+                if affected_rows == 0:
+                    return
+                logging(
+                    code="8024",
+                    function="spider_task",
+                    trace_id=trace_id,
+                    info="从root_content_id获取结果",
+                    data=params
+                )
+                return True
+            else:
+                params.pop("root_content_id", None)
+                return await self.spider_task(params, kimi_result)
 
         download_video_exist_flag = await whether_downloaded_videos_exists(
             content_id=content_id,
@@ -741,7 +750,7 @@ class NewContentIdTask(object):
                     root_content_id = await self.get_source_content_id(content_id)
                     if root_content_id:
                         # 传参新增root_content_id
-                        params['root_content_id'] = root_content_id[0][0]
+                        params['root_content_id'] = root_content_id
                 # 开始处理
                 await self.start_process(params=params)
         else:

+ 10 - 6
tasks/utils/kimi_task.py

@@ -38,12 +38,16 @@ async def get_kimi_result(content_id, article_text_table, db_client) -> Dict:
                 WHERE content_id = '{content_id}';
                 """
     kimi_info = await db_client.async_select(get_kimi_sql)
-    return {
-        "kimi_title": kimi_info[0][1],
-        "ori_title": kimi_info[0][0],
-        "kimi_summary": kimi_info[0][2],
-        "kimi_keys": json.loads(kimi_info[0][3])
-    }
+    if kimi_info:
+        print(kimi_info)
+        return {
+            "kimi_title": kimi_info[0][1],
+            "ori_title": kimi_info[0][0],
+            "kimi_summary": kimi_info[0][2],
+            "kimi_keys": json.loads(kimi_info[0][3])
+        }
+    else:
+        return {}
 
 
 async def generate_kimi_result(content_id, article_text_table, db_client) -> Dict:

+ 13 - 10
tasks/utils/spider_task.py

@@ -69,16 +69,19 @@ async def update_crawler_table_with_exist_content_id(root_content_id, content_id
             AND is_illegal = {new_content_id_task_const.VIDEO_SAFE};
     """
     res_tuple = await db_client.async_select(select_sql)
-    insert_list = [(trace_id, content_id) + row for row in res_tuple]
-    insert_sql = f"""
-        INSERT INTO {article_crawler_video_table}
-        (trace_id, content_id, out_video_id, platform, video_title, play_count, like_count, share_count, publish_time, crawler_time, 
-        duration, video_url, cover_url, download_status, video_oss_path, cover_oss_path, user_id, score, is_illegal)
-        VALUES 
-        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-    """
-    await db_client.async_insert_many(sql=insert_sql, params_list=insert_list)
-
+    if res_tuple:
+        insert_list = [(trace_id, content_id) + row for row in res_tuple]
+        insert_sql = f"""
+            INSERT INTO {article_crawler_video_table}
+            (trace_id, content_id, out_video_id, platform, video_title, play_count, like_count, share_count, publish_time, crawler_time, 
+            duration, video_url, cover_url, download_status, video_oss_path, cover_oss_path, user_id, score, is_illegal)
+            VALUES 
+            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+        """
+        affected_rows = await db_client.async_insert_many(sql=insert_sql, params_list=insert_list)
+        return affected_rows
+    else:
+        return 0