Jelajahi Sumber

add-audit
and bug fix

luojunhui 3 bulan lalu
induk
melakukan
b3454f9ffc

+ 1 - 0
applications/const/server_const.py

@@ -64,6 +64,7 @@ class ServerConst:
 
     # 相关性过滤阈值
     NLP_SIMILARITY_THRESHOLD = 0.45
+    NLP_VERSION = 1
 
     JCD_SIMILARITY_THRESHOLD = 0
 

+ 1 - 1
applications/match_algorithm/rank.py

@@ -63,7 +63,7 @@ async def title_similarity_with_nlp(content_title, recall_list) -> Dict:
     """
     通过相关性模型来计算文章标题和搜索标题之间的相关性
     """
-    title_list = [i['title'] for i in recall_list]
+    title_list = [i['result']['title'] for i in recall_list]
     score_list = await nlp_title_similarity(
         url=nlp_base_url,
         ori_title=content_title,

+ 5 - 4
applications/spider/__init__.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+import json
 from datetime import datetime
 
 from applications.feishu import bot
@@ -58,9 +59,9 @@ async def save_video_to_mysql(video_obj, user, trace_id, platform, content_id, c
     mq_obj['content_id'] = content_id
     insert_sql = f"""
     INSERT INTO {crawler_video_table}
-    (content_id, out_video_id, platform, video_title, play_count, like_count, publish_time, crawler_time, duration, video_url, cover_url, user_id, trace_id, score)
+    (content_id, out_video_id, platform, video_title, play_count, like_count, publish_time, crawler_time, duration, video_url, cover_url, user_id, trace_id, score, score_version)
     values 
-    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
     """
     await db_client.async_insert(
         sql=insert_sql,
@@ -78,7 +79,8 @@ async def save_video_to_mysql(video_obj, user, trace_id, platform, content_id, c
             mq_obj['cover_url'],
             mq_obj['user_id'],
             trace_id,
-            similarity_score
+            similarity_score,
+            server_const.NLP_VERSION
         )
     )
 
@@ -122,7 +124,6 @@ async def search_videos_from_web(info, gh_id_map, db_client):
             platform = recall_obj['platform']
             recall_video = recall_obj['result']
             score = recall_obj['score']
-
             # 过滤掉nlp分低于0.45的
             if score < server_const.NLP_SIMILARITY_THRESHOLD:
                 continue

+ 71 - 64
tasks/new_contentId_task.py

@@ -220,9 +220,9 @@ class NewContentIdTask(object):
                 if content_status in {
                     NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                     NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
-                    NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
+                    # NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
                     NewContentIdTaskConst.TASK_PROCESSING_STATUS,
-                    NewContentIdTaskConst.TASK_PUBLISHED_STATUS
+                    # NewContentIdTaskConst.TASK_PUBLISHED_STATUS
                 }:
                     return True
             return False
@@ -276,50 +276,6 @@ class NewContentIdTask(object):
         :return:
         """
         trace_id = params['trace_id']
-        if params.get("root_content_id"):
-            kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
-            if kimi_result:
-                affected_rows = await self.update_content_status(
-                    new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
-                    trace_id=trace_id,
-                    ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
-                )
-                if affected_rows == 0:
-                    logging(
-                        code="6000",
-                        info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
-                    )
-                    return
-                
-                # 将root_content_id的kimi结果更新到content_id
-                content_id = params['content_id']
-                update_count =  await update_kimi_status(
-                    kimi_info=kimi_result,
-                    content_id=content_id,
-                    db_client=self.long_articles_client,
-                    article_text_table=self.article_text_table,
-                    success_status=NewContentIdTaskConst.KIMI_SUCCESS_STATUS,
-                    init_status=NewContentIdTaskConst.KIMI_INIT_STATUS
-                )
-
-                if update_count == 0:
-                    logging(
-                        code="6000",
-                        info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
-                    )
-                    return
-                logging(
-                    code="8023",
-                    function="kimi_task",
-                    trace_id=trace_id,
-                    info="从root_content_id获取结果",
-                    data=params
-                )
-                return kimi_result
-            else:
-                params.pop('root_content_id', None)
-                print(params)
-                return await self.kimi_task(params)
 
         # 处理content_id
         content_id = params['content_id']
@@ -348,6 +304,53 @@ class NewContentIdTask(object):
             )
 
         else:
+            # 校验是否存在root_content_id
+            if params.get("root_content_id"):
+                kimi_result = await get_kimi_result(content_id=params['root_content_id'],
+                                                    article_text_table=self.article_text_table,
+                                                    db_client=self.long_articles_client)
+                if kimi_result:
+                    affected_rows = await self.update_content_status(
+                        new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
+                        trace_id=trace_id,
+                        ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
+                    )
+                    if affected_rows == 0:
+                        logging(
+                            code="6000",
+                            info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                        )
+                        return
+
+                    # 将root_content_id的kimi结果更新到content_id
+                    content_id = params['content_id']
+                    update_count = await update_kimi_status(
+                        kimi_info=kimi_result,
+                        content_id=content_id,
+                        db_client=self.long_articles_client,
+                        article_text_table=self.article_text_table,
+                        success_status=NewContentIdTaskConst.KIMI_SUCCESS_STATUS,
+                        init_status=NewContentIdTaskConst.KIMI_INIT_STATUS
+                    )
+
+                    if update_count == 0:
+                        logging(
+                            code="6000",
+                            info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
+                        )
+                        return
+                    logging(
+                        code="8023",
+                        function="kimi_task",
+                        trace_id=trace_id,
+                        info="从root_content_id获取结果",
+                        data=params
+                    )
+                    return kimi_result
+                else:
+                    params.pop('root_content_id', None)
+                    return await self.kimi_task(params)
+
+            # 开始处理,将 content_status 从 0  改为  101
             affected_rows = await self.update_content_status(
                 new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
@@ -404,6 +407,20 @@ class NewContentIdTask(object):
         content_id = params['content_id']
         process_times = params['process_times']
         gh_id = params['gh_id']
+
+        download_video_exist_flag = await whether_downloaded_videos_exists(
+            content_id=content_id,
+            article_crawler_video_table=self.article_crawler_video_table,
+            db_client=self.long_articles_client
+        )
+        if download_video_exist_flag:
+            await self.update_content_status(
+                new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
+                trace_id=trace_id,
+                ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
+            )
+            return True
+        # 判断是否存在root_content_id
         if params.get("root_content_id"):
             # 从爬虫表获取root_content_id的视频,并且写入爬虫表,将记录状态由1 --> 2
             update_rows = await update_crawler_table_with_exist_content_id(
@@ -433,19 +450,6 @@ class NewContentIdTask(object):
                 params.pop("root_content_id", None)
                 return await self.spider_task(params, kimi_result)
 
-        download_video_exist_flag = await whether_downloaded_videos_exists(
-            content_id=content_id,
-            article_crawler_video_table=self.article_crawler_video_table,
-            db_client=self.long_articles_client
-        )
-        if download_video_exist_flag:
-            await self.update_content_status(
-                new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
-                trace_id=trace_id,
-                ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
-            )
-            return True
-
         # 开始处理,将状态由 1 改成  101
         affected_rows = await self.update_content_status(
             new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
@@ -668,6 +672,7 @@ class NewContentIdTask(object):
         :param params:
         :return:
         """
+        print("start process")
         kimi_result = await self.kimi_task(params)
         trace_id = params['trace_id']
         process_times = params['process_times']
@@ -787,6 +792,7 @@ class NewContentIdTask(object):
         处理任务
         :return:
         """
+        print(json.dumps(params, ensure_ascii=False, indent=4))
         content_id = params['content_id']
         flow_pool_level = params['flow_pool_level']
 
@@ -795,9 +801,11 @@ class NewContentIdTask(object):
             article_crawler_video_table=self.article_crawler_video_table,
             db_client=self.long_articles_client
         )
+        print("开始处理")
         if not download_videos_exists_flag:
             processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
             if processing_flag:
+                print("processing success")
                 logging(
                     code="9001",
                     info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
@@ -821,13 +829,13 @@ class NewContentIdTask(object):
         :return:
         """
         # 处理未托管的任务
-        await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
+        # await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
 
         # 处理托管任务
-        await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
+        # await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
 
         # 将处理次数大于3次且未成功的任务置为失败
-        await self.set_tasks_status_fail()
+        # await self.set_tasks_status_fail()
 
         # 获取task_list
         task_list = await self.get_tasks()
@@ -838,11 +846,10 @@ class NewContentIdTask(object):
             info="Match Task Got {} this time".format(len(process_list)),
             function="Publish Task"
         )
-
         # 处理process_list
         if process_list:
             a = time.time()
-            tasks = [self.process_each_task(params) for params in process_list]
+            tasks = [self.process_each_task(params) for params in process_list[:1]]
             await asyncio.gather(*tasks)
             b = time.time()
             print("处理时间: {} s".format(b - a))