
Delete some unused code files

luojunhui 5 months ago
parent
commit ca983238ac
11 changed files with 1 addition and 1001 deletions
  1. etlTask.py  +0 -27
  2. kimiTask.py  +0 -24
  3. newContentIdTask.py  +1 -1
  4. publishtask.py  +0 -27
  5. spiderTask.py  +0 -29
  6. tasks/__init__.py  +0 -4
  7. tasks/etl_task.py  +0 -303
  8. tasks/kimi_task.py  +0 -102
  9. tasks/new_contentId_task.py  +0 -0
  10. tasks/publish_task.py  +0 -196
  11. tasks/spider_task.py  +0 -288

+ 0 - 27
etlTask.py

@@ -1,27 +0,0 @@
-"""
-@author: luojunhui
-"""
-import time
-import asyncio
-import datetime
-from tasks.etl_task import AsyncETL
-from applications.db import TaskMySQLClient
-
-
-async def main():
-    """
-    main job
-    :return:
-    """
-    TMC = TaskMySQLClient()
-    await TMC.init_pool()
-    PD = AsyncETL(TMC)
-    await PD.deal()
-
-
-if __name__ == '__main__':
-    while True:
-        asyncio.run(main())
-        now_str = datetime.datetime.now().__str__()
-        print("{}    request finished, waiting 30s".format(now_str))
-        time.sleep(30)
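
All four deleted launcher scripts in this commit (etlTask.py, kimiTask.py, publishtask.py, spiderTask.py) repeat the same boilerplate: build a TaskMySQLClient pool, run one task cycle, then sleep. A minimal consolidated sketch of that shared pattern (hypothetical; run_once and run_forever are illustrative names, not part of this codebase):

import time
import asyncio
import datetime

from applications.db import TaskMySQLClient


async def run_once(task_cls):
    """Run one cycle of a task class exposing an async deal() method."""
    client = TaskMySQLClient()
    await client.init_pool()
    await task_cls(client).deal()


def run_forever(task_cls, interval_seconds):
    """Poll forever, mirroring the deleted while-True launcher loops."""
    while True:
        asyncio.run(run_once(task_cls))
        print("{}    cycle finished, waiting {}s".format(
            datetime.datetime.now(), interval_seconds))
        time.sleep(interval_seconds)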

+ 0 - 24
kimiTask.py

@@ -1,24 +0,0 @@
-import time
-import asyncio
-import datetime
-from tasks.kimi_task import KimiTask
-from applications.db import TaskMySQLClient
-
-
-async def main():
-    """
-    main job
-    :return:
-    """
-    TMC = TaskMySQLClient()
-    await TMC.init_pool()
-    PD = KimiTask(TMC)
-    await PD.deal()
-
-
-if __name__ == '__main__':
-    while True:
-        asyncio.run(main())
-        now_str = datetime.datetime.now().__str__()
-        print("{}    request finished, waiting 60s".format(now_str))
-        time.sleep(60)

+ 1 - 1
newContentIdTask.py

@@ -5,7 +5,7 @@ import time
 import datetime
 import asyncio
 from applications.db import AsyncMySQLClient
-from tasks.newContentIdTask import NewContentIdTask
+from tasks.new_contentId_task import NewContentIdTask
 
 
 async def main_job():

+ 0 - 27
publishtask.py

@@ -1,27 +0,0 @@
-"""
-@author: luojunhui
-"""
-import time
-import asyncio
-import datetime
-from tasks.publish_task import publishTask
-from applications.db import TaskMySQLClient
-
-
-async def main():
-    """
-    main job
-    :return:
-    """
-    TMC = TaskMySQLClient()
-    await TMC.init_pool()
-    PD = publishTask(TMC)
-    await PD.deal()
-
-
-if __name__ == '__main__':
-    while True:
-        asyncio.run(main())
-        now_str = datetime.datetime.now().__str__()
-        print("{}    request finished, waiting 60s".format(now_str))
-        time.sleep(60)

+ 0 - 29
spiderTask.py

@@ -1,29 +0,0 @@
-"""
-@author: luojunhui
-"""
-import time
-import datetime
-import asyncio
-
-from tasks.spider_task import spiderTask
-from applications.db import TaskMySQLClient
-
-
-async def main():
-    """
-    main job
-    :return:
-    """
-    TMC = TaskMySQLClient()
-    await TMC.init_pool()
-    PD = spiderTask(TMC)
-    await PD.deal()
-
-
-if __name__ == '__main__':
-    while True:
-        asyncio.run(main())
-        now_str = datetime.datetime.now().__str__()
-        print("{}    request finished, waiting 60s".format(now_str))
-        time.sleep(60)
-

+ 0 - 4
tasks/__init__.py

@@ -2,8 +2,4 @@
 @author: luojunhui
 Scheduled tasks
 """
-from .etl_task import AsyncETL
-from .kimi_task import KimiTask
-from .spider_task import spiderTask
-from .publish_task import publishTask
 from .history_task import historyContentIdTask

+ 0 - 303
tasks/etl_task.py

@@ -1,303 +0,0 @@
-"""
-@author: luojunhui
-"""
-import os
-
-import oss2
-import aiohttp
-import aiofiles
-import asyncio
-import requests
-
-from datetime import datetime
-from hashlib import md5
-from uuid import uuid4
-
-from fake_useragent import FakeUserAgent
-from applications.config import Config
-from applications.log import logging
-
-
-async def download_cover(file_path, platform, cover_url):
-    """
-    Download the video cover
-    :param platform:
-    :param cover_url:
-    :param file_path:
-    :return:
-    """
-    headers = request_header(platform=platform, url=cover_url, download_type="cover")
-    response = requests.get(url=cover_url, headers=headers)
-    if b"<html>" in response.content:
-        return None
-    elif response.status_code != 200:
-        return None
-    else:
-        with open(file_path, "wb") as f:
-            f.write(response.content)
-        return file_path
-
-
-def request_header(platform, url, download_type="video"):
-    """
-    Request headers
-    :return:
-    """
-    if platform == "xg_search":
-        if "v9-xg-web-pc.ixigua.com" in url:
-            headers = {
-                "Accept": "*/*",
-                "Accept-Language": "zh-CN,zh;q=0.9",
-                "Host": "v9-xg-web-pc.ixigua.com",
-                "User-Agent": FakeUserAgent().chrome,
-                "Origin": "https://www.ixigua.com/",
-                "Referer": "https://www.ixigua.com/"
-            }
-        elif "v3-xg-web-pc.ixigua.com" in url:
-            headers = {
-                "Accept": "*/*",
-                "Accept-Language": "zh-CN,zh;q=0.9",
-                "Host": "v3-xg-web-pc.ixigua.com",
-                "User-Agent": FakeUserAgent().chrome,
-                "Origin": "https://www.ixigua.com/",
-                "Referer": "https://www.ixigua.com/"
-            }
-        elif download_type == "cover":
-            headers = {
-                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
-                'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
-                'Cache-Control': 'max-age=0',
-                'Proxy-Connection': 'keep-alive',
-                'Upgrade-Insecure-Requests': '1',
-                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
-            }
-        else:
-            headers = {
-                "Accept": "*/*",
-                "Accept-Language": "zh-CN,zh;q=0.9",
-                "Host": "v3-xg-web-pc.ixigua.com",
-                "User-Agent": FakeUserAgent().chrome,
-                "Origin": "https://www.ixigua.com/",
-                "Referer": "https://www.ixigua.com/"
-            }
-    elif platform == "baidu_search":
-        headers = {
-            "Accept": "*/*",
-            "Accept-Language": "zh-CN,zh;q=0.9",
-            "User-Agent": FakeUserAgent().chrome,
-        }
-    elif platform == "wx_search":
-        headers = {
-            "Accept": "*/*",
-            "Accept-Language": "zh-CN,zh;q=0.9",
-            "User-Agent": FakeUserAgent().chrome,
-            "Origin": "https://mp.weixin.qq.com",
-            "Referer": "https://mp.weixin.qq.com"
-        }
-    elif platform == "dy_search":
-        headers = {
-            'accept': '*/*',
-            'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
-            'priority': 'i',
-            'range': 'bytes=0-',
-            'referer': 'https://v11-coldf.douyinvod.com/',
-            'user-agent': FakeUserAgent().chrome
-        }
-    else:
-        headers = {}
-    return headers
-
-
-async def download_video(file_path, platform, video_url, download_type="video"):
-    """
-    :param download_type:
-    :param video_url:
-    :param platform:
-    :param file_path:
-    :return:
-    """
-    headers = request_header(platform=platform, url=video_url, download_type=download_type)
-    if os.path.exists(file_path):
-        file_size = os.path.getsize(file_path)
-        headers["Range"] = f"bytes={file_size}-"
-    else:
-        file_size = 0
-    async with aiohttp.ClientSession() as session:
-        async with session.get(video_url, headers=headers) as response:
-            if response.status in [200, 206]:
-                if file_size > 0:
-                    async with aiofiles.open(file_path, "ab+") as f:
-                        # download in 1 MB chunks
-                        async for chunk in response.content.iter_chunked(1024 * 1024):
-                            await f.write(chunk)
-                else:
-                    async with aiofiles.open(file_path, "wb") as f:
-                        # download in 1 MB chunks
-                        async for chunk in response.content.iter_chunked(1024 * 1024):
-                            await f.write(chunk)
-
-            else:
-                print(response.status)
-    return file_path
-
-
-def generate_video_path(platform, video_id):
-    """
-    Generate a unique video path from the video info
-    :return:
-    """
-    index = "{}-{}-{}".format(platform, video_id, uuid4())
-    index = md5(index.encode()).hexdigest()
-    file_name = "{}.mp4".format(index)
-    cover_name = "{}.png".format(index)
-    file_path = os.path.join(os.getcwd(), "static", file_name)
-    cover_path = os.path.join(os.getcwd(), "static", cover_name)
-    return file_path, cover_path
-
-
-async def upload_to_oss(local_video_path, download_type):
-    """
-    Upload the video to OSS
-    :return:
-    """
-    oss_video_key = "long_articles/{}/".format(download_type) + str(uuid4())
-    access_key_id = "LTAIP6x1l3DXfSxm"
-    access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
-    endpoint = "oss-cn-hangzhou.aliyuncs.com"
-    bucket_name = "art-pubbucket"
-    bucket = oss2.Bucket(
-        oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
-    )
-    bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
-    return oss_video_key
-
-
-class AsyncETL(object):
-    """
-    Video download functionality
-    """
-
-    def __init__(self, mysql_client):
-        # self.proxy = {
-        #     "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
-        #     "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
-        # }
-        self.max_retry = 5
-        self.mysql_client = mysql_client
-        self.config = Config()
-        self.article_crawler_video_table = self.config.article_crawler_video_table
-        self.article_match_video_table = self.config.article_match_video_table
-
-    async def get_tasks(self):
-        """
-        Fetch video ids
-        :return:
-        """
-        select_sql = f"""
-        SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
-        FROM {self.article_crawler_video_table}
-        WHERE download_status = 0
-        ORDER BY id
-        LIMIT 10;
-        """
-        result = await self.mysql_client.async_select(select_sql)
-        if result:
-            tasks = [
-                {
-                    "id": line[0],
-                    "video_id": line[1],
-                    "platform": line[2],
-                    "video_title": line[3],
-                    "video_url": line[4],
-                    "cover_url": line[5],
-                    "user_id": line[6],
-                    "trace_id": line[7]
-                }
-                for line in result
-            ]
-            return tasks
-        else:
-            return []
-
-    async def process_task(self, params):
-        """
-        Process a task
-        :return:
-        """
-        downloading_status = 1
-        downloaded_status = 2
-        download_failed_status = 3
-        update_sql_0 = f"""
-                    UPDATE {self.article_crawler_video_table}
-                    SET download_status = %s
-                    WHERE id = %s;
-                    """
-        await self.mysql_client.async_insert(
-            sql=update_sql_0,
-            params=(downloading_status, params['id'])
-        )
-        try:
-            local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
-            # download videos
-            file_path = await download_video(
-                file_path=local_video_path,
-                platform=params['platform'],
-                video_url=params['video_url']
-            )
-            # download cover
-            cover_path = await download_cover(
-                file_path=local_cover_path,
-                platform=params['platform'],
-                cover_url=params['cover_url']
-            )
-            oss_video = await upload_to_oss(
-                local_video_path=file_path,
-                download_type="video"
-            )
-            if cover_path:
-                oss_cover = await upload_to_oss(
-                    local_video_path=cover_path,
-                    download_type="image"
-                )
-            else:
-                oss_cover = None
-            update_sql = f"""
-                            UPDATE {self.article_crawler_video_table}
-                            SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
-                            WHERE id = %s;
-            """
-            await self.mysql_client.async_insert(
-                sql=update_sql,
-                params=(
-                    oss_video,
-                    oss_cover,
-                    downloaded_status,
-                    params['id']
-                )
-            )
-        except Exception as e:
-            update_sql = f"""
-            UPDATE {self.article_crawler_video_table}
-            SET download_status = %s
-            WHERE id = %s;
-            """
-            await self.mysql_client.async_insert(
-                sql=update_sql,
-                params=(download_failed_status, params['id'])
-            )
-            print("crawl failed--{}".format(e))
-
-    async def deal(self):
-        """
-        ETL Deal Task
-        :return:
-        """
-        task_list = await self.get_tasks()
-        logging(
-            code="5001",
-            info="ETL Task Got {} this time".format(len(task_list)),
-            function="ETL"
-        )
-        if task_list:
-            tasks = [self.process_task(params) for params in task_list]
-            await asyncio.gather(*tasks)
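
For reference, the download_status integers that the deleted AsyncETL.process_task cycled through, collected in one place (a hedged reconstruction; the deleted code used bare literals, and the enum names are not from the codebase):

from enum import IntEnum


class DownloadStatus(IntEnum):
    """Hypothetical names for the download_status values used above."""
    PENDING = 0      # rows selected by get_tasks
    DOWNLOADING = 1  # set just before the download starts
    DOWNLOADED = 2   # video (and cover, if available) uploaded to OSS
    FAILED = 3       # any exception during download or upload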

+ 0 - 102
tasks/kimi_task.py

@@ -1,102 +0,0 @@
-"""
-@author: luojunhui
-"""
-import json
-import asyncio
-from applications.functions.kimi import KimiServer
-from applications.log import logging
-from applications.config import Config
-
-
-class KimiTask(object):
-    """
-    KIMI task
-    """
-
-    def __init__(self, mysql_client):
-        """
-
-        :param mysql_client:
-        """
-        self.mysql_client = mysql_client
-        self.article_match_video_table = Config().article_match_video_table
-        self.article_text_table = Config().article_text_table
-
-    async def get_tasks(self):
-        """
-        Fetch tasks
-        :return:
-        """
-        sql = f"""
-        SELECT content_id, article_title, article_text
-        FROM {self.article_text_table}
-        WHERE kimi_status = 0 
-        limit 5;
-        """
-        content_list = await self.mysql_client.async_select(sql)
-        if content_list:
-            task_list = [
-                {
-                    "contentId": i[0],
-                    "articleTitle": i[1],
-                    "articleText": i[2]
-                } for i in content_list
-            ]
-            return task_list
-        else:
-            return []
-
-    async def process_task(self, params):
-        """
-        Process a single kimi task
-        :return:
-        """
-        kimi_success_status = 1
-        kimi_fail_status = 2
-        K = KimiServer()
-        try:
-            kimi_info = await K.search_kimi_schedule(params=params)
-            kimi_title = kimi_info['k_title']
-            content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
-            content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
-            update_kimi_sql = f"""
-            UPDATE {self.article_text_table} 
-            SET
-                kimi_title = %s,
-                kimi_summary = %s,
-                kimi_keys = %s,
-                kimi_status = %s
-            WHERE content_id = %s;"""
-            await self.mysql_client.async_insert(
-                sql=update_kimi_sql,
-                params=(kimi_title, content_title, content_keys, kimi_success_status, params['contentId'])
-            )
-        except Exception as e:
-            update_kimi_sql = f"""
-            UPDATE {self.article_match_video_table}
-            SET
-                kimi_status = %s 
-            WHERE content_id = %s
-            """
-            await self.mysql_client.async_insert(
-                sql=update_kimi_sql,
-                params=(kimi_fail_status, params['contentId'])
-            )
-            print("kimi error--{}".format(e))
-
-    async def deal(self):
-        """
-        deal function
-        :return:
-        """
-        task_list = await self.get_tasks()
-        logging(
-            code="5001",
-            info="KIMI Task Got {} this time".format(len(task_list)),
-            function="Kimi Task"
-        )
-        if task_list:
-            tasks = [self.process_task(params) for params in task_list]
-            await asyncio.gather(*tasks)
-        else:
-            print("no kimi articles to process")
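
The kimi_status column follows a similar three-value scheme; note that in the deleted code above the success path updates article_text_table while the failure path writes kimi_status to article_match_video_table. A sketch (the values come from the code above; the enum and its names are assumptions):

from enum import IntEnum


class KimiStatus(IntEnum):
    """Hypothetical names for the kimi_status values used above."""
    PENDING = 0  # rows selected by KimiTask.get_tasks
    SUCCESS = 1  # kimi title, summary and keys stored
    FAILED = 2   # KimiServer.search_kimi_schedule raised an exception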

+ 0 - 0
tasks/newContentIdTask.py → tasks/new_contentId_task.py


+ 0 - 196
tasks/publish_task.py

@@ -1,196 +0,0 @@
-"""
-@author: luojunhui
-Publish to pq and obtain video ids
-"""
-import asyncio
-import json
-import time
-
-from applications.config import Config
-from applications.log import logging
-from applications.functions.pqFunctions import publish_to_pq
-from applications.functions.common import shuffle_list
-
-
-class publishTask(object):
-    """
-    From the match_videos table, fetch content_ids with content_status = 1;
-    for each content_id, query the crawler_videos table for videos with download_status = 2, i.e. videos already matched for that content_id;
-    apply the flow-pool tag logic;
-    publish the video paths from crawler_videos to pq to obtain videoIds;
-    set content_status = 2 in match_videos and record videoId && ossName etc. in response
-    """
-
-    def __init__(self, mysql_client):
-        self.mysql_client = mysql_client
-        self.config = Config()
-        self.article_match_video_table = self.config.article_match_video_table
-        self.article_text_table = self.config.article_text_table
-        self.article_crawler_video_table = self.config.article_crawler_video_table
-        self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
-
-    async def get_tasks(self):
-        """
-        Fetch tasks
-        :return:
-        """
-        select_sql = f"""
-        SELECT trace_id, content_id, flow_pool_level, gh_id
-        FROM {self.article_match_video_table} 
-        WHERE content_status = 3
-        limit 10;
-        """
-        tasks = await self.mysql_client.async_select(select_sql)
-        if tasks:
-            return [
-                {
-                    "trace_id": i[0],
-                    "content_id": i[1],
-                    "flow_pool_level": i[2],
-                    "gh_id": i[3]
-                }
-                for i in tasks
-            ]
-        else:
-            return []
-
-    async def get_video_list(self, content_id):
-        """
-        Fetch downloaded videos for a content_id
-        :return:
-        """
-        sql = f"""
-        SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
-        FROM {self.article_crawler_video_table}
-        WHERE content_id = '{content_id}' and download_status = 2;
-        """
-        res_tuple = await self.mysql_client.async_select(sql)
-        if len(res_tuple) >= 3:
-            return [
-                {
-                    "platform": i[0],
-                    "play_count": i[1],
-                    "like_count": i[2],
-                    "video_oss_path": i[3],
-                    "cover_oss_path": i[4],
-                    "uid": i[5]
-                }
-                for i in res_tuple]
-        else:
-            return []
-
-    async def get_kimi_title(self, content_id):
-        """
-        Fetch the kimiTitle
-        :param content_id:
-        :return:
-        """
-        select_sql = f"""
-        select kimi_title from {self.article_text_table} where content_id = '{content_id}';
-        """
-        res_tuple = await self.mysql_client.async_select(select_sql)
-        if res_tuple:
-            return res_tuple[0][0]
-        else:
-            return False
-
-    async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos):
-        """
-        Publish to pq
-        :param trace_id:
-        :param download_videos: downloaded videos ---> list [{}, {}, {}.... ]
-        :param gh_id: official account id ---> str
-        :param kimi_title: kimi title ---> str
-        :param flow_pool_level: flow pool level ---> str
-        :return:
-        """
-        publish_success_status = 4
-        match flow_pool_level:
-            case "autoArticlePoolLevel4":
-                # cold-start pool, use all candidates
-                video_list = shuffle_list(download_videos)[:3]
-            case "autoArticlePoolLevel3":
-                if self.gh_id_dict.get(gh_id):
-                    video_list = shuffle_list(download_videos)[:3]
-                else:
-                    video_list = download_videos[:3]
-            case "autoArticlePoolLevel2":
-                # second slot, only for specific accounts
-                video_list = []
-            case "autoArticlePoolLevel1":
-                # headline slot, not handled yet
-                video_list = download_videos[:3]
-            case _:
-                print("no flow pool level provided")
-                video_list = download_videos[:3]
-
-        L = []
-        for video_obj in video_list:
-            params = {
-                "videoPath": video_obj['video_oss_path'],
-                "uid": video_obj['uid'],
-                "title": kimi_title
-            }
-            response = await publish_to_pq(params)
-            time.sleep(2)
-            obj = {
-                "uid": video_obj['uid'],
-                "source": video_obj['platform'],
-                "kimiTitle": kimi_title,
-                "videoId": response['data']['id'],
-                "videoCover": response['data']['shareImgPath'],
-                "videoPath": response['data']['videoPath'],
-                "videoOss": video_obj['video_oss_path']
-            }
-            L.append(obj)
-        update_sql = f"""
-        UPDATE {self.article_match_video_table}
-        SET content_status = %s, response = %s
-        WHERE trace_id = %s;
-        """
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(publish_success_status, json.dumps(L, ensure_ascii=False), trace_id)
-        )
-
-    async def process_task(self, params):
-        """
-        Process a task
-        :return:
-        """
-        gh_id = params['gh_id']
-        flow_pool_level = params['flow_pool_level']
-        content_id = params['content_id']
-        trace_id = params['trace_id']
-        download_videos = await self.get_video_list(content_id)
-        if download_videos:
-            kimi_title = await self.get_kimi_title(content_id)
-            await self.publish_videos_to_pq(
-                flow_pool_level=flow_pool_level,
-                kimi_title=kimi_title,
-                gh_id=gh_id,
-                trace_id=trace_id,
-                download_videos=download_videos
-            )
-        else:
-            print("downloads for this content_id are not finished yet")
-
-    async def deal(self):
-        """
-        Publish task entry point
-        :return:
-        """
-        task_list = await self.get_tasks()
-        logging(
-            code="5004",
-            info="PublishTask Got {} this time".format(len(task_list)),
-            function="Publish Task"
-        )
-        if task_list:
-            tasks = [self.process_task(params) for params in task_list]
-            await asyncio.gather(*tasks)
-        else:
-            logging(
-                code="9008",
-                info="no pending requests"
-            )

+ 0 - 288
tasks/spider_task.py

@@ -1,288 +0,0 @@
-"""
-@author: luojunhui
-"""
-import asyncio
-import json
-import time
-
-from applications.config import Config
-from applications.log import logging
-from applications.spider import search_videos_from_web
-
-
-class spiderTask(object):
-    """
-    Scheduled task
-    """
-
-    def __init__(self, mysql_client):
-        """
-        :param mysql_client:
-        """
-        self.mysql_client = mysql_client
-        self.config = Config()
-        self.article_match_video_table = self.config.article_match_video_table
-        self.article_text_table = self.config.article_text_table
-        self.article_crawler_video_table = self.config.article_crawler_video_table
-        self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
-        self.gh_id_map = json.loads(self.config.get_config_value("accountMap"))
-
-    async def get_task(self):
-        """
-        Fetch tasks: select content_ids from article_match_video_table whose kimi step has finished
-        :return:
-        """
-        select_sql = f"""
-            SELECT 
-                amvt.trace_id, 
-                amvt.content_id, 
-                amvt.gh_id, 
-                amvt.process_times
-            FROM {self.article_match_video_table} amvt
-            JOIN (
-                select content_id
-                from {self.article_text_table}
-                where kimi_status != 0
-            ) att on amvt.content_id = att.content_id
-            WHERE content_status = 0 and process_times <= 3
-            GROUP BY content_id
-            LIMIT {self.spider_coroutines};
-        """
-        content_id_tuple = await self.mysql_client.async_select(select_sql)
-        if content_id_tuple:
-            content_id_list = [i for i in list(content_id_tuple)]
-            task_obj_list = [
-                {
-                    "trace_id": item[0],
-                    "content_id": item[1],
-                    "gh_id": item[2],
-                    "process_times": item[3]
-                } for item in content_id_list
-            ]
-            logging(
-                code="9001",
-                info="fetched {} videos this round".format(len(task_obj_list)),
-                data=task_obj_list
-            )
-            return task_obj_list
-        else:
-            return []
-
-    async def get_history_videos(self, content_id):
-        """
-        check whether the contents videos exists
-        :param content_id:
-        :return:
-        """
-        select_sql = f"""
-            SELECT count(1)
-            FROM {self.article_crawler_video_table}
-            where content_id = '{content_id}' and download_status = 2;
-        """
-        content_videos = await self.mysql_client.async_select(select_sql)
-        videos_count = content_videos[0][0]
-        if videos_count >= 3:
-            return True
-        else:
-            return False
-
-    async def judge_content_processing(self, content_id):
-        """
-        Check whether this content_id is already being processed
-        :param content_id:
-        :return:
-        """
-        select_sql = f"""
-                       SELECT trace_id, content_status
-                       FROM {self.article_match_video_table}
-                       WHERE content_id = '{content_id}'
-                       ORDER BY id DESC;
-                   """
-        result = await self.mysql_client.async_select(select_sql)
-        if result:
-            for item in result:
-                trace_id, content_status = item
-                if content_status != 0:
-                    return False
-            return True
-        else:
-            return True
-
-    async def get_kimi_result(self, content_id):
-        """
-        Fetch kimi info by content_id
-        :return:
-        """
-        select_sql = f"""
-        select article_title, kimi_title, kimi_summary, kimi_keys, kimi_status
-        from {self.article_text_table}
-        where content_id = '{content_id}';
-        """
-        response = await self.mysql_client.async_select(select_sql)
-        if response:
-            article_detail = response[0]
-            if article_detail[4] == 1:
-                result = {
-                    "ori_title": article_detail[0],
-                    "kimi_title": article_detail[1],
-                    "kimi_summary": article_detail[2],
-                    "kimi_keys": json.loads(article_detail[3]),
-                    "kimi_status": article_detail[4]
-                }
-            else:
-                result = {
-                    "kimiStatus": article_detail[4]
-                }
-            return result
-        else:
-            return
-
-    async def start_process(self, params):
-        """
-        Start processing
-        :param params:
-        :return:
-        """
-        defeat_status = 99
-        finish_kimi_status = 1
-        finish_spider_status = 2
-        kimi_result = await self.get_kimi_result(content_id=params['content_id'])
-        kimi_status = kimi_result['kimi_status']
-        match kimi_status:
-            case 1:
-                update_process_times_sql = f"""
-                            UPDATE {self.article_match_video_table}
-                            SET process_times = %s, content_status = %s, content_status_update_time = %s
-                            WHERE trace_id = %s;
-                            """
-                await self.mysql_client.async_insert(
-                    sql=update_process_times_sql,
-                    params=(
-                        params['process_times'] + 1,
-                        finish_kimi_status,
-                        int(time.time()),
-                        params['trace_id']
-                    )
-                )
-                try:
-                    search_videos_count = await search_videos_from_web(
-                        info={
-                            "ori_title": kimi_result['ori_title'],
-                            "kimi_summary": kimi_result['kimi_summary'],
-                            "kimi_keys": kimi_result['kimi_keys'],
-                            "trace_id": params['trace_id'],
-                            "gh_id": params['gh_id'],
-                            "content_id": params['content_id'],
-                            "crawler_video_table": self.article_crawler_video_table
-                        },
-                        gh_id_map=self.gh_id_map,
-                        db_client=self.mysql_client
-                    )
-                    if search_videos_count > 3:
-                        update_process_times_sql = f"""
-                                                    UPDATE {self.article_match_video_table}
-                                                    SET process_times = %s, content_status = %s, content_status_update_time = %s
-                                                    WHERE trace_id = %s;
-                                                    """
-                        await self.mysql_client.async_insert(
-                            sql=update_process_times_sql,
-                            params=(
-                                params['process_times'] + 1,
-                                finish_spider_status,
-                                int(time.time()),
-                                params['trace_id']
-                            )
-                        )
-                    else:
-                        roll_back_status = f"""
-                                                UPDATE {self.article_match_video_table}
-                                                SET process_times = %s, content_status_update_time = %s
-                                                WHERE trace_id = %s;
-                                            """
-                        await self.mysql_client.async_insert(
-                            sql=roll_back_status,
-                            params=(
-                                params['process_times'] + 1,
-                                int(time.time()),
-                                params['trace_id']
-                            )
-                        )
-                except Exception as e:
-                    roll_back_status = f"""
-                                    UPDATE {self.article_match_video_table}
-                                    SET process_times = %s, content_status_update_time = %s
-                                    WHERE trace_id = %s;
-                    """
-                    await self.mysql_client.async_insert(
-                        sql=roll_back_status,
-                        params=(
-                            params['process_times'] + 1,
-                            int(time.time()),
-                            params['trace_id']
-                        )
-                    )
-                    print("spider processing failed: {}".format(e))
-            case 2:
-                update_process_times_sql = f"""
-                            UPDATE {self.article_match_video_table}
-                            SET process_times = %s, content_status = %s, content_status_update_time = %s
-                            WHERE trace_id = %s;
-                            """
-                await self.mysql_client.async_insert(
-                    sql=update_process_times_sql,
-                    params=(
-                        params['process_times'] + 1,
-                        defeat_status,
-                        int(time.time()),
-                        params['trace_id'],
-                    )
-                )
-
-    async def process_task(self, params):
-        """
-        Run asynchronously
-        :param params:
-        :return:
-        """
-        content_id = params['content_id']
-        trace_id = params['trace_id']
-        video_id_list = await self.get_history_videos(content_id=content_id)
-        if video_id_list:
-            # results already exist; reuse the video_ids recorded for this entry
-            logging(
-                code="9001",
-                info="historical articles exist",
-                trace_id=trace_id
-            )
-        else:
-            flag = await self.judge_content_processing(content_id)
-            if flag:
-                logging(
-                    code="9004",
-                    info="no article ID in progress, start processing",
-                    trace_id=trace_id
-                )
-                await self.start_process(params=params)
-            else:
-                logging(
-                    code="9003",
-                    info="article ID already being requested--article ID {}".format(content_id),
-                    trace_id=trace_id
-                )
-
-    async def deal(self):
-        """
-        Process
-        :return:
-        """
-        task_list = await self.get_task()
-        logging(
-            code="5005",
-            info="Spider Task Got {} this time".format(len(task_list)),
-            function="Spider Task"
-        )
-        if task_list:
-            tasks = [self.process_task(params) for params in task_list]
-            await asyncio.gather(*tasks)
-        else:
-            print("no new spider requests")
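
Read together, the deleted spider and publish tasks imply the following content_status lifecycle on the match-video table (a hedged reconstruction: the value 3 is only ever read by publishTask.get_tasks, so its meaning is inferred, and the enum names are not from the codebase):

from enum import IntEnum


class ContentStatus(IntEnum):
    """Hypothetical names for the content_status values in the deleted tasks."""
    INIT = 0              # rows picked up by spiderTask.get_task
    KIMI_FINISHED = 1     # set by spiderTask.start_process before crawling
    SPIDER_FINISHED = 2   # set once the search returned more than 3 videos
    READY_TO_PUBLISH = 3  # queried by publishTask.get_tasks (setter not in this diff)
    PUBLISHED = 4         # set by publishTask.publish_videos_to_pq
    DEFEAT = 99           # kimi_status == 2; the trace_id is abandoned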