
content-id-rematch-task

luojunhui 3 months ago
commit e1d4d58926
1 changed file with 159 additions and 0 deletions

+ 159 - 0
content_rematch.py

@@ -0,0 +1,159 @@
+"""
+@author: luojunhui
+"""
+import time
+import asyncio
+import datetime
+
+from applications.db import AsyncMySQLClient
+from applications.spider import search_videos_from_web
+from tasks.utils.kimi_task import get_kimi_result
+from tasks.utils.etl_task import async_download_videos
+
+
+class ContentRematch:
+    """
+    Rematch videos for content ids that ended up with too few downloads
+    """
+
+    def __init__(self, db_client):
+        self.db_client = db_client
+
+    async def get_content_list(self):
+        """
+        Fetch the list of content ids awaiting a rematch
+        """
+        select_sql = f"""
+            SELECT content_id, count(1)
+            FROM long_articles_crawler_videos
+            WHERE download_status = 2
+            GROUP BY content_id
+            HAVING count(1) < 3
+            LIMIT 10;
+        """
+        content_list = await self.db_client.async_select(select_sql)
+        return content_list
+
+    async def get_illegal_out_ids(self, content_id: str) -> list:
+        """
+        Fetch the out-of-site video ids flagged as illegal for this content id
+        """
+        select_sql = f"""
+            SELECT platform, out_video_id
+            FROM long_articles_crawler_videos
+            WHERE content_id = '{content_id}' AND is_illegal = 1;
+        """
+        response = await self.db_client.async_select(select_sql)
+        if not response:
+            return []
+        return ["{}_{}".format(platform, out_video_id) for platform, out_video_id in response]
+
+    async def spider_task(self, params, kimi_result):
+        """
+        Search the web for candidate videos matching this content id
+        """
+        content_id = params['content_id']
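+        # rematch runs have no real account context, so use a placeholder gh_id
+        # and derive a reproducible trace id from the content id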
+        gh_id = "test_gh_id"
+        trace_id = "rematch_{}".format(content_id)
+
+        try:
+            search_videos_count = await search_videos_from_web(
+                info={
+                    "ori_title": kimi_result['ori_title'],
+                    "kimi_summary": kimi_result['kimi_summary'],
+                    "kimi_keys": kimi_result['kimi_keys'],
+                    "trace_id": trace_id,
+                    "gh_id": gh_id,
+                    "content_id": content_id,
+                    "crawler_video_table": "long_articles_crawler_videos",
+                },
+                gh_id_map={},
+                db_client=self.db_client
+            )
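+            # require at least 3 candidate videos before moving on to the ETL step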
+            return search_videos_count >= 3
+        except Exception as e:
+            # a failed search should not abort the whole batch
+            print("spider task failed for content {}: {}".format(content_id, e))
+            return False
+
+    async def etl_task(self, content_id, trace_id):
+        """
+        Download the matched videos, skipping any flagged as illegal
+        """
+        # collect previously flagged videos so they are excluded from the download
+        illegal_videos = await self.get_illegal_out_ids(content_id)
+        downloaded_count = await async_download_videos(
+            trace_id=trace_id,
+            content_id=content_id,
+            article_crawler_video_table="long_articles_crawler_videos",
+            db_client=self.db_client,
+            illegal_videos=illegal_videos
+        )
+
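+        # a content id counts as rematched once at least 3 videos are downloaded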
+        return downloaded_count >= 3
+
+    async def process_each_content(self, content):
+        """
+        Run the Kimi, spider, and ETL steps for a single content id
+        """
+        content_id = content[0]
+
+        kimi_result = await get_kimi_result(
+            content_id=content_id,
+            db_client=self.db_client,
+            article_text_table="long_articles_text",
+        )
+
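+        # skip content ids with no Kimi result yet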
+        if not kimi_result:
+            return False
+
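+        # search the web using the Kimi-derived title, summary, and keywords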
+        spider_flag = await self.spider_task(
+            params={
+                "content_id": content_id,
+            },
+            kimi_result=kimi_result
+        )
+
+        if not spider_flag:
+            return False
+        etl_flag = await self.etl_task(
+            content_id=content_id,
+            trace_id="rematch_{}".format(content_id)
+        )
+        if not etl_flag:
+            return False
+        return True
+
+    async def deal(self):
+        """
+        Fetch the pending content list and process each entry
+        """
+        content_list = await self.get_content_list()
+        for content in content_list[:1]:  # process only the first content id per run for now
+            print("processing content: {}".format(content))
+            await self.process_each_content(content)
+
+
+async def main_job():
+    """
+    Entry point: open the long-articles DB pool and run one rematch pass
+    """
+    async with AsyncMySQLClient() as long_articles_pool:
+        _task = ContentRematch(long_articles_pool)
+        await _task.deal()
+
+
+if __name__ == '__main__':
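+    # naive scheduler: run one pass, sleep 60s, repeat forever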
+    while True:
+        asyncio.run(main_job())
+        now_str = str(datetime.datetime.now())
+        print("{}    run finished, sleeping for 60s".format(now_str))
+        time.sleep(60)