new_content_id optimization

luojunhui, 4 months ago
commit 0cc6bfbea2

+ 22 - 1
applications/config/__init__.py

@@ -2,4 +2,25 @@
 @author: luojunhui
 """
 from .const import *
-from .apollo_config import Config
+from .apollo_config import Config
+
+# AIGC backend database connection config
+denet_config = {
+    'host': 'rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com',
+    'port': 3306,
+    'user': 'crawler_admin',
+    'password': 'cyber#crawler_2023',
+    'db': 'aigc-admin-prod',
+    'charset': 'utf8mb4'
+}
+
+
+# long-articles database connection config
+long_articles_config = {
+    'host': 'rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
+    'port': 3306,
+    'user': 'changwen_admin',
+    'password': 'changwen@123456',
+    'db': 'long_articles',
+    'charset': 'utf8mb4'
+}
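
Both dicts check live credentials into source control. A minimal sketch, assuming the dicts keep this shape, of loading the same fields from environment variables instead; the load_db_config helper and the DENET_*/LONG_ARTICLES_* variable names are hypothetical, not part of this commit:

import os

def load_db_config(prefix: str) -> dict:
    """Build a connection dict shaped like denet_config from environment variables."""
    return {
        'host': os.environ[f'{prefix}_HOST'],
        'port': int(os.environ.get(f'{prefix}_PORT', '3306')),
        'user': os.environ[f'{prefix}_USER'],
        'password': os.environ[f'{prefix}_PASSWORD'],
        'db': os.environ[f'{prefix}_DB'],
        'charset': 'utf8mb4'
    }

# e.g. denet_config = load_db_config('DENET')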

+ 4 - 1
applications/const/__init__.py

@@ -2,6 +2,9 @@
 @author: luojunhui
 """
 from .server_const import ServerConst
+from .task_const import HistoryContentIdTaskConst, NewContentIdTaskConst
 
 
-server_const = ServerConst()
+server_const = ServerConst()
+new_content_id_task_const = NewContentIdTaskConst()
+history_content_id_task_const = HistoryContentIdTaskConst()

+ 76 - 0
applications/const/task_const.py

@@ -0,0 +1,76 @@
+"""
+@author: luojunhui
+"""
+
+
+class HistoryContentIdTaskConst:
+    """
+    Constants for the history content_id task
+    """
+    # task is being processed
+    TASK_PROCESSING_STATUS = 101
+    # task initial status
+    TASK_INIT_STATUS = 0
+    # video ETL complete
+    TASK_ETL_COMPLETE_STATUS = 3
+    # task published
+    TASK_PUBLISHED_STATUS = 4
+    # article has exited or been promoted
+    EXIT_STATUS = 97
+    # article category mismatch
+    MISMATCH_STATUS = 96
+    # video downloaded successfully
+    VIDEO_DOWNLOAD_SUCCESS_STATUS = 2
+    # maximum number of times a task may be processed
+    TASK_MAX_PROCESS_TIMES = 3
+
+    # AIGC interaction: send the trace_id of finished tasks to the AIGC system
+    RECORD_SUCCESS_TRACE_ID_CODE = 2
+    RECORD_FAIL_TRACE_ID_CODE = 3
+
+    # whether publishing is required
+    NEED_PUBLISH = 1
+    DO_NOT_NEED_PUBLISH = 2
+
+    # article promotion or exit
+    UP_LEVEL_STATUS = 1
+    TITLE_EXIT_STATUS = -1
+
+    # wait times
+    NEED_PUBLISH_WAIT_TIME = 60
+    DO_NOT_NEED_PUBLISH_WAIT_TIME = 3
+
+    # video safety status
+    VIDEO_UNSAFE = 1
+    VIDEO_SAFE = 0
+
+
+class NewContentIdTaskConst(HistoryContentIdTaskConst):
+    """
+    Constants for the new content_id task
+    """
+    # KIMI step finished
+    TASK_KIMI_FINISHED_STATUS = 1
+    # spider step finished
+    TASK_SPIDER_FINISHED_STATUS = 2
+    # task failed
+    TASK_FAIL_STATUS = 99
+
+    # KIMI flagged the content as illegal
+    KIMI_ILLEGAL_STATUS = 95
+
+    # failed to store into the article text table
+    ARTICLE_TEXT_TABLE_ERROR = 98
+
+    # timeout (seconds) for a task stuck in a processing status
+    TASK_PROCESSING_TIMEOUT = 3600
+
+    # minimum number of matched videos
+    MIN_MATCH_VIDEO_NUM = 3
+
+    # KIMI processing status in long_articles_text
+    KIMI_SUCCESS_STATUS = 1
+    KIMI_FAIL_STATUS = 2
+
+    # video download failed
+    VIDEO_DOWNLOAD_FAIL_STATUS = 3
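
Since NewContentIdTaskConst subclasses HistoryContentIdTaskConst, the singleton exported from applications/const/__init__.py resolves both constant sets; a quick illustration:

from applications.const import new_content_id_task_const as const

assert const.TASK_INIT_STATUS == 0         # inherited from HistoryContentIdTaskConst
assert const.TASK_FAIL_STATUS == 99        # defined on NewContentIdTaskConst
assert const.TASK_MAX_PROCESS_TIMES == 3   # inherited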

+ 16 - 8
applications/db/__init__.py

@@ -3,13 +3,16 @@
 """
 import aiomysql
 
+from applications.config import denet_config, long_articles_config
+
 
 class AsyncMySQLClient(object):
     """
     Async MySQL connection pool
     """
 
-    def __init__(self, app=None):
+    def __init__(self, app=None, aigc=False):
+        self.aigc = aigc
         if not app:
             self.mysql_pool = None
         else:
@@ -20,16 +23,21 @@ class AsyncMySQLClient(object):
         Initialize the connection pool
         :return:
         """
+        if self.aigc:
+            db_config = denet_config
+        else:
+            db_config = long_articles_config
+
         self.mysql_pool = await aiomysql.create_pool(
-            host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
-            port=3306,
-            user='changwen_admin',
-            password='changwen@123456',
-            db='long_articles',
-            charset='utf8mb4',
+            host=db_config['host'],
+            port=db_config['port'],
+            user=db_config['user'],
+            password=db_config['password'],
+            db=db_config['db'],
+            charset=db_config['charset'],
             connect_timeout=120,
         )
-        print("mysql init successfully")
+        print("{} mysql init successfully".format("Denet" if self.aigc else "LongArticles"))
 
     async def close_pool(self):
         """

+ 2 - 7
newContentIdTask.py

@@ -13,13 +13,8 @@ async def main_job():
     main job
     :return:
     """
-    # async_mysql_pool = AsyncMySQLClient()
-    # await async_mysql_pool.init_pool()
-    # new_content_id_task = NewContentIdTask(async_mysql_pool)
-    # await new_content_id_task.deal()
-    async with AsyncMySQLClient() as async_mysql_pool:
-        # await async_mysql_pool.init_pool()
-        new_content_id_task = NewContentIdTask(async_mysql_pool)
+    async with AsyncMySQLClient() as long_articles_pool, AsyncMySQLClient(aigc=True) as aigc_pool:
+        new_content_id_task = NewContentIdTask(long_articles_pool, aigc_pool)
         await new_content_id_task.deal()
 
 
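The hunk does not show how main_job is driven; a hypothetical entry point (assumed, not part of this commit) would be:

import asyncio

if __name__ == '__main__':
    asyncio.run(main_job())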

+ 3 - 2
tasks/history_task.py

@@ -7,7 +7,8 @@ import asyncio
 import traceback
 
 from applications.feishu import bot
-from applications.config import Config, HistoryContentIdTaskConst
+from applications.config import Config
+from applications.const import HistoryContentIdTaskConst
 from applications.log import logging
 from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
 from applications.functions.common import shuffle_list
@@ -51,7 +52,7 @@ class historyContentIdTask(object):
             JOIN (
                 select content_id, count(1) as cnt 
                 from {self.article_crawler_video_table}
-                where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+                where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS} and is_illegal = {self.const.VIDEO_SAFE}
                 group by content_id
             ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
             WHERE ART.content_status = {self.const.TASK_INIT_STATUS} and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}

+ 184 - 337
tasks/new_contentId_task.py

@@ -3,17 +3,18 @@
 """
 import json
 import time
+import asyncio
+
+from typing import List, Dict
 
 from applications.config import Config
 from applications.config.const import new_content_id_task as NewContentIdTaskConst
 from applications.log import logging
-from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
 from applications.functions.common import shuffle_list
-from applications.functions.kimi import KimiServer
 from applications.spider import search_videos_from_web
-from applications.etl_function import *
 from applications.feishu import bot
 from applications.functions.aigc import record_trace_id
+from .utils import *
 
 
 class NewContentIdTask(object):
@@ -21,8 +22,9 @@ class NewContentIdTask(object):
     Matching flow for content with no previously published article
     """
 
-    def __init__(self, mysql_client):
-        self.mysql_client = mysql_client
+    def __init__(self, long_articles_client, aigc_client):
+        self.long_articles_client = long_articles_client
+        self.aigc_client = aigc_client
         self.config = Config()
         self.article_match_video_table = self.config.article_match_video_table
         self.article_text_table = self.config.article_text_table
@@ -31,34 +33,11 @@ class NewContentIdTask(object):
         self.account_map = json.loads(self.config.get_config_value("accountMap"))
         self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
 
-    async def get_tasks(self):
+    async def get_tasks(self) -> List[Dict]:
         """
         Fetch tasks
         :return:
         """
-        # roll back tasks that are not hosted
-        await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
-
-        # roll back hosted tasks
-        await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
-        # mark tasks with process_times > 3 and status not 4 as failed; the filter columns need an index
-        update_status_sql = f"""
-            UPDATE 
-                {self.article_match_video_table}
-            SET 
-                content_status = %s
-            WHERE 
-                process_times > %s and content_status not in (%s, %s);
-        """
-        await self.mysql_client.async_insert(
-            update_status_sql,
-            params=(
-                NewContentIdTaskConst.TASK_FAIL_STATUS,
-                NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
-                NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
-                NewContentIdTaskConst.TASK_PUBLISHED_STATUS
-            )
-        )
         # fetch tasks with process_times <= 3 and content_status = 0
         select_sql = f"""
             SELECT
@@ -79,7 +58,7 @@ class NewContentIdTask(object):
             ORDER BY flow_pool_level, request_timestamp
             LIMIT {self.spider_coroutines};
         """
-        tasks = await self.mysql_client.async_select(select_sql)
+        tasks = await self.long_articles_client.async_select(select_sql)
         if tasks:
             return [
                 {
@@ -95,18 +74,40 @@ class NewContentIdTask(object):
         else:
             return []
 
-    async def roll_back_unfinished_tasks(self, publish_flag):
+    async def set_tasks_status_fail(self) -> None:
+        """
+        Mark tasks processed more than 3 times whose status is not a success status (3, 4) as failed
+        """
+        update_status_sql = f"""
+                    UPDATE 
+                        {self.article_match_video_table}
+                    SET 
+                        content_status = %s
+                    WHERE 
+                        process_times > %s and content_status not in (%s, %s);
+                """
+        await self.long_articles_client.async_insert(
+            update_status_sql,
+            params=(
+                NewContentIdTaskConst.TASK_FAIL_STATUS,
+                NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
+                NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
+                NewContentIdTaskConst.TASK_PUBLISHED_STATUS
+            )
+        )
+
+    async def roll_back_unfinished_tasks(self, publish_flag: int) -> None:
         """
         Roll back tasks stuck in an intermediate status for too long
         """
         # fetch tasks whose content_status is "processing"; if stuck for over 1h, reset it to 0 and increment process_times
         if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
             processing_status_tuple = (
-                                NewContentIdTaskConst.TASK_PROCESSING_STATUS,
-                                NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
-                                NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
-                                NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
-                                )
+                NewContentIdTaskConst.TASK_PROCESSING_STATUS,
+                NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
+                NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
+                NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
+            )
         elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
             processing_status_tuple = (
                 NewContentIdTaskConst.TASK_PROCESSING_STATUS,
@@ -124,8 +125,8 @@ class NewContentIdTask(object):
                 content_status in {processing_status_tuple}
                 and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
                 and publish_flag = {publish_flag}; 
-                                        """
-        processing_articles = await self.mysql_client.async_select(select_processing_sql)
+        """
+        processing_articles = await self.long_articles_client.async_select(select_processing_sql)
         if processing_articles:
             processing_list = [
                 {
@@ -145,23 +146,6 @@ class NewContentIdTask(object):
                         ori_content_status=obj['content_status']
                     )
 
-    async def get_video_list(self, content_id):
-        """
-        Check whether this article already has matched videos downloaded
-        :param content_id
-        :return:
-        """
-        sql = f"""
-        SELECT id
-        FROM {self.article_crawler_video_table}
-        WHERE content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
-        """
-        res_tuple = await self.mysql_client.async_select(sql)
-        if len(res_tuple) >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
-            return True
-        else:
-            return False
-
     async def update_content_status(self, new_content_status, trace_id, ori_content_status):
         """
         :param new_content_status:
@@ -174,7 +158,7 @@ class NewContentIdTask(object):
                     SET content_status = %s, content_status_update_time = %s
                     WHERE trace_id = %s and content_status = %s;
                     """
-        row_counts = await self.mysql_client.async_insert(
+        row_counts = await self.long_articles_client.async_insert(
             sql=update_sql,
             params=(
                 new_content_status,
@@ -185,7 +169,8 @@ class NewContentIdTask(object):
         )
         return row_counts
 
-    async def roll_back_content_status_when_fails(self, process_times, trace_id, ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
+    async def roll_back_content_status_when_fails(self, process_times, trace_id,
+                                                  ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
         """
         On failure, roll back to the initial status and increment process_times
         :param process_times:
@@ -201,7 +186,7 @@ class NewContentIdTask(object):
                                 process_times = %s
                             WHERE trace_id = %s and content_status = %s;
                         """
-        await self.mysql_client.async_insert(
+        await self.long_articles_client.async_insert(
             sql=update_article_sql,
             params=(
                 NewContentIdTaskConst.TASK_INIT_STATUS,
@@ -220,14 +205,13 @@ class NewContentIdTask(object):
         success: 4
         init: 0
         fail: 99
-        todo: should content_ids that failed processing be skipped for good?
         """
         select_sql = f"""
                    SELECT distinct content_status
                    FROM {self.article_match_video_table}
                    WHERE content_id = '{content_id}';
         """
-        result = await self.mysql_client.async_select(select_sql)
+        result = await self.long_articles_client.async_select(select_sql)
         if result:
             for item in result:
                 content_status = item[0]
@@ -244,56 +228,56 @@ class NewContentIdTask(object):
         else:
             return False
 
-    async def get_downloaded_videos(self, content_id):
+    async def get_source_content_id(self, new_content_id):
         """
-        Fetch the downloaded videos
-        :return:
+        Look up the source content_id for a content_id
         """
-        sql = f"""
-                SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
-                FROM {self.article_crawler_video_table}
-                WHERE content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
-                ORDER BY score DESC;
-                """
-        res_tuple = await self.mysql_client.async_select(sql)
-        return [
-            {
-                "platform": i[0],
-                "play_count": i[1],
-                "like_count": i[2],
-                "video_oss_path": i[3],
-                "cover_oss_path": i[4],
-                "uid": i[5]
-            }
-            for i in res_tuple
-        ]
-
-    async def get_kimi_status(self, content_id):
-        """
-        Fetch kimi info by content_id
-        :return:
+        select_channel_id_sql = f"""
+            SELECT channel_content_id
+            FROM produce_plan_exe_record
+            WHERE plan_exe_id = '{new_content_id}';
         """
-        select_sql = f"""
-                    select kimi_status
-                    from {self.article_text_table}
-                    where content_id = '{content_id}';
-                    """
-        response = await self.mysql_client.async_select(select_sql)
-        if response:
-            kimi_status = response[0][0]
-            return kimi_status
+        channel_content_id = await self.aigc_client.async_select(select_channel_id_sql)
+
+        if channel_content_id:
+            select_source_content_id_sql = f"""
+                SELECT root_produce_content_id
+                FROM article_pool_promotion_source
+                WHERE channel_content_id = '{channel_content_id[0][0]}';
+            """
+            source_content_id = await self.long_articles_client.async_select(select_source_content_id_sql)
+            if source_content_id:
+                return source_content_id[0][0]
+            else:
+                return
         else:
-            return NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR
+            return
 
     async def kimi_task(self, params):
         """
         Run the kimi task
         :return:
         """
-        content_id = params['content_id']
         trace_id = params['trace_id']
+        if params.get("root_content_id"):
+            kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
+            affected_rows = await self.update_content_status(
+                new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
+                trace_id=trace_id,
+                ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
+            )
+            if affected_rows == 0:
+                logging(
+                    code="6000",
+                    info="multiple processes competed for this task's status lock; lost the race, return"
+                )
+                return
+            return kimi_result
+
+        # process the content_id
+        content_id = params['content_id']
         process_times = params['process_times']
-        kimi_status_code = await self.get_kimi_status(content_id=content_id)
+        kimi_status_code = await get_kimi_status(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
 
         if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
             affected_rows = await self.update_content_status(
@@ -307,23 +291,15 @@ class NewContentIdTask(object):
                     info="multiple processes competed for this task's status lock; lost the race, return"
                 )
                 return
-            get_kimi_sql = f"""
-            SELECT article_title, kimi_title, kimi_summary, kimi_keys
-            FROM {self.article_text_table}
-            WHERE content_id = '{content_id}';
-            """
-            kimi_info = await self.mysql_client.async_select(get_kimi_sql)
-            return {
-                "kimi_title": kimi_info[0][1],
-                "ori_title": kimi_info[0][0],
-                "kimi_summary": kimi_info[0][2],
-                "kimi_keys": json.loads(kimi_info[0][3])
-            }
+            kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
+            return kimi_result
+
         elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
             logging(
                 code="4000",
                 info="content_id not found in long_articles_text"
             )
+
         else:
             # start processing: change content_status from 0 to 101
             affected_rows = await self.update_content_status(
@@ -337,47 +313,14 @@ class NewContentIdTask(object):
                     info="multiple processes competed for this task's status lock; lost the race, return"
                 )
                 return
-            K = KimiServer()
             try:
-                select_sql = f"""
-                select article_title, article_text
-                from {self.article_text_table}
-                where content_id = '{content_id}'
-                """
-                res = await self.mysql_client.async_select(select_sql)
-                article_obj = {
-                    "article_title": res[0][0],
-                    "article_text": res[0][1],
-                    "content_id": content_id
-                }
-                kimi_info = await K.search_kimi_schedule(params=article_obj)
-                kimi_title = kimi_info['k_title']
-                content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
-                content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
-                update_kimi_sql = f"""
-                        UPDATE {self.article_text_table} 
-                        SET
-                            kimi_title = %s,
-                            kimi_summary = %s,
-                            kimi_keys = %s,
-                            kimi_status = %s
-                        WHERE content_id = %s;"""
-                await self.mysql_client.async_insert(
-                    sql=update_kimi_sql,
-                    params=(
-                    kimi_title, content_title, content_keys, NewContentIdTaskConst.KIMI_SUCCESS_STATUS, params['content_id'])
-                )
+                kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
                 await self.update_content_status(
                     new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                     trace_id=trace_id,
                     ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
                 )
-                return {
-                    "kimi_title": kimi_title,
-                    "ori_title": article_obj['article_title'],
-                    "kimi_summary": content_title,
-                    "kimi_keys": kimi_info['content_keys']
-                }
+                return kimi_result
             except Exception as e:
                 # kimi task failed
                 update_kimi_sql = f"""
@@ -386,7 +329,7 @@ class NewContentIdTask(object):
                             kimi_status = %s 
                         WHERE content_id = %s
                         """
-                await self.mysql_client.async_insert(
+                await self.long_articles_client.async_insert(
                     sql=update_kimi_sql,
                     params=(
                         NewContentIdTaskConst.KIMI_FAIL_STATUS,
@@ -409,21 +352,37 @@ class NewContentIdTask(object):
         content_id = params['content_id']
         process_times = params['process_times']
         gh_id = params['gh_id']
-        select_sql = f"""
-            SELECT count(id) 
-            FROM {self.article_crawler_video_table} 
-            WHERE content_id = '{content_id}' 
-            AND download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
-        """
-        count_tuple = await self.mysql_client.async_select(select_sql)
-        counts = count_tuple[0][0]
-        if counts >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
+        if params.get("root_content_id"):
+            # fetch root_content_id's videos from the crawler table, re-insert them, and move the status from 1 to 2
+            await update_crawler_table_with_exist_content_id(
+                content_id=content_id,
+                trace_id=trace_id,
+                article_crawler_video_table=self.article_crawler_video_table,
+                db_client=self.long_articles_client,
+                root_content_id=params['root_content_id']
+            )
+            affected_rows = await self.update_content_status(
+                new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
+                trace_id=trace_id,
+                ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
+            )
+            if affected_rows == 0:
+                return
+            return True
+
+        download_video_exist_flag = await whether_downloaded_videos_exists(
+            content_id=content_id,
+            article_crawler_video_table=self.article_crawler_video_table,
+            db_client=self.long_articles_client
+        )
+        if download_video_exist_flag:
             await self.update_content_status(
                 new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                 trace_id=trace_id,
                 ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
             )
             return True
+
         # start processing: change the status from 1 to 101
         affected_rows = await self.update_content_status(
             new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
@@ -454,7 +413,7 @@ class NewContentIdTask(object):
                     "crawler_video_table": self.article_crawler_video_table
                 },
                 gh_id_map=self.account_map,
-                db_client=self.mysql_client
+                db_client=self.long_articles_client
             )
             if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
                 # spider task succeeded: change the status from 101 to 2
@@ -499,15 +458,13 @@ class NewContentIdTask(object):
         trace_id = params['trace_id']
         content_id = params['content_id']
         process_times = params['process_times']
-        # check whether three videos have already finished downloading
-        select_sql = f"""
-            select count(id) 
-            from {self.article_crawler_video_table} 
-            where content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
-        """
-        video_count_tuple = await self.mysql_client.async_select(select_sql)
-        video_count = video_count_tuple[0][0]
-        if video_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
+        # check whether the videos have already been downloaded
+        video_exist_flag = await whether_downloaded_videos_exists(
+            content_id=content_id,
+            article_crawler_video_table=self.article_crawler_video_table,
+            db_client=self.long_articles_client
+        )
+        if video_exist_flag:
             affect_rows = await self.update_content_status(
                 ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                 trace_id=trace_id,
@@ -533,118 +490,16 @@ class NewContentIdTask(object):
                     info="multiple processes competed for this task's status lock; lost the race, return"
                 )
                 return False
-            select_sql = f"""
-                SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
-                FROM {self.article_crawler_video_table}
-                WHERE content_id = '{content_id}' and download_status != {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
-                ORDER BY score DESC;
-            """
-            videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
-            downloaded_count = 0
-            for line in videos_need_to_download_tuple:
-                params = {
-                    "id": line[0],
-                    "video_id": line[1],
-                    "platform": line[2],
-                    "video_title": line[3],
-                    "video_url": line[4],
-                    "cover_url": line[5],
-                    "user_id": line[6],
-                    "trace_id": line[7]
-                }
-                try:
-                    local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
-                    # download videos
-                    file_path = await download_video(
-                        file_path=local_video_path,
-                        platform=params['platform'],
-                        video_url=params['video_url']
-                    )
-                    if not file_path:
-                        # video download failed; skip the upload and mark this record as failed
-                        update_sql = f"""
-                                    UPDATE {self.article_crawler_video_table}
-                                    SET download_status = %s
-                                    WHERE id = %s;
-                        """
-                        await self.mysql_client.async_insert(
-                            sql=update_sql,
-                            params=(NewContentIdTaskConst.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
-                        )
-                        logging(
-                            code="etl_1001",
-                            info="etl_failed to download video",
-                            trace_id=trace_id,
-                            function="etl_task"
-                        )
-                    else:
-                        # download cover
-                        cover_path = await download_cover(
-                            file_path=local_cover_path,
-                            platform=params['platform'],
-                            cover_url=params['cover_url']
-                        )
-                        # upload video to oss
-                        oss_video = await upload_to_oss(
-                            local_video_path=file_path,
-                            download_type="video"
-                        )
-                        # upload cover to oss
-                        if cover_path:
-                            oss_cover = await upload_to_oss(
-                                local_video_path=cover_path,
-                                download_type="image"
-                            )
-                        else:
-                            oss_cover = None
-
-                        # change status to success
-                        update_sql = f"""
-                                        UPDATE {self.article_crawler_video_table}
-                                        SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
-                                        WHERE id = %s;
-                        """
-                        await self.mysql_client.async_insert(
-                            sql=update_sql,
-                            params=(
-                                oss_video,
-                                oss_cover,
-                                NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS,
-                                params['id']
-                            )
-                        )
-                        downloaded_count += 1
-                        logging(
-                            code="etl_1002",
-                            info="etl_video downloaded successfully",
-                            trace_id=trace_id,
-                            function="etl_task"
-                        )
-                    # once more than 3 videos are downloaded, exit the loop and set the ETL-success status
-                    if downloaded_count > NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
-                        await self.update_content_status(
-                            ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
-                            trace_id=trace_id,
-                            new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
-                        )
-                        return True
-                except Exception as e:
-                    update_sql = f"""
-                    UPDATE {self.article_crawler_video_table}
-                    SET download_status = %s
-                    WHERE id = %s;
-                    """
-                    await self.mysql_client.async_insert(
-                        sql=update_sql,
-                        params=(NewContentIdTaskConst.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
-                    )
-                    logging(
-                        code="etl_1001",
-                        info="etl_failed to download video",
-                        trace_id=trace_id,
-                        function="etl_task"
-                    )
-            if downloaded_count >= 3:
+
+            # download videos
+            downloaded_count = await async_download_videos(
+                trace_id=trace_id,
+                content_id=content_id,
+                article_crawler_video_table=self.article_crawler_video_table,
+                db_client=self.long_articles_client
+            )
+
+            if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
                 await self.update_content_status(
                     ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
                     trace_id=trace_id,
@@ -683,7 +538,12 @@ class NewContentIdTask(object):
             )
             return False
         try:
-            download_videos = await self.get_downloaded_videos(content_id)
+            download_videos = await get_downloaded_videos(
+                content_id=content_id,
+                article_crawler_video_table=self.article_crawler_video_table,
+                db_client=self.long_articles_client
+            )
+
             match flow_pool_level:
                 case "autoArticlePoolLevel4":
                     # cold-start pool: use all downloaded videos
@@ -701,42 +561,17 @@ class NewContentIdTask(object):
                     video_list = download_videos[:3]
                 case _:
                     video_list = download_videos[:3]
-            L = []
-            for video_obj in video_list:
-                params = {
-                    "videoPath": video_obj['video_oss_path'],
-                    "uid": video_obj['uid'],
-                    "title": kimi_title
-                }
-                publish_response = await publish_to_pq(params)
-                video_id = publish_response['data']['id']
-                response = await get_pq_video_detail(video_id)
-                obj = {
-                    "uid": video_obj['uid'],
-                    "source": video_obj['platform'],
-                    "kimiTitle": kimi_title,
-                    "videoId": response['data'][0]['id'],
-                    "videoCover": response['data'][0]['shareImgPath'],
-                    "videoPath": response['data'][0]['videoPath'],
-                    "videoOss": video_obj['video_oss_path']
-                }
-                L.append(obj)
-            update_sql = f"""
-                    UPDATE {self.article_match_video_table}
-                    SET content_status = %s, response = %s, process_times = %s
-                    WHERE trace_id = %s and content_status = %s;
-                    """
-            # change the status from processing to published
-            await self.mysql_client.async_insert(
-                sql=update_sql,
-                params=(
-                    NewContentIdTaskConst.TASK_PUBLISHED_STATUS,
-                    json.dumps(L, ensure_ascii=False),
-                    process_times + 1,
-                    trace_id,
-                    NewContentIdTaskConst.TASK_PROCESSING_STATUS
-                )
+
+            # publish the videos to Piaoquan
+            await publish_videos_to_piaoquan(
+                video_list=video_list,
+                kimi_title=kimi_title,
+                process_times=process_times,
+                trace_id=trace_id,
+                db_client=self.long_articles_client,
+                article_match_video_table=self.article_match_video_table
             )
+
         except Exception as e:
             await self.roll_back_content_status_when_fails(
                 process_times=params['process_times'] + 1,
@@ -750,15 +585,11 @@ class NewContentIdTask(object):
         :param params:
         :return:
         """
-        # step1: run the kimi step
-        # time.sleep(5)  # test the wait when several processes handle the same task
         kimi_result = await self.kimi_task(params)
         trace_id = params['trace_id']
         process_times = params['process_times']
         content_id = params['content_id']
-        gh_id = params['gh_id']
         publish_flag = params['publish_flag']
-        print(kimi_result)
         if kimi_result:
             # 等待 kimi 操作执行完成之后,开始执行 spider_task
             print("kimi success")
@@ -844,7 +675,7 @@ class NewContentIdTask(object):
                     SET content_status = %s
                     WHERE content_id = %s and content_status = %s;
                 """
-                affected_rows = await self.mysql_client.async_insert(
+                affected_rows = await self.long_articles_client.async_insert(
                     sql=update_sql,
                     params=(
                         NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
@@ -858,7 +689,7 @@ class NewContentIdTask(object):
                     FROM {self.article_text_table}
                     WHERE content_id = '{content_id}';
                 """
-                result = await self.mysql_client.async_select(select_sql)
+                result = await self.long_articles_client.async_select(select_sql)
                 bot(
                     title="KIMI processing failed",
                     detail={
@@ -869,15 +700,20 @@ class NewContentIdTask(object):
                     mention=False
                 )
 
-    async def process_task(self, params):
+    async def process_each_task(self, params):
         """
         Process a single task
         :return:
         """
         content_id = params['content_id']
-        download_videos = await self.get_video_list(content_id)
-        if not download_videos:
-            # start processing; check whether the same content_id is already in flight
+        flow_pool_level = params['flow_pool_level']
+
+        download_videos_exists_flag = await whether_downloaded_videos_exists(
+            content_id=content_id,
+            article_crawler_video_table=self.article_crawler_video_table,
+            db_client=self.long_articles_client
+        )
+        if not download_videos_exists_flag:
             processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
             if processing_flag:
                 logging(
@@ -885,35 +721,46 @@ class NewContentIdTask(object):
                     info="content_id is already being processed, skip this task--{}".format(content_id)
                 )
             else:
+                # check whether a root_content_id exists
+                if flow_pool_level != 'autoArticlePoolLevel4':
+                    # if a root content_id is found, reuse its videos
+                    root_content_id = await self.get_source_content_id(content_id)
+                    if root_content_id:
+                        # pass the root_content_id along (get_source_content_id already returns the bare id)
+                        params['root_content_id'] = root_content_id
+                # start processing
                 await self.start_process(params=params)
         else:
             print("downloaded videos already exist")
 
-    async def deal(self):
+    async def deal(self) -> None:
         """
         function
         :return:
         """
+        # roll back tasks that are not hosted
+        await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
+
+        # roll back hosted tasks
+        await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
+
+        # mark tasks processed more than 3 times without success as failed
+        await self.set_tasks_status_fail()
+
+        # fetch the task list
         task_list = await self.get_tasks()
-        task_dict = {}
-        # deduplicate by content_id
-        for task in task_list:
-            key = task['content_id']
-            task_dict[key] = task
-        process_list = []
-        for item in task_dict:
-            process_list.append(task_dict[item])
+        task_dict = {task['content_id']: task for task in task_list}
+        process_list = list(task_dict.values())
         logging(
             code="5001",
             info="Match Task Got {} this time".format(len(process_list)),
             function="Publish Task"
         )
-        if task_list:
-            total_task = len(process_list)
-            print(process_list)
+
+        # process the list
+        if process_list:
             a = time.time()
-            print("start processing, {} tasks in total".format(total_task))
-            tasks = [self.process_task(params) for params in process_list]
+            tasks = [self.process_each_task(params) for params in process_list]
             await asyncio.gather(*tasks)
             b = time.time()
             print("processing time: {} s".format(b - a))
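
The dict comprehension in deal replaces the old explicit dedupe loop and, like it, keeps the last task seen per content_id. A small self-contained check of that behavior:

tasks = [
    {'content_id': 'c1', 'trace_id': 't1'},
    {'content_id': 'c1', 'trace_id': 't2'},
    {'content_id': 'c2', 'trace_id': 't3'},
]
process_list = list({t['content_id']: t for t in tasks}.values())
assert [t['trace_id'] for t in process_list] == ['t2', 't3']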

+ 15 - 0
tasks/utils/__init__.py

@@ -0,0 +1,15 @@
+"""
+@author: luojunhui
+"""
+
+from .kimi_task import get_kimi_result
+from .kimi_task import get_kimi_status
+from .kimi_task import generate_kimi_result
+
+from .spider_task import get_downloaded_videos
+from .spider_task import update_crawler_table_with_exist_content_id
+from .spider_task import whether_downloaded_videos_exists
+
+from .publish_task import publish_videos_to_piaoquan
+
+from .etl_task import async_download_videos

+ 122 - 0
tasks/utils/etl_task.py

@@ -0,0 +1,122 @@
+"""
+@author: luojunhui
+"""
+from applications.etl_function import *
+from applications.const import new_content_id_task_const
+from applications.log import logging
+
+
+async def async_download_videos(trace_id, content_id, article_crawler_video_table, db_client):
+    """
+    Download the pending videos for a content_id
+    """
+    select_sql = f"""
+                    SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
+                    FROM {article_crawler_video_table}
+                    WHERE content_id = '{content_id}' 
+                        AND download_status != {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+                        AND is_illegal = {new_content_id_task_const.VIDEO_SAFE}
+                    ORDER BY score DESC;
+                """
+    videos_need_to_download_tuple = await db_client.async_select(select_sql)
+    downloaded_count = 0
+    for line in videos_need_to_download_tuple:
+        params = {
+            "id": line[0],
+            "video_id": line[1],
+            "platform": line[2],
+            "video_title": line[3],
+            "video_url": line[4],
+            "cover_url": line[5],
+            "user_id": line[6],
+            "trace_id": line[7]
+        }
+        try:
+            local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
+            # download videos
+            file_path = await download_video(
+                file_path=local_video_path,
+                platform=params['platform'],
+                video_url=params['video_url']
+            )
+            if not file_path:
+                # video download failed; skip the upload and mark this record as failed
+                update_sql = f"""
+                        UPDATE {article_crawler_video_table}
+                        SET download_status = %s
+                        WHERE id = %s;
+                """
+
+                await db_client.async_insert(
+                    sql=update_sql,
+                    params=(new_content_id_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
+                )
+                logging(
+                    code="etl_1001",
+                    info="etl_failed to download video",
+                    trace_id=trace_id,
+                    function="etl_task"
+                )
+            else:
+                # download cover
+                cover_path = await download_cover(
+                    file_path=local_cover_path,
+                    platform=params['platform'],
+                    cover_url=params['cover_url']
+                )
+                # upload video to oss
+                oss_video = await upload_to_oss(
+                    local_video_path=file_path,
+                    download_type="video"
+                )
+                # upload cover to oss
+                if cover_path:
+                    oss_cover = await upload_to_oss(
+                        local_video_path=cover_path,
+                        download_type="image"
+                    )
+                else:
+                    oss_cover = None
+
+                # change status to success
+                update_sql = f"""
+                    UPDATE {article_crawler_video_table}
+                    SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
+                    WHERE id = %s;
+                """
+                await db_client.async_insert(
+                    sql=update_sql,
+                    params=(
+                        oss_video,
+                        oss_cover,
+                        new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
+                        params['id']
+                    )
+                )
+                downloaded_count += 1
+                logging(
+                    code="etl_1002",
+                    info="etl_video downloaded successfully",
+                    trace_id=trace_id,
+                    function="etl_task"
+                )
+            # once more than 3 videos are downloaded, exit early; the caller sets the ETL-success status
+            if downloaded_count > new_content_id_task_const.MIN_MATCH_VIDEO_NUM:
+                return downloaded_count
+        except Exception as e:
+            update_sql = f"""
+                        UPDATE {article_crawler_video_table}
+                        SET download_status = %s
+                        WHERE id = %s;
+                        """
+            await db_client.async_insert(
+                sql=update_sql,
+                params=(new_content_id_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
+            )
+            logging(
+                code="etl_1001",
+                info="etl_failed to download video",
+                trace_id=trace_id,
+                function="etl_task"
+            )
+    return downloaded_count
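
A usage sketch of the extracted helper; the table name and the pool argument are placeholders, not values from this commit:

async def etl_demo(db_client):
    downloaded = await async_download_videos(
        trace_id="trace-demo",
        content_id="content-demo",
        article_crawler_video_table="article_crawler_videos",  # placeholder table name
        db_client=db_client
    )
    print(f"{downloaded} videos downloaded")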

+ 84 - 0
tasks/utils/kimi_task.py

@@ -0,0 +1,84 @@
+"""
+@author: luojunhui
+kimi-related helpers
+"""
+import json
+
+from typing import Dict
+
+from applications.const import new_content_id_task_const
+from applications.functions.kimi import KimiServer
+
+
+async def get_kimi_status(content_id, article_text_table, db_client) -> int:
+    """
+    Fetch the kimi status for a content_id
+    :return:
+    """
+    select_sql = f"""
+                SELECT kimi_status
+                FROM {article_text_table}
+                WHERE content_id = '{content_id}';
+                """
+    response = await db_client.async_select(select_sql)
+    if response:
+        kimi_status = response[0][0]
+        return kimi_status
+    else:
+        return new_content_id_task_const.ARTICLE_TEXT_TABLE_ERROR
+
+
+async def get_kimi_result(content_id, article_text_table, db_client) -> Dict:
+    """
+    Fetch the stored kimi result
+    """
+    get_kimi_sql = f"""
+                SELECT article_title, kimi_title, kimi_summary, kimi_keys
+                FROM {article_text_table}
+                WHERE content_id = '{content_id}';
+                """
+    kimi_info = await db_client.async_select(get_kimi_sql)
+    return {
+        "kimi_title": kimi_info[0][1],
+        "ori_title": kimi_info[0][0],
+        "kimi_summary": kimi_info[0][2],
+        "kimi_keys": json.loads(kimi_info[0][3])
+    }
+
+
+async def generate_kimi_result(content_id, article_text_table, db_client) -> Dict:
+    """
+    Run the kimi step for a content_id and persist the result
+    """
+    K = KimiServer()
+    select_sql = f"""
+                SELECT article_title, article_text
+                FROM {article_text_table}
+                WHERE content_id = '{content_id}';
+                """
+    res = await db_client.async_select(select_sql)
+    article_obj = {
+        "article_title": res[0][0],
+        "article_text": res[0][1],
+        "content_id": content_id
+    }
+    kimi_info = await K.search_kimi_schedule(params=article_obj)
+    kimi_title = kimi_info['k_title']
+    content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
+    content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
+    update_kimi_sql = f"""
+                        UPDATE {article_text_table} 
+                        SET
+                            kimi_title = %s, kimi_summary = %s, kimi_keys = %s, kimi_status = %s
+                        WHERE content_id = %s;"""
+    await db_client.async_insert(
+        sql=update_kimi_sql,
+        params=(
+            kimi_title, content_title, content_keys, new_content_id_task_const.KIMI_SUCCESS_STATUS, content_id)
+    )
+    return {
+        "kimi_title": kimi_title,
+        "ori_title": article_obj['article_title'],
+        "kimi_summary": content_title,
+        "kimi_keys": kimi_info['content_keys']
+    }

+ 52 - 0
tasks/utils/publish_task.py

@@ -0,0 +1,52 @@
+"""
+@author: luojunhui
+"""
+import json
+
+from applications.const import new_content_id_task_const
+from applications.functions.pqFunctions import get_pq_video_detail
+from applications.functions.pqFunctions import publish_to_pq
+
+
+async def publish_videos_to_piaoquan(video_list, kimi_title, process_times, trace_id, article_match_video_table, db_client):
+    """
+    Publish the videos to Piaoquan
+    """
+    L = []
+    for video_obj in video_list:
+        params = {
+            "videoPath": video_obj['video_oss_path'],
+            "uid": video_obj['uid'],
+            "title": kimi_title
+        }
+        publish_response = await publish_to_pq(params)
+        video_id = publish_response['data']['id']
+        response = await get_pq_video_detail(video_id)
+        obj = {
+            "uid": video_obj['uid'],
+            "source": video_obj['platform'],
+            "kimiTitle": kimi_title,
+            "videoId": response['data'][0]['id'],
+            "videoCover": response['data'][0]['shareImgPath'],
+            "videoPath": response['data'][0]['videoPath'],
+            "videoOss": video_obj['video_oss_path']
+        }
+        L.append(obj)
+
+    update_sql = f"""
+        UPDATE {article_match_video_table}
+        SET content_status = %s, response = %s, process_times = %s
+        WHERE trace_id = %s and content_status = %s;
+    """
+
+    # change the status from processing to published
+    await db_client.async_insert(
+        sql=update_sql,
+        params=(
+            new_content_id_task_const.TASK_PUBLISHED_STATUS,
+            json.dumps(L, ensure_ascii=False),
+            process_times + 1,
+            trace_id,
+            new_content_id_task_const.TASK_PROCESSING_STATUS
+        )
+    )

+ 82 - 0
tasks/utils/spider_task.py

@@ -0,0 +1,82 @@
+"""
+@author: luojunhui
+"""
+from typing import Dict, List
+
+from applications.const import new_content_id_task_const
+
+
+async def whether_downloaded_videos_exists(content_id, article_crawler_video_table, db_client):
+    """
+    Check whether this article already has enough matched, downloaded videos
+    :param content_id:
+    :param article_crawler_video_table: crawler video table
+    :param db_client
+    :return:
+    """
+    sql = f"""
+    SELECT id
+    FROM {article_crawler_video_table}
+    WHERE content_id = '{content_id}' 
+        AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+        AND is_illegal = {new_content_id_task_const.VIDEO_SAFE};
+    """
+    res_tuple = await db_client.async_select(sql)
+    if len(res_tuple) >= new_content_id_task_const.MIN_MATCH_VIDEO_NUM:
+        return True
+    else:
+        return False
+
+
+async def get_downloaded_videos(content_id, article_crawler_video_table, db_client) -> List[Dict]:
+    """
+    Fetch the downloaded videos
+    :return:
+    """
+    sql = f"""
+        SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
+        FROM {article_crawler_video_table}
+        WHERE content_id = '{content_id}' 
+            AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+            AND is_illegal = {new_content_id_task_const.VIDEO_SAFE}
+        ORDER BY score DESC;
+    """
+
+    res_tuple = await db_client.async_select(sql)
+    return [
+        {
+            "platform": i[0],
+            "play_count": i[1],
+            "like_count": i[2],
+            "video_oss_path": i[3],
+            "cover_oss_path": i[4],
+            "uid": i[5]
+        }
+        for i in res_tuple
+    ]
+
+
+async def update_crawler_table_with_exist_content_id(root_content_id, content_id, trace_id, article_crawler_video_table, db_client):
+    """
+    Look up videos already downloaded for root_content_id and re-insert them under the new content_id
+    """
+    select_sql = f"""
+        SELECT *
+        FROM {article_crawler_video_table}
+        WHERE content_id = '{root_content_id}' 
+            AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+            AND is_illegal = {new_content_id_task_const.VIDEO_SAFE};
+    """
+    res_tuple = await db_client.async_select(select_sql)
+    insert_list = [(content_id,) + row[2:-3] + (trace_id,) + row[-2:] for row in res_tuple]
+    insert_sql = f"""
+        INSERT INTO {article_crawler_video_table}
+        (content_id, out_video_id, platform, video_title, play_count, like_count, share_count, publish_time, crawler_time, duration, video_url, cover_url, download_status, video_oss_path, cover_oss_path, user_id, trace_id, score, is_illegal)
+        VALUES 
+        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+    """
+    await db_client.async_insert_many(sql=insert_sql, params_list=insert_list)
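
The comprehension assumes SELECT * returns 20 columns with the auto-increment id and the old content_id first: it drops those two, copies everything up to user_id verbatim, substitutes the new trace_id, and carries over score and is_illegal. A minimal slicing check under that assumed layout:

row = tuple(range(20))            # stand-in for one 20-column SELECT * row
new_row = ('new-cid',) + row[2:-3] + ('new-trace',) + row[-2:]
assert len(new_row) == 19         # matches the 19 columns in the INSERT statement
assert new_row[-2:] == (18, 19)   # score and is_illegal carried over unchanged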
+