Browse Source

修改newContentIdTask

luojunhui 4 tháng trước
mục cha
commit
b6e93c4409
2 tập tin đã thay đổi với 25 bổ sung và 3 xóa
  1. 21 2
      tasks/new_contentId_task.py
  2. 4 1
      tasks/utils/etl_task.py

+ 21 - 2
tasks/new_contentId_task.py

@@ -48,13 +48,14 @@ class NewContentIdTask(object):
                 SELECT content_id, count(1) as cnt
                 FROM {self.article_crawler_video_table}
                 WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+                    AND is_illegal = {NewContentIdTaskConst.VIDEO_SAFE}
                 GROUP BY content_id
             ) t2
             ON t1.content_id = t2.content_id
             WHERE
                     t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
                 AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
-                AND t2.cnt IS NULL
+                AND COALESCE(t2.cnt, 0) < {NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM}
             ORDER BY flow_pool_level, request_timestamp
             LIMIT {self.spider_coroutines};
         """
@@ -253,6 +254,22 @@ class NewContentIdTask(object):
         else:
             return
 
+    async def get_illegal_out_ids(self, content_id: str) -> List[str]:
+        """
+        Fetch the keys of off-site videos flagged as illegal for one content id.
+
+        Selects (platform, out_video_id) rows from the article crawler video
+        table whose is_illegal column equals NewContentIdTaskConst.VIDEO_UNSAFE.
+
+        :param content_id: content id to look up
+        :return: list of "{platform}_{out_video_id}" strings; an empty list
+                 when the query returns a falsy response
+        """
+        # NOTE(review): content_id is interpolated into the SQL text unescaped —
+        # confirm it always comes from a trusted source, or switch to a
+        # parameterized query if the client supports one.
+        select_sql = f"""
+            SELECT platform, out_video_id
+            FROM {self.article_crawler_video_table}
+            WHERE content_id = '{content_id}' and is_illegal = {NewContentIdTaskConst.VIDEO_UNSAFE};
+        """
+        response = await self.long_articles_client.async_select(select_sql)
+        if response:
+            # Join platform and out_video_id into one "platform_videoid" key per row.
+            result = ["{}_{}".format(line[0], line[1]) for line in response]
+            return result
+        else:
+            return []
+
     async def kimi_task(self, params):
         """
         执行 kimi 任务
@@ -515,11 +532,13 @@ class NewContentIdTask(object):
                 return False
 
             # download videos
+            illegal_videos = await self.get_illegal_out_ids(content_id)
             downloaded_count = await async_download_videos(
                 trace_id=trace_id,
                 content_id=content_id,
                 article_crawler_video_table=self.article_crawler_video_table,
-                db_client=self.long_articles_client
+                db_client=self.long_articles_client,
+                illegal_videos=illegal_videos
             )
 
             if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:

+ 4 - 1
tasks/utils/etl_task.py

@@ -6,7 +6,7 @@ from applications.const import new_content_id_task_const
 from applications.log import logging
 
 
-async def async_download_videos(trace_id, content_id, article_crawler_video_table, db_client):
+async def async_download_videos(trace_id, content_id, article_crawler_video_table, db_client, illegal_videos):
     """
     下载视频
     """
@@ -31,6 +31,9 @@ async def async_download_videos(trace_id, content_id, article_crawler_video_tabl
             "user_id": line[6],
             "trace_id": line[7]
         }
+        out_key = "{}_{}".format(params['platform'], params['video_id'])
+        if out_key in illegal_videos:
+            continue
         try:
             local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
             # download videos