Переглянути джерело

新增代码
newContentIdTask.py

罗俊辉 7 місяців тому
батько
коміт
4cddd44862

+ 2 - 2
app.py

@@ -21,7 +21,7 @@ async def init_db():
     初始化
     :return:
     """
-    await AsyncMySQL.initPool()
+    await AsyncMySQL.init_pool()
 
 
 @app.after_serving
@@ -30,7 +30,7 @@ async def close_db():
     关闭连接
     :return:
     """
-    await AsyncMySQL.closePool()
+    await AsyncMySQL.close_pool()
 
 
 if __name__ == '__main__':

+ 21 - 18
applications/config/__init__.py

@@ -15,41 +15,44 @@ class Config(object):
         """
         match env:
             case "prod":
-                self.apolloConnection = pyapollos.ApolloClient(
+                self.apollo_connection = pyapollos.ApolloClient(
                     app_id="LongArticlesMatchServer",
                     config_server_url="https://apolloconfig-internal.piaoquantv.com/",
                     timeout=10
                 )
-                self.articleVideos = "long_articles_match_videos"
-                self.articleText = "long_articles_text"
-                self.articleCrawlerVideos = "long_articles_crawler_videos"
-                self.rootSourceIdTable = "long_articles_root_source_id"
+                self.article_match_video_table = "long_articles_match_videos"
+                self.article_text_table = "long_articles_text"
+                self.article_crawler_video_table = "long_articles_crawler_videos"
+                self.root_source_id_table = "long_articles_root_source_id"
+                self.get_off_video_table = "get_off_video"
             case "dev":
-                self.apolloConnection = pyapollos.ApolloClient(
+                self.apollo_connection = pyapollos.ApolloClient(
                     app_id="LongArticlesMatchServer",
                     config_server_url="https://devapolloconfig-internal.piaoquantv.com/",
                     timeout=10
                 )
-                self.articleVideos = "long_articles_match_videos_dev"
-                self.articleText = "long_articles_text_dev"
-                self.articleCrawlerVideos = "long_articles_crawler_videos_dev"
-                self.rootSourceIdTable = "long_articles_root_source_id_dev"
+                self.article_match_video_table = "long_articles_match_videos_copy1"
+                self.article_text_table = "long_articles_text_copy1"
+                self.article_crawler_video_table = "long_articles_crawler_videos_copy1"
+                self.root_source_id_table = "long_articles_root_source_id_copy1"
+                self.get_off_video_table = "get_off_video_copy1"
             case "pre":
-                self.articleVideos = "long_articles_match_videos"
-                self.articleText = "long_articles_text"
-                self.articleCrawlerVideos = "long_articles_crawler_videos"
-                self.rootSourceIdTable = "long_articles_root_source_id"
-                self.apolloConnection = pyapollos.ApolloClient(
+                self.apollo_connection = pyapollos.ApolloClient(
                     app_id="LongArticlesMatchServer",
                     config_server_url="http://preapolloconfig-internal.piaoquantv.com/",
                     timeout=10
                 )
+                self.article_match_video_table = "long_articles_match_videos"
+                self.article_text_table = "long_articles_text"
+                self.article_crawler_video_table = "long_articles_crawler_videos"
+                self.root_source_id_table = "long_articles_root_source_id"
+                self.get_off_video_table = "get_off_video"
 
-    def getConfigValue(self, key):
+    def get_config_value(self, key):
         """
         通过 key 获取配置的 Config
         :param key:
         :return:
         """
-        response = self.apolloConnection.get_value(key)
-        return response
+        response = self.apollo_connection.get_value(key)
+        return response

+ 17 - 29
applications/db/__init__.py

@@ -1,38 +1,26 @@
 """
 @author: luojunhui
-self.app.mysql_pool = await aiomysql.create_pool(
-            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
-            port=3306,
-            user='crawler',
-            password='crawler123456@',
-            db='piaoquan-crawler',
-            charset='utf8mb4',
-            connect_timeout=120,
-        )
 """
 import aiomysql
 
 
 class AsyncMySQLClient(object):
     """
-    Async MySQL
+    异步 mysql 连接池
     """
 
-    def __init__(self, app):
-        self.app = app
+    def __init__(self, app=None):
+        if not app:
+            self.mysql_pool = None
+        else:
+            self.mysql_pool = app
 
-    async def initPool(self):
+    async def init_pool(self):
         """
         初始化连接
-        host='',
-        port=3306,
-        user='changwen_admin',
-        password='changwen@123456',
-        db='long_articles',
-        charset='utf8mb4'
         :return:
         """
-        self.app.mysql_pool = await aiomysql.create_pool(
+        self.mysql_pool = await aiomysql.create_pool(
             host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
             port=3306,
             user='changwen_admin',
@@ -43,34 +31,34 @@ class AsyncMySQLClient(object):
         )
         print("mysql init successfully")
 
-    async def closePool(self):
+    async def close_pool(self):
         """
         关闭 mysql 连接
         :return:
         """
-        self.app.mysql_pool.close()
-        await self.app.mysql_pool.wait_closed()
+        self.mysql_pool.close()
+        await self.mysql_pool.wait_closed()
 
-    async def asyncSelect(self, sql):
+    async def async_select(self, sql):
         """
         select method
         :param sql:
         :return:
         """
-        async with self.app.mysql_pool.acquire() as conn:
+        async with self.mysql_pool.acquire() as conn:
             async with conn.cursor() as cursor:
                 await cursor.execute(sql)
                 result = await cursor.fetchall()
                 return result
 
-    async def asyncInsert(self, sql, params):
+    async def async_insert(self, sql, params):
         """
         insert and update method
         :param params:
         :param sql:
         :return:
         """
-        async with self.app.mysql_pool.acquire() as coon:
+        async with self.mysql_pool.acquire() as coon:
             async with coon.cursor() as cursor:
                 try:
                     await cursor.execute(sql, params)
@@ -112,7 +100,7 @@ class TaskMySQLClient(object):
         self.mysql_pool.close()
         await self.mysql_pool.wait_closed()
 
-    async def asyncSelect(self, sql):
+    async def async_select(self, sql):
         """
         select method
         :param sql:
@@ -124,7 +112,7 @@ class TaskMySQLClient(object):
                 result = await cursor.fetchall()
                 return result
 
-    async def asyncInsert(self, sql, params):
+    async def async_insert(self, sql, params):
         """
         insert and update method
         :param params:

+ 166 - 0
applications/etl_function/__init__.py

@@ -0,0 +1,166 @@
+"""
+@author: luojunhui
+"""
+import os
+import oss2
+import aiohttp
+import aiofiles
+import requests
+
+from hashlib import md5
+from uuid import uuid4
+from fake_useragent import FakeUserAgent
+
+
async def download_cover(file_path, platform, cover_url):
    """
    Download a video cover image to a local file.

    :param file_path: local destination path for the cover image
    :param platform: source platform name, used to pick request headers
    :param cover_url: URL of the cover image
    :return: file_path on success; None when the response is not usable
        (non-200 status, or the body is an HTML error page instead of image bytes)
    """
    headers = request_header(platform=platform, url=cover_url, download_type="cover")
    # Fix: the original called the blocking requests.get() inside a coroutine,
    # which stalls the whole event loop; use the already-imported aiohttp instead.
    async with aiohttp.ClientSession() as session:
        async with session.get(cover_url, headers=headers) as response:
            content = await response.read()
            # Some CDNs answer with an HTML error page; treat that as a failure
            # just like a non-200 status.
            if response.status != 200 or b"<html>" in content:
                return None
            async with aiofiles.open(file_path, "wb") as f:
                await f.write(content)
            return file_path
+
+
def request_header(platform, url, download_type="video"):
    """
    Build the HTTP request headers for a given platform and resource type.

    :param platform: crawler source ("xg_search", "baidu_search", "wx_search", "dy_search")
    :param url: resource URL; for xg_search it selects the Host header
    :param download_type: "video" (default) or "cover"
    :return: dict of request headers; empty dict for unknown platforms
    """
    if platform == "xg_search":
        # Pick the CDN host from the URL; NOTE(review): the cover-specific
        # headers only apply when the URL matches neither known host —
        # presumably intentional, confirm with the caller.
        if "v9-xg-web-pc.ixigua.com" in url:
            host = "v9-xg-web-pc.ixigua.com"
        elif "v3-xg-web-pc.ixigua.com" in url:
            host = "v3-xg-web-pc.ixigua.com"
        elif download_type == "cover":
            return {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
                'Cache-Control': 'max-age=0',
                'Proxy-Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
            }
        else:
            host = "v3-xg-web-pc.ixigua.com"
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Host": host,
            "User-Agent": FakeUserAgent().chrome,
            "Origin": "https://www.ixigua.com/",
            "Referer": "https://www.ixigua.com/"
        }
    if platform == "baidu_search":
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "User-Agent": FakeUserAgent().chrome,
        }
    if platform == "wx_search":
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "User-Agent": FakeUserAgent().chrome,
            "Origin": "https://mp.weixin.qq.com",
            "Referer": "https://mp.weixin.qq.com"
        }
    if platform == "dy_search":
        return {
            'accept': '*/*',
            'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
            'priority': 'i',
            'range': 'bytes=0-',
            'referer': 'https://v11-coldf.douyinvod.com/',
            'user-agent': FakeUserAgent().chrome
        }
    # Unknown platform: no special headers.
    return {}
+
+
async def download_video(file_path, platform, video_url, download_type="video"):
    """
    Download a video to a local file in 1 MB chunks, resuming a partial
    download via a Range header when the file already exists.

    :param file_path: local destination path
    :param platform: source platform name, used to pick request headers
    :param video_url: URL of the video
    :param download_type: resource type forwarded to request_header
    :return: file_path (returned even when the server replies with an
        unexpected status; the status is only printed)
    """
    headers = request_header(platform=platform, url=video_url, download_type=download_type)
    if os.path.exists(file_path):
        # Resume: ask the server for the bytes we do not have yet.
        downloaded = os.path.getsize(file_path)
        headers["Range"] = f"bytes={downloaded}-"
    else:
        downloaded = 0
    async with aiohttp.ClientSession() as session:
        async with session.get(video_url, headers=headers) as response:
            if response.status in [200, 206]:
                # Append when resuming a partial file, otherwise start fresh.
                mode = "ab+" if downloaded > 0 else "wb"
                async with aiofiles.open(file_path, mode) as f:
                    # Stream in 1 MB chunks to bound memory use.
                    async for chunk in response.content.iter_chunked(1024 * 1024):
                        await f.write(chunk)
            else:
                print(response.status)
    return file_path
+
+
def generate_video_path(platform, video_id):
    """
    Generate a unique pair of local paths (video .mp4 and cover .png) for a
    crawled video, under the "static" directory of the current working dir.

    The name is the md5 hex digest of "<platform>-<video_id>-<uuid4>", so two
    calls with the same arguments still produce distinct paths.

    :param platform: source platform name
    :param video_id: platform-side video identifier
    :return: (video_file_path, cover_file_path) tuple
    """
    unique_name = md5(f"{platform}-{video_id}-{uuid4()}".encode()).hexdigest()
    static_dir = os.path.join(os.getcwd(), "static")
    video_path = os.path.join(static_dir, f"{unique_name}.mp4")
    cover_path = os.path.join(static_dir, f"{unique_name}.png")
    return video_path, cover_path
+
+
async def upload_to_oss(local_video_path, download_type):
    """
    Upload a local file to Aliyun OSS and return its object key.

    :param local_video_path: path of the file to upload
    :param download_type: resource category, used as the key prefix
        ("long_articles/<download_type>/<uuid4>")
    :return: the OSS object key the file was stored under
    """
    oss_video_key = "long_articles/{}/".format(download_type) + str(uuid4())
    # SECURITY: credentials were hard-coded in source. Read them from the
    # environment first, keeping the original values only as a fallback so
    # existing deployments keep working; rotate these keys and remove the
    # fallbacks as soon as possible.
    access_key_id = os.getenv("OSS_ACCESS_KEY_ID", "LTAIP6x1l3DXfSxm")
    access_key_secret = os.getenv("OSS_ACCESS_KEY_SECRET", "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon")
    endpoint = "oss-cn-hangzhou.aliyuncs.com"
    bucket_name = "art-pubbucket"
    bucket = oss2.Bucket(
        oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
    )
    # NOTE(review): oss2's put_object_from_file is a blocking call inside a
    # coroutine; for large files consider loop.run_in_executor — TODO confirm.
    bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
    return oss_video_key

+ 1 - 1
applications/functions/common.py

@@ -7,7 +7,7 @@ import aiohttp
 import asyncio
 
 
-def shuffleList(ori_list):
+def shuffle_list(ori_list):
     """
     随机打乱 list
     :param ori_list:

+ 3 - 3
applications/functions/kimi.py

@@ -21,9 +21,9 @@ class KimiServer(object):
         :param params:
         :return:
         """
-        title = params['articleTitle'].split("@@")[-1]
-        contents = params['articleText']
-        trace_id = params['contentId']
+        title = params['article_title'].split("@@")[-1]
+        contents = params['article_text']
+        trace_id = params['content_id']
         try:
             kimi_title = await cls.kimi_title(title)
         except Exception as e:

+ 4 - 6
applications/functions/pqFunctions.py

@@ -6,7 +6,7 @@ import json
 from applications.functions.common import async_post
 
 
-async def publishToPQ(video_obj):
+async def publish_to_pq(video_obj):
     """
     publish video to pq
     :return:
@@ -44,11 +44,10 @@ async def publishToPQ(video_obj):
         "repeatStatus": 1
     }
     response = await async_post(url, headers, payload)
-    # print(json.dumps(response, ensure_ascii=False, indent=4))
     return response
 
 
-async def getPQVideoDetail(video_id):
+async def get_pq_video_detail(video_id):
     """
     获取票圈视频详情信息
     :return:
@@ -64,7 +63,7 @@ async def getPQVideoDetail(video_id):
     return response
 
 
-async def getNewVideoIds(video_obj_list):
+async def get_new_video_ids(video_obj_list):
     """
     video
     :return:
@@ -72,12 +71,11 @@ async def getNewVideoIds(video_obj_list):
     vid_list = []
     for video_obj in video_obj_list:
         # video_obj 里面的信息对于历史数据可能不全,需要从pq获取
-        print(json.dumps(video_obj, ensure_ascii=False, indent=4))
         try:
             if len(vid_list) >= 3:
                 return vid_list
             else:
-                pq_response = await publishToPQ(video_obj)
+                pq_response = await publish_to_pq(video_obj)
                 video_id = pq_response['data']['id']
                 vid_list.append(video_id)
         except:

+ 12 - 14
applications/functions/video_item.py

@@ -114,7 +114,7 @@ class VideoProducer(object):
     """
 
     @classmethod
-    def wx_video_producer(cls, video_obj, user, trace_id):
+    def wx_video_produce(cls, video_obj, user, trace_id):
         """
             异步处理微信 video_obj
             公众号和站内账号一一对应
@@ -124,13 +124,13 @@ class VideoProducer(object):
             :return:
         """
         platform = "weixin_search"
-        publish_time_stamp = int(video_obj['pubTime'])
+        publish_timestamp = int(video_obj['pubTime'])
         item = VideoItem()
         item.add_video_info("user_id", user)
         # item.add_video_info("user_name", user["nick_name"])
         item.add_video_info("video_id", video_obj['hashDocID'])
         item.add_video_info("video_title", trace_id)
-        item.add_video_info("publish_time_stamp", int(publish_time_stamp))
+        item.add_video_info("publish_time_stamp", int(publish_timestamp))
         item.add_video_info("video_url", video_obj["videoUrl"])
         item.add_video_info("cover_url", video_obj["image"])
         item.add_video_info("out_video_id", video_obj['hashDocID'])
@@ -142,7 +142,7 @@ class VideoProducer(object):
         return mq_obj
 
     @classmethod
-    def baidu_video_producer(cls, video_obj, user, trace_id):
+    def baidu_video_produce(cls, video_obj, user, trace_id):
         """
         处理好看视频的 video_info
         :param video_obj:
@@ -151,15 +151,13 @@ class VideoProducer(object):
         :return:
         """
         platform = "baidu_search"
-        publish_time_stamp = int(video_obj['publish_time'])
+        publish_timestamp = int(video_obj['publish_time'])
         item = VideoItem()
-        # print("baidu")
-        # print(json.dumps(video_obj, ensure_ascii=False, indent=4))
         item.add_video_info("user_id", user)
         # item.add_video_info("user_name", user["nick_name"])
         item.add_video_info("video_id", video_obj['id'])
         item.add_video_info("video_title", video_obj['title'])
-        item.add_video_info("publish_time_stamp", publish_time_stamp)
+        item.add_video_info("publish_time_stamp", publish_timestamp)
         item.add_video_info("video_url", video_obj["playurl"])
         item.add_video_info("cover_url", video_obj["poster"])
         item.add_video_info("out_video_id", video_obj['id'])
@@ -174,7 +172,7 @@ class VideoProducer(object):
         return mq_obj
 
     @classmethod
-    def xg_video_producer(cls, video_obj, user, trace_id):
+    def xg_video_produce(cls, video_obj, user, trace_id):
         """
         西瓜搜索
         :param video_obj:
@@ -183,13 +181,13 @@ class VideoProducer(object):
         :return:
         """
         platform = "xg_search"
-        publish_time_stamp = int(video_obj['publish_time'])
+        publish_timestamp = int(video_obj['publish_time'])
         item = VideoItem()
         item.add_video_info("user_id", user)
         # item.add_video_info("user_name", user["nick_name"])
         item.add_video_info("video_id", video_obj['video_id'])
         item.add_video_info("video_title", video_obj.get('video_title'))
-        item.add_video_info("publish_time_stamp", int(publish_time_stamp))
+        item.add_video_info("publish_time_stamp", int(publish_timestamp))
         item.add_video_info("video_url", video_obj["video_url"])
         item.add_video_info("cover_url", video_obj["cover_url"])
         item.add_video_info("out_video_id", video_obj['video_id'])
@@ -204,7 +202,7 @@ class VideoProducer(object):
         return mq_obj
 
     @classmethod
-    def dy_video_producer(cls, video_obj, user, trace_id):
+    def dy_video_produce(cls, video_obj, user, trace_id):
         """
         :param video_obj:
         :param user:
@@ -212,7 +210,7 @@ class VideoProducer(object):
         :return:
         """
         platform = "dy_search"
-        publish_time_stamp = int(video_obj['publish_timestamp'] / 1000)
+        publish_timestamp = int(video_obj['publish_timestamp'] / 1000)
         item = VideoItem()
         # print("douyin")
         # print(json.dumps(video_obj, ensure_ascii=False, indent=4))
@@ -220,7 +218,7 @@ class VideoProducer(object):
         # item.add_video_info("user_name", user["nick_name"])
         item.add_video_info("video_id", video_obj['channel_content_id'])
         item.add_video_info("video_title", video_obj['title'])
-        item.add_video_info("publish_time_stamp", int(publish_time_stamp))
+        item.add_video_info("publish_time_stamp", int(publish_timestamp))
         item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
         item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
         item.add_video_info("out_video_id", video_obj['channel_content_id'])

+ 0 - 15
applications/match_algorithm/rank.py

@@ -36,8 +36,6 @@ def title_similarity_rank(content_title, recall_list):
     :param recall_list:
     :return:
     """
-    print("ori_title", content_title)
-    print("unsorted title list")
     include_title_list = []
     for item in recall_list:
         video_info = item['result']
@@ -49,20 +47,7 @@ def title_similarity_rank(content_title, recall_list):
         else:
             continue
         item['title'] = title
-        print(title)
         item['score'] = jcd_title_similarity(content_title, title)
         include_title_list.append(item)
-    # # include_title_list加上相似度分
-    # title_score_list = [
-    #     {
-    #         'score': jcd_title_similarity(
-    #             content_title,
-    #             item['title']
-    #         ),
-    #         **item
-    #     }
-    #     for item in
-    #     include_title_list
-    # ]
     sorted_list = sorted(include_title_list, key=lambda x: x['score'], reverse=True)
     return sorted_list

+ 34 - 28
applications/spider/__init__.py

@@ -10,12 +10,13 @@ from .spiderAB import SearchABTest
 from .spiderSchedule import SearchMethod
 
 
-async def videoSender(video_obj, user, trace_id, platform, content_id, table, dbClient):
+async def save_video_to_mysql(video_obj, user, trace_id, platform, content_id, crawler_video_table, db_client, similarity_score):
     """
     异步处理微信 video_obj
     公众号和站内账号一一对应
-    :param dbClient:
-    :param table:
+    :param similarity_score:
+    :param crawler_video_table: 爬虫表
+    :param db_client:  mysql
     :param content_id:
     :param platform:
     :param user:
@@ -25,25 +26,25 @@ async def videoSender(video_obj, user, trace_id, platform, content_id, table, db
     """
     Video = VideoProducer()
     if platform == "xg_search":
-        mq_obj = Video.xg_video_producer(
+        mq_obj = Video.xg_video_produce(
             video_obj=video_obj,
             user=user,
             trace_id=trace_id,
         )
     elif platform == "baidu_search":
-        mq_obj = Video.baidu_video_producer(
+        mq_obj = Video.baidu_video_produce(
             video_obj=video_obj,
             user=user,
             trace_id=trace_id,
         )
     elif platform == "wx_search":
-        mq_obj = Video.wx_video_producer(
+        mq_obj = Video.wx_video_produce(
             video_obj=video_obj,
             user=user,
             trace_id=trace_id,
         )
     elif platform == "dy_search":
-        mq_obj = Video.dy_video_producer(
+        mq_obj = Video.dy_video_produce(
             video_obj=video_obj,
             user=user,
             trace_id=trace_id,
@@ -53,12 +54,12 @@ async def videoSender(video_obj, user, trace_id, platform, content_id, table, db
     mq_obj['trace_id'] = trace_id
     mq_obj['content_id'] = content_id
     insert_sql = f"""
-    INSERT INTO {table}
-    (content_id, out_video_id, platform, video_title, play_count, like_count, publish_time, crawler_time, duration, video_url, cover_url, user_id, trace_id)
+    INSERT INTO {crawler_video_table}
+    (content_id, out_video_id, platform, video_title, play_count, like_count, publish_time, crawler_time, duration, video_url, cover_url, user_id, trace_id, score)
     values 
-    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
     """
-    await dbClient.asyncInsert(
+    await db_client.async_insert(
         sql=insert_sql,
         params=(
             content_id,
@@ -67,50 +68,55 @@ async def videoSender(video_obj, user, trace_id, platform, content_id, table, db
             mq_obj['video_title'],
             mq_obj['play_cnt'],
             mq_obj['like_cnt'],
-            datetime.fromtimestamp(mq_obj['publish_time_stamp']).strftime('%Y-%m-%d %H:%M:%S'),
+            datetime.fromtimestamp(mq_obj['publish_timestamp']).strftime('%Y-%m-%d %H:%M:%S'),
             datetime.now().__str__(),
             mq_obj['duration'],
             mq_obj['video_url'],
             mq_obj['cover_url'],
             mq_obj['user_id'],
-            trace_id
+            trace_id,
+            similarity_score
         )
     )
 
 
-async def searchVideos(info, ghIdMap, dbClient):
+async def search_videos_from_web(info, gh_id_map, db_client):
     """
     search and send msg to ETL
-    :param dbClient:
-    :param ghIdMap:
+    :param db_client:
+    :param gh_id_map:
     :param info:
     :return:
     """
-    SearchAB = SearchABTest(info=info, searchMethod=SearchMethod())
+    default_account_id = 69637498
+    search_AB = SearchABTest(info=info, searchMethod=SearchMethod())
     # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
-    trace_id = info['traceId']
-    gh_id = info['ghId']
-    content_id = info['contentId']
-    recall_list = await SearchAB.ab_5()
+    trace_id = info['trace_id']
+    gh_id = info['gh_id']
+    content_id = info['content_id']
+    recall_list = await search_AB.ab_5()
     logging(
         code="1006",
         info="搜索到{}条视频".format(len(recall_list)),
         data=recall_list,
-        trace_id=info['traceId']
+        trace_id=info['trace_id']
     )
     # 按照标题相似度排序
-    ranked_list = title_similarity_rank(content_title=info['oriTitle'].split("@@")[-1], recall_list=recall_list)
+    ranked_list = title_similarity_rank(content_title=info['ori_title'].split("@@")[-1], recall_list=recall_list)
     for recall_obj in ranked_list:
         if recall_obj:
             platform = recall_obj['platform']
             recall_video = recall_obj['result']
+            score = recall_video['score']
             if recall_video:
-                await videoSender(
+                await save_video_to_mysql(
                     video_obj=recall_video,
-                    user=ghIdMap.get(gh_id, 69637498),
+                    user=gh_id_map.get(gh_id, default_account_id),
                     trace_id=trace_id,
                     platform=platform,
                     content_id=content_id,
-                    table=info['spider'],
-                    dbClient=dbClient
-                )
+                    crawler_video_table=info['crawler_video_table'],
+                    db_client=db_client,
+                    similarity_score=score
+                )
+    return len(ranked_list)

+ 57 - 73
applications/spider/spiderAB.py

@@ -14,156 +14,140 @@ class SearchABTest(object):
     trace_id = None
 
     def __init__(self, info, searchMethod):
-        SearchABTest.set_class_properties(info, searchMethod)
+        self.ori_title = info["ori_title"]
+        self.article_summary = info["kimi_summary"]
+        self.article_keys = info["kimi_keys"]
+        self.trace_id = info["trace_id"]
+        self.search_method = searchMethod
 
-    @classmethod
-    def set_class_properties(cls, info, searchMethod):
-        """
-        初始化搜索策略实验类
-        :param searchMethod:
-        :param info: kimi 挖掘的基本信息
-        :return:
-        """
-        cls.ori_title = info["oriTitle"]
-        cls.article_summary = info["kimiSummary"]
-        cls.article_keys = info["kimiKeys"]
-        cls.trace_id = info["traceId"]
-        cls.searchMethod = searchMethod
-
-    @classmethod
-    async def base_line(cls):
+    async def base_search(self):
         """
         兜底策略
         """
-        result = await cls.searchMethod.search_v1(
-            text=cls.article_keys[0],
-            trace_id=cls.trace_id
+        result = await self.search_method.search_v1(
+            text=self.article_keys[0],
+            trace_id=self.trace_id
         )
         if result:
             return result
         else:
-            sub_result = await cls.searchMethod.search_v1(
-                text=cls.article_keys[1],
-                trace_id=cls.trace_id)
+            sub_result = await self.search_method.search_v1(
+                text=self.article_keys[1],
+                trace_id=self.trace_id
+            )
             if sub_result:
                 return sub_result
             else:
-                return await cls.searchMethod.search_v1(
-                    text=cls.article_keys[2],
-                    trace_id=cls.trace_id
+                return await self.search_method.search_v1(
+                    text=self.article_keys[2],
+                    trace_id=self.trace_id
                 )
 
-    @classmethod
-    async def ab_0(cls):
+    async def ab_0(self):
         """
         默认原标题搜索
         :return:
         """
-        search_result = await cls.searchMethod.search_v1(
-            text=cls.ori_title,
-            trace_id=cls.trace_id
+        search_result = await self.search_method.search_v1(
+            text=self.ori_title,
+            trace_id=self.trace_id
         )
         if search_result:
             return search_result
         else:
-            return await cls.base_line()
+            return await self.base_search()
 
-    @classmethod
-    async def ab_1(cls):
+    async def ab_1(self):
         """
         使用 content_summary搜索
         :return:
         """
-        search_result = await cls.searchMethod.search_v1(
-            text=cls.article_summary,
-            trace_id=cls.trace_id
+        search_result = await self.search_method.search_v1(
+            text=self.article_summary,
+            trace_id=self.trace_id
         )
         if search_result:
             return search_result
         else:
-            return await cls.ab_0()
+            return await self.ab_0()
 
-    @classmethod
-    async def ab_2(cls):
+    async def ab_2(self):
         """
         使用文本关键词搜索
         :return:
         """
-        search_result = await cls.searchMethod.search_v1(
-            text=cls.article_keys[0],
-            trace_id=cls.trace_id
+        search_result = await self.search_method.search_v1(
+            text=self.article_keys[0],
+            trace_id=self.trace_id
         )
         if search_result:
             return search_result
         else:
-            return await cls.base_line()
+            return await self.base_search()
 
-    @classmethod
-    async def ab_3(cls):
+    async def ab_3(self):
         """
         使用文本关键词搜索
         :return:
         """
-        search_result = await cls.searchMethod.search_v1(
-            text=cls.article_keys[1],
-            trace_id=cls.trace_id
+        search_result = await self.search_method.search_v1(
+            text=self.article_keys[1],
+            trace_id=self.trace_id
         )
         if search_result:
             return search_result
         else:
-            return await cls.base_line()
+            return await self.base_search()
 
-    @classmethod
-    async def ab_4(cls):
+    async def ab_4(self):
         """
         使用文本关键词搜索
         :return:
         """
-        search_result = await cls.searchMethod.search_v1(
-            text=cls.article_keys[2],
-            trace_id=cls.trace_id
+        search_result = await self.search_method.search_v1(
+            text=self.article_keys[2],
+            trace_id=self.trace_id
         )
         if search_result:
             return search_result
         else:
-            return await cls.base_line()
+            return await self.base_search()
 
-    @classmethod
-    async def ab_5(cls):
+    async def ab_5(self):
         """
         增量搜索, 返回result_list
         :return:
         """
-        result_list = await cls.searchMethod.search_v2(
-            text=cls.article_summary[:15],
-            trace_id=cls.trace_id
+        result_list = await self.search_method.search_v2(
+            text=self.article_summary[:15],
+            trace_id=self.trace_id
         )
         if len(result_list) > 3:
             return result_list
         else:
-            result_list += await cls.searchMethod.search_v2(
-                text=cls.ori_title[:15],
-                trace_id=cls.trace_id
+            result_list += await self.search_method.search_v2(
+                text=self.ori_title[:15],
+                trace_id=self.trace_id
             )
             if len(result_list) > 3:
                 return result_list
             else:
-                result_list += await cls.searchMethod.search_v2(
-                    text=cls.article_keys[0],
-                    trace_id=cls.trace_id
+                result_list += await self.search_method.search_v2(
+                    text=self.article_keys[0],
+                    trace_id=self.trace_id
                 )
                 if len(result_list) > 3:
                     return result_list
                 else:
-                    result_list += await cls.searchMethod.search_v2(
-                        text=cls.article_keys[1],
-                        trace_id=cls.trace_id
+                    result_list += await self.search_method.search_v2(
+                        text=self.article_keys[1],
+                        trace_id=self.trace_id
                     )
                     if result_list:
                         return result_list
                     else:
-                        result_list += await cls.searchMethod.search_v2(
-                            text=cls.article_keys[2],
-                            trace_id=cls.trace_id
+                        result_list += await self.search_method.search_v2(
+                            text=self.article_keys[2],
+                            trace_id=self.trace_id
                         )
                         return result_list

+ 27 - 0
newContentIdTask.py

@@ -0,0 +1,27 @@
"""
@author: luojunhui
"""
import time
import datetime
import asyncio
from applications.db import AsyncMySQLClient
from tasks.newContentIdTask import NewContentIdTask


async def main():
    """
    Run one round of the new-content-id task.

    Creates a MySQL connection pool, hands it to NewContentIdTask and makes
    sure the pool is released again afterwards, so the 60-second outer loop
    does not leak one pool per round.
    :return:
    """
    async_mysql_pool = AsyncMySQLClient()
    await async_mysql_pool.init_pool()
    try:
        new_content_id_task = NewContentIdTask(async_mysql_pool)
        await new_content_id_task.deal()
    finally:
        # BUG FIX: the pool was never closed, leaking connections every round.
        # app.py shuts the shared client down with close_pool() the same way.
        await async_mysql_pool.close_pool()


if __name__ == '__main__':
    while True:
        asyncio.run(main())
        now_str = datetime.datetime.now().__str__()
        print("{}    请求执行完成, 等待60s".format(now_str))
        time.sleep(60)

+ 24 - 16
server/api/get_off_videos.py

@@ -13,10 +13,11 @@ class GetOffVideos(object):
     def __init__(self, params, mysql_client, config):
         self.params = params
         self.mysql_client = mysql_client
-        self.articles_video = config.articleVideos
+        self.article_match_video_table = config.article_match_video_table
+        self.get_off_videos = config.get_off_video_table
         self.trace_id = None
 
-    def checkParams(self):
+    def check_params(self):
         """
 
         :return:
@@ -32,47 +33,54 @@ class GetOffVideos(object):
             }
             return response
 
-    async def pushVideoIntoQueue(self):
+    async def push_video_into_queue(self):
         """
         将视频id记录到待下架表中
         :return:
         """
         select_sql = f"""
-        select response from {self.articles_video} where trace_id = '{self.trace_id}';
+        select response from {self.article_match_video_table} where trace_id = '{self.trace_id}';
         """
-        result = await self.mysql_client.asyncSelect(sql=select_sql)
+        result = await self.mysql_client.async_select(sql=select_sql)
         if result:
             video_list = json.loads(result[0][0])
             for video in video_list:
                 video_id = video['videoId']
                 try:
                     update_sql = f"""
-                    INSERT INTO get_off_videos
+                    INSERT INTO {self.get_off_videos}
                     (video_id, publish_time, video_status, trace_id)
                     values 
                     (%s, %s, %s, %s);
                     """
-                    await self.mysql_client.asyncInsert(
+                    await self.mysql_client.async_insert(
                         sql=update_sql,
                         params=(video_id, int(time.time()), 1, self.trace_id)
                     )
+                    return {
+                        "status": "success",
+                        "traceId": self.trace_id
+                    }
                 except Exception as e:
-                    print(e)
+                    return {
+                        "status": "fail",
+                        "traceId": self.trace_id,
+                        "msg": "insert fail---{}".format(e)
+                    }
         else:
-            print("该 trace_id不存在")
+            return {
+                "status": "fail",
+                "traceId": self.trace_id,
+                "msg": "traceId error, can't find trace_id"
+            }
 
     async def deal(self):
         """
 
         :return:
         """
-        params_error = self.checkParams()
+        params_error = self.check_params()
         if params_error:
             return params_error
         else:
-            await self.pushVideoIntoQueue()
-            response = {
-                "status": "success",
-                "traceId": self.trace_id
-            }
-            return response
+            return await self.push_video_into_queue()

+ 12 - 12
server/api/record.py

@@ -19,11 +19,11 @@ class Record(object):
         self.gh_id = None
         self.params = params
         self.mysql_client = mysql_client
-        self.article_videos = config.articleVideos
-        self.article_text = config.articleText
+        self.article_match_video_table = config.article_match_video_table
+        self.article_text_table = config.article_text_table
         self.trace_id = "search-{}-{}".format(str(uuid4()), str(int(time.time())))
 
-    def checkParams(self):
+    def check_params(self):
         """
         检查请求params
         :return:
@@ -59,19 +59,19 @@ class Record(object):
             )
             return result
 
-    async def inputIntoArticleVideos(self):
+    async def input_into_article_match_video_table(self):
         """
         把数据插入待处理队列
         :return:
         """
         request_time = int(time.time())
         insert_sql = f"""
-            INSERT INTO {self.article_videos}
+            INSERT INTO {self.article_match_video_table}
                 (trace_id, content_id, flow_pool_level, gh_id, account_name, request_timestamp)
             VALUES 
                 (%s, %s, %s, %s, %s, %s);
             """
-        await self.mysql_client.asyncInsert(
+        await self.mysql_client.async_insert(
             sql=insert_sql,
             params=(
                 self.trace_id,
@@ -89,17 +89,17 @@ class Record(object):
             trace_id=self.trace_id
         )
 
-    async def inputIntoArticleText(self):
+    async def input_into_article_text_table(self):
         """
 
         :return:
         """
         insert_sql = f"""
-        INSERT INTO {self.article_text} (content_id, article_title, article_text)
+        INSERT INTO {self.article_text_table} (content_id, article_title, article_text)
         values (%s, %s, %s);
         """
         try:
-            await self.mysql_client.asyncInsert(
+            await self.mysql_client.async_insert(
                 sql=insert_sql,
                 params=(
                     self.content_id,
@@ -126,13 +126,13 @@ class Record(object):
         deal
         :return:
         """
-        params_error = self.checkParams()
+        params_error = self.check_params()
         if params_error:
             return params_error
         else:
             # 记录数据
-            await self.inputIntoArticleVideos()
-            await self.inputIntoArticleText()
+            await self.input_into_article_match_video_table()
+            await self.input_into_article_text_table()
             res = {
                 "status": "success input to article queue",
                 "code": 0,

+ 26 - 22
server/api/response.py

@@ -24,10 +24,10 @@ class Response(object):
         self.mini_program_type = None
         self.mysql_client = mysql_client
         self.params = params
-        self.article_videos = config.articleVideos
-        self.mini_map = json.loads(config.getConfigValue("miniMap"))
+        self.article_match_video_table = config.article_match_video_table
+        self.mini_program_map = json.loads(config.getConfigValue("miniMap"))
 
-    def checkParams(self):
+    def check_params(self):
         """
         请求参数校验
         :return:
@@ -43,17 +43,17 @@ class Response(object):
                 "info": self.params
             }
 
-    async def getVideosResult(self):
+    async def get_videos_result(self):
         """
         获取结果
         :return:
         """
         select_sql = f"""
         SELECT gh_id, content_status, response, process_times
-        FROM {self.article_videos}
+        FROM {self.article_match_video_table}
         WHERE trace_id = '{self.trace_id}';
         """
-        info_tuple = await self.mysql_client.asyncSelect(select_sql)
+        info_tuple = await self.mysql_client.async_select(select_sql)
         gh_id, content_status, response, process_times = info_tuple[0]
         return {
             "ghId": gh_id,
@@ -62,7 +62,7 @@ class Response(object):
             "processTimes": process_times
         }
 
-    def createGzhPath(self, video_id, shared_uid, gh_id):
+    def create_gzh_path(self, video_id, shared_uid, gh_id):
         """
         :param gh_id: 公众号账号的gh_id
         :param video_id: 视频 id
@@ -99,7 +99,7 @@ class Response(object):
             f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
         )
 
-    async def generateCard(self, index, gh_id, mini_id, item):
+    async def generate_single_card(self, index, gh_id, mini_id, item):
         """
         生成单个分享卡片
         :param item: 单个视频结果
@@ -109,9 +109,9 @@ class Response(object):
         :return:
         """
         str_mini_id = str(mini_id)
-        mini_info = self.mini_map[str_mini_id]
+        mini_info = self.mini_program_map[str_mini_id]
         avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name']
-        root_share_id, root_source_id, production_path = self.createGzhPath(
+        root_share_id, root_source_id, production_path = self.create_gzh_path(
             video_id=item['videoId'],
             shared_uid=item['uid'],
             gh_id=gh_id
@@ -142,13 +142,15 @@ class Response(object):
         item['rootSourceId'] = root_source_id
         return result, item
 
-    async def generateCards(self, result):
+    async def generate_cards(self, result):
         """
         生成返回卡片
         :return:
         """
         gh_id = result['ghId']
         response = json.loads(result['response'])
+        touliu_mini_program_id = 33
+        we_com_mini_program_id = 27
         match self.mini_program_type:
             case 1:
                 L = []
@@ -156,12 +158,12 @@ class Response(object):
                 for index, item in enumerate(response, 1):
                     random_num = random.randint(1, 10)
                     if random_num in [1, 2, 3, 4, 5, 6]:
-                        mini_id = 25
+                        long_articles_mini_program_id = 25
                     elif random_num in [7, 8]:
-                        mini_id = 29
+                        long_articles_mini_program_id = 29
                     else:
-                        mini_id = 31
-                    card, new_item = await self.generateCard(index, gh_id, mini_id, item)
+                        long_articles_mini_program_id = 31
+                    card, new_item = await self.generate_single_card(index, gh_id, long_articles_mini_program_id, item)
                     L.append(card)
                     new_item_list.append(new_item)
                 return L, new_item_list
@@ -169,7 +171,7 @@ class Response(object):
                 L = []
                 new_item_list = []
                 for index, item in enumerate(response, 1):
-                    card, new_item = await self.generateCard(index, gh_id, 33, item)
+                    card, new_item = await self.generate_single_card(index, gh_id, touliu_mini_program_id, item)
                     L.append(card)
                     new_item_list.append(new_item)
                 return L, new_item_list
@@ -177,7 +179,7 @@ class Response(object):
                 L = []
                 new_item_list = []
                 for index, item in enumerate(response, 1):
-                    card, new_item = await self.generateCard(index, gh_id, 27, item)
+                    card, new_item = await self.generate_single_card(index, gh_id, we_com_mini_program_id, item)
                     L.append(card)
                     new_item_list.append(card)
                 return L, new_item_list
@@ -187,7 +189,7 @@ class Response(object):
         执行方法
         :return:
         """
-        response = await self.getVideosResult()
+        response = await self.get_videos_result()
         status_code = response.get('contentStatus')
         process_times = response.get('processTimes')
         match status_code:
@@ -212,13 +214,13 @@ class Response(object):
                     "Message": "该请求正在处理中"
                 }
             case 2:
-                card_list, new_items = await self.generateCards(result=response)
+                card_list, new_items = await self.generate_cards(result=response)
                 update_sql = f"""
-                UPDATE {self.article_videos}
+                UPDATE {self.article_match_video_table}
                 SET response = %s, success_status = %s
                 WHERE trace_id = %s;
                 """
-                await self.mysql_client.asyncInsert(
+                await self.mysql_client.async_insert(
                     sql=update_sql,
                     params=(json.dumps(new_items, ensure_ascii=False), 1, self.trace_id)
                 )
@@ -229,13 +231,15 @@ class Response(object):
                     "code": 3,
                     "error": "匹配失败,超过三次"
                 }
+            case 4:
+                return {}
 
     async def deal(self):
         """
         api process starts from here
         :return:
         """
-        params_error = self.checkParams()
+        params_error = self.check_params()
         if params_error:
             return params_error
         else:

+ 11 - 11
server/routes.py

@@ -28,8 +28,8 @@ def Routes(mysql_client, config):
         :return:
         """
         params = await request.get_json()
-        SD = Record(params=params, mysql_client=mysql_client, config=config)
-        result = await SD.deal()
+        record = Record(params=params, mysql_client=mysql_client, config=config)
+        result = await record.deal()
         return jsonify(result)
 
     @my_blueprint.route('/recall_videos', methods=['POST'])
@@ -39,13 +39,13 @@ def Routes(mysql_client, config):
         :return:
         """
         data = await request.get_json()
-        RD = Response(
+        response = Response(
             params=data,
             mysql_client=mysql_client,
             config=config
         )
-        response = await RD.deal()
-        return jsonify(response)
+        result = await response.deal()
+        return jsonify(result)
 
     @my_blueprint.route("/choose_minigram", methods=['POST'])
     async def match_minigram():
@@ -54,9 +54,9 @@ def Routes(mysql_client, config):
         :return:
         """
         data = await request.get_json()
-        M = Minigram(params=data)
-        response = await M.deal()
-        return jsonify(response)
+        mini_program = Minigram(params=data)
+        result = await mini_program.deal()
+        return jsonify(result)
 
     @my_blueprint.route("/get_off_videos", methods=['POST'])
     async def get_off_videos():
@@ -65,8 +65,8 @@ def Routes(mysql_client, config):
         :return:
         """
         data = await request.get_json()
-        GOV = GetOffVideos(params=data, mysql_client=mysql_client, config=config)
-        response = await GOV.deal()
-        return jsonify(response)
+        get_off_video = GetOffVideos(params=data, mysql_client=mysql_client, config=config)
+        result = await get_off_video.deal()
+        return jsonify(result)
 
     return my_blueprint

+ 39 - 42
tasks/etl_task.py

@@ -18,7 +18,7 @@ from applications.config import Config
 from applications.log import logging
 
 
-async def downloadCover(file_path, platform, cover_url):
+async def download_cover(file_path, platform, cover_url):
     """
     下载视频封面
     :param platform:
@@ -26,7 +26,7 @@ async def downloadCover(file_path, platform, cover_url):
     :param file_path:
     :return:
     """
-    headers = requestHeader(platform=platform, url=cover_url, download_type="cover")
+    headers = request_header(platform=platform, url=cover_url, download_type="cover")
     response = requests.get(url=cover_url, headers=headers)
     if b"<html>" in response.content:
         return None
@@ -38,7 +38,7 @@ async def downloadCover(file_path, platform, cover_url):
         return file_path
 
 
-def requestHeader(platform, url, download_type="video"):
+def request_header(platform, url, download_type="video"):
     """
     请求头
     :return:
@@ -108,7 +108,7 @@ def requestHeader(platform, url, download_type="video"):
     return headers
 
 
-async def downloadVideo(file_path, platform, video_url, download_type="video"):
+async def download_video(file_path, platform, video_url, download_type="video"):
     """
     :param download_type:
     :param video_url:
@@ -116,7 +116,7 @@ async def downloadVideo(file_path, platform, video_url, download_type="video"):
     :param file_path:
     :return:
     """
-    headers = requestHeader(platform=platform, url=video_url, download_type=download_type)
+    headers = request_header(platform=platform, url=video_url, download_type=download_type)
     if os.path.exists(file_path):
         file_size = os.path.getsize(file_path)
         headers["Range"] = f"bytes={file_size}-"
@@ -141,7 +141,7 @@ async def downloadVideo(file_path, platform, video_url, download_type="video"):
     return file_path
 
 
-def generateVideoPath(platform, video_id):
+def generate_video_path(platform, video_id):
     """
     通过视频信息生成唯一视频地址
     :return:
@@ -155,7 +155,7 @@ def generateVideoPath(platform, video_id):
     return file_path, cover_path
 
 
-async def uploadToOss(local_video_path, download_type):
+async def upload_to_oss(local_video_path, download_type):
     """
     把视频上传到 oss
     :return:
@@ -184,21 +184,23 @@ class AsyncETL(object):
         # }
         self.max_retry = 5
         self.mysql_client = mysql_client
-        self.article_crawler_videos = Config().articleCrawlerVideos
+        self.config = Config()
+        self.article_crawler_video_table = self.config.article_crawler_video_table
+        self.article_match_video_table = self.config.article_match_video_table
 
-    async def getTasks(self):
+    async def get_tasks(self):
         """
         获取视频 id
         :return:
         """
         select_sql = f"""
-        SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id
-        FROM {self.article_crawler_videos}
+        SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
+        FROM {self.article_crawler_video_table}
         WHERE download_status = 0
         ORDER BY id
         LIMIT 10;
         """
-        result = await self.mysql_client.asyncSelect(select_sql)
+        result = await self.mysql_client.async_select(select_sql)
         if result:
             tasks = [
                 {
@@ -208,7 +210,8 @@ class AsyncETL(object):
                     "video_title": line[3],
                     "video_url": line[4],
                     "cover_url": line[5],
-                    "user_id": line[6]
+                    "user_id": line[6],
+                    "trace_id": line[7]
                 }
                 for line in result
             ]
@@ -216,91 +219,85 @@ class AsyncETL(object):
         else:
             return []
 
-    async def processTask(self, params):
+    async def process_task(self, params):
         """
         处理 task
         :return:
-        {
-                    "id": line[0],
-                    "video_id": line[1],
-                    "platform": line[2],
-                    "video_title": line[3],
-                    "video_url": line[4],
-                    "cover_url": line[5],
-                    "user_id": line[6]
-                }
         """
+        downloading_status = 1
+        downloaded_status = 2
+        download_failed_status = 3
         update_sql_0 = f"""
-                    UPDATE {self.article_crawler_videos}
+                    UPDATE {self.article_crawler_video_table}
                     SET download_status = %s
                     WHERE id = %s;
                     """
-        await self.mysql_client.asyncInsert(
+        await self.mysql_client.async_insert(
             sql=update_sql_0,
-            params=(1, params['id'])
+            params=(downloading_status, params['id'])
         )
         try:
-            local_video_path, local_cover_path = generateVideoPath(params['platform'], params['video_id'])
+            local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
             # download videos
-            file_path = await downloadVideo(
+            file_path = await download_video(
                 file_path=local_video_path,
                 platform=params['platform'],
                 video_url=params['video_url']
             )
             # download cover
-            cover_path = await downloadCover(
+            cover_path = await download_cover(
                 file_path=local_cover_path,
                 platform=params['platform'],
                 cover_url=params['cover_url']
             )
-            oss_video = await uploadToOss(
+            oss_video = await upload_to_oss(
                 local_video_path=file_path,
                 download_type="video"
             )
             if cover_path:
-                oss_cover = await uploadToOss(
+                oss_cover = await upload_to_oss(
                     local_video_path=cover_path,
                     download_type="image"
                 )
             else:
                 oss_cover = None
             update_sql = f"""
-            UPDATE {self.article_crawler_videos}
-            SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
-            WHERE id = %s;
+                            UPDATE {self.article_crawler_video_table}
+                            SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
+                            WHERE id = %s;
             """
-            await self.mysql_client.asyncInsert(
+            await self.mysql_client.async_insert(
                 sql=update_sql,
                 params=(
                     oss_video,
                     oss_cover,
-                    2,
+                    downloaded_status,
                     params['id']
                 )
             )
         except Exception as e:
-            print("failed", e)
             update_sql = f"""
-            UPDATE {self.article_crawler_videos}
+            UPDATE {self.article_crawler_video_table}
             SET download_status = %s
             WHERE id = %s;
             """
-            await self.mysql_client.asyncInsert(
+            await self.mysql_client.async_insert(
                 sql=update_sql,
-                params=(3, params['id'])
+                params=(download_failed_status, params['id'])
             )
+            print("抓取 failed--{}".format(e))
 
     async def deal(self):
         """
         ETL Deal Task
         :return:
         """
-        task_list = await self.getTasks()
+        task_list = await self.get_tasks()
         logging(
             code="5001",
             info="ETL Task Got {} this time".format(len(task_list)),
             function="ETL"
         )
         if task_list:
-            tasks = [self.processTask(params) for params in task_list]
+            tasks = [self.process_task(params) for params in task_list]
             await asyncio.gather(*tasks)

+ 51 - 51
tasks/history_task.py

@@ -7,8 +7,8 @@ import asyncio
 
 from applications.config import Config
 from applications.log import logging
-from applications.functions.pqFunctions import publishToPQ
-from applications.functions.common import shuffleList
+from applications.functions.pqFunctions import publish_to_pq
+from applications.functions.common import shuffle_list
 
 
 class historyContentIdTask(object):
@@ -21,13 +21,14 @@ class historyContentIdTask(object):
         :param mysql_client:
         """
         self.mysql_client = mysql_client
-        self.article_text = Config().articleText
-        self.article_video = Config().articleVideos
-        self.article_crawler_video = Config().articleCrawlerVideos
-        self.history_coroutines = Config().getConfigValue("historyArticleCoroutines")
-        self.gh_id_dict = json.loads(Config().getConfigValue("testAccountLevel2"))
+        self.config = Config()
+        self.article_match_video_table = self.config.article_match_video_table
+        self.article_text_table = self.config.article_text_table
+        self.article_crawler_video_table = self.config.article_crawler_video_table
+        self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
+        self.history_coroutines = self.config.get_config_value("historyArticleCoroutines")
 
-    async def getTaskList(self):
+    async def get_tasks(self):
         """
         获取任务
         :return:
@@ -39,10 +40,10 @@ class historyContentIdTask(object):
                 ART.flow_pool_level, 
                 ART.gh_id,
                 ART.process_times
-            FROM {self.article_video} ART
+            FROM {self.article_match_video_table} ART
             JOIN (
                 select content_id, count(1) as cnt 
-                from {self.article_crawler_video}
+                from {self.article_crawler_video_table}
                 where download_status = 2
                 group by content_id
             ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
@@ -50,7 +51,7 @@ class historyContentIdTask(object):
             ORDER BY request_timestamp
             LIMIT {self.history_coroutines};
         """
-        tasks = await self.mysql_client.asyncSelect(sql=select_sql1)
+        tasks = await self.mysql_client.async_select(sql=select_sql1)
         task_obj_list = [
             {
                 "trace_id": item[0],
@@ -67,17 +68,17 @@ class historyContentIdTask(object):
         )
         return task_obj_list
 
-    async def getVideoList(self, content_id):
+    async def get_video_list(self, content_id):
         """
         content_id
         :return:
         """
         sql = f"""
         SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
-        FROM {self.article_crawler_video}
+        FROM {self.article_crawler_video_table}
         WHERE content_id = '{content_id}' and download_status = 2;
         """
-        res_tuple = await self.mysql_client.asyncSelect(sql)
+        res_tuple = await self.mysql_client.async_select(sql)
         if len(res_tuple) >= 3:
             return [
                 {
@@ -92,22 +93,22 @@ class historyContentIdTask(object):
         else:
             return []
 
-    async def getKimiTitle(self, content_id):
+    async def get_kimi_title(self, content_id):
         """
         获取 kimiTitle
         :param content_id:
         :return:
         """
         select_sql = f"""
-        select kimi_title from {self.article_text} where content_id = '{content_id}';
+        select kimi_title from {self.article_text_table} where content_id = '{content_id}';
         """
-        res_tuple = await self.mysql_client.asyncSelect(select_sql)
+        res_tuple = await self.mysql_client.async_select(select_sql)
         if res_tuple:
             return res_tuple[0][0]
         else:
             return False
 
-    async def publishVideosToPq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
+    async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
         """
         发布至 pq
         :param process_times:
@@ -118,15 +119,15 @@ class historyContentIdTask(object):
         :param flow_pool_level: 流量池层级 ---> str
         :return:
         """
-        # video_list = download_videos[:3]
+        published_status = 4
         match flow_pool_level:
             case "autoArticlePoolLevel4":
                 # 冷启层, 全量做
-                video_list = shuffleList(download_videos)[:3]
+                video_list = shuffle_list(download_videos)[:3]
             case "autoArticlePoolLevel3":
                 # 次条,只针对具体账号做
                 if self.gh_id_dict.get(gh_id):
-                    video_list = shuffleList(download_videos)[:3]
+                    video_list = shuffle_list(download_videos)[:3]
                 else:
                     video_list = download_videos[:3]
             case "autoArticlePoolLevel2":
@@ -144,7 +145,7 @@ class historyContentIdTask(object):
                 "uid": video_obj['uid'],
                 "title": kimi_title
             }
-            response = await publishToPQ(params)
+            response = await publish_to_pq(params)
             time.sleep(2)
             obj = {
                 "uid": video_obj['uid'],
@@ -153,17 +154,17 @@ class historyContentIdTask(object):
                 "videoId": response['data']['id'],
                 "videoCover": response['data']['shareImgPath'],
                 "videoPath": response['data']['videoPath'],
-                "videoOss": video_obj['video_oss_path'].split("/")[-1]
+                "videoOss": video_obj['video_oss_path']
             }
             L.append(obj)
         update_sql = f"""
-           UPDATE {self.article_video}
+           UPDATE {self.article_match_video_table}
            SET content_status = %s, response = %s, process_times = %s
            WHERE trace_id = %s;
            """
-        await self.mysql_client.asyncInsert(
+        await self.mysql_client.async_insert(
             sql=update_sql,
-            params=(2, json.dumps(L, ensure_ascii=False), process_times + 1, trace_id)
+            params=(published_status, json.dumps(L, ensure_ascii=False), process_times + 1, trace_id)
         )
         logging(
             code="9002",
@@ -171,7 +172,7 @@ class historyContentIdTask(object):
             trace_id=trace_id
         )
 
-    async def processTask(self, params):
+    async def process_task(self, params):
         """
         异步执行
         :param params:
@@ -182,32 +183,31 @@ class historyContentIdTask(object):
         flow_pool_level = params['flow_pool_level']
         gh_id = params['gh_id']
         process_times = params['process_times']
-        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
-        download_videos = await self.getVideoList(content_id=content_id)
+        download_videos = await self.get_video_list(content_id=content_id)
         if download_videos:
             # 把状态修改为 4
-            update_sql = f"""
-            UPDATE {self.article_video}
-            SET content_status = %s 
-            WHERE trace_id = %s;
             """
-            await self.mysql_client.asyncInsert(
-                sql=update_sql,
-                params=(4, trace_id)
-            )
+            todo: 加上状态锁,防止多个进程同时处理一个视频, 通过 update_time && content_id来判断
+            """
+            # update_sql = f"""
+            # UPDATE {self.article_crawler_video_table}
+            # SET content_status = %s
+            # WHERE trace_id = %s;
+            # """
+            # await self.mysql_client.asyncInsert(
+            #     sql=update_sql,
+            #     params=(4, trace_id)
+            # )
 
-            kimi_title = await self.getKimiTitle(content_id)
-            if kimi_title:
-                await self.publishVideosToPq(
-                    flow_pool_level=flow_pool_level,
-                    kimi_title=kimi_title,
-                    gh_id=gh_id,
-                    trace_id=trace_id,
-                    download_videos=download_videos,
-                    process_times=process_times
-                )
-            else:
-                print("Kimi title 生成失败---后续加报警")
+            kimi_title = await self.get_kimi_title(content_id)
+            await self.publish_videos_to_pq(
+                flow_pool_level=flow_pool_level,
+                kimi_title=kimi_title,
+                gh_id=gh_id,
+                trace_id=trace_id,
+                download_videos=download_videos,
+                process_times=process_times
+            )
         else:
             pass
 
@@ -216,14 +216,14 @@ class historyContentIdTask(object):
         处理
         :return:
         """
-        task_list = await self.getTaskList()
+        task_list = await self.get_tasks()
         logging(
             code="5002",
             info="History content_task Task Got {} this time".format(len(task_list)),
             function="History Contents Task"
         )
         if task_list:
-            tasks = [self.processTask(params) for params in task_list]
+            tasks = [self.process_task(params) for params in task_list]
             await asyncio.gather(*tasks)
         else:
             print("暂时未获得历史已存在文章")

+ 19 - 17
tasks/kimi_task.py

@@ -19,20 +19,21 @@ class KimiTask(object):
         :param mysql_client:
         """
         self.mysql_client = mysql_client
-        self.config = Config()
+        self.article_match_video_table = Config().article_match_video_table
+        self.article_text_table = Config().article_text_table
 
-    async def getTasks(self):
+    async def get_tasks(self):
         """
         获取 tasks
         :return:
         """
         sql = f"""
         SELECT content_id, article_title, article_text
-        FROM {self.config.articleText}
-        WHERE kimi_status = 0
+        FROM {self.article_text_table}
+        WHERE kimi_status = 0 
         limit 5;
         """
-        content_list = await self.mysql_client.asyncSelect(sql)
+        content_list = await self.mysql_client.async_select(sql)
         if content_list:
             task_list = [
                 {
@@ -45,11 +46,13 @@ class KimiTask(object):
         else:
             return []
 
-    async def processTask(self, params):
+    async def process_task(self, params):
         """
         do something
         :return:
         """
+        kimi_success_status = 1
+        kimi_fail_status = 2
         K = KimiServer()
         try:
             kimi_info = await K.search_kimi_schedule(params=params)
@@ -57,28 +60,27 @@ class KimiTask(object):
             content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
             content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
             update_kimi_sql = f"""
-            UPDATE {self.config.articleText} 
+            UPDATE {self.article_text_table} 
             SET
                 kimi_title = %s,
                 kimi_summary = %s,
                 kimi_keys = %s,
                 kimi_status = %s
-            WHERE content_id = %s;
-                                    """
-            await self.mysql_client.asyncInsert(
+            WHERE content_id = %s;"""
+            await self.mysql_client.async_insert(
                 sql=update_kimi_sql,
-                params=(kimi_title, content_title, content_keys, 1, params['contentId'])
+                params=(kimi_title, content_title, content_keys, kimi_success_status, params['contentId'])
             )
         except Exception as e:
             update_kimi_sql = f"""
-            UPDATE {self.config.articleText}
+            UPDATE {self.article_text_table}
             SET
                 kimi_status = %s 
             WHERE content_id = %s
             """
-            await self.mysql_client.asyncInsert(
+            await self.mysql_client.async_insert(
                 sql=update_kimi_sql,
-                params=(2, params['contentId'])
+                params=(kimi_fail_status, params['contentId'])
             )
             print("kimi error--{}".format(e))
 
@@ -87,14 +89,14 @@ class KimiTask(object):
         deal function
         :return:
         """
-        task_list = await self.getTasks()
+        task_list = await self.get_tasks()
         logging(
-            code="5003",
+            code="5001",
             info="KIMI Task Got {} this time".format(len(task_list)),
             function="Kimi Task"
         )
         if task_list:
-            tasks = [self.processTask(params) for params in task_list]
+            tasks = [self.process_task(params) for params in task_list]
             await asyncio.gather(*tasks)
         else:
             print("没有要处理的 kimi 文章")

+ 662 - 0
tasks/newContentIdTask.py

@@ -0,0 +1,662 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+
+import asyncio
+
+from applications.config import Config
+from applications.log import logging
+from applications.functions.pqFunctions import publish_to_pq
+from applications.functions.common import shuffle_list
+from applications.functions.kimi import KimiServer
+from applications.spider import search_videos_from_web
+from applications.etl_function import *
+
+
+class NewContentIdTask(object):
+    """
+    Matching pipeline for content_ids that have no previously published article.
+    """
+
+    def __init__(self, mysql_client):
+        # Async MySQL client shared by every step of the pipeline.
+        self.mysql_client = mysql_client
+        self.config = Config()
+        self.article_match_video_table = self.config.article_match_video_table
+        self.article_text_table = self.config.article_text_table
+        self.article_crawler_video_table = self.config.article_crawler_video_table
+        # Apollo-driven runtime configuration (account routing / concurrency).
+        self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
+        self.account_map = json.loads(self.config.get_config_value("accountMap"))
+        self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
+        # content_status state-machine constants.
+        self.default_status = 0  # not processed yet / rolled back after a failure
+        self.task_processing_status = 101  # row claimed by a worker
+        self.task_defeat_status = 99  # permanently failed (retries exhausted)
+        self.article_text_table_error = 4  # sentinel: row missing in article_text table
+        self.max_process_times = 3  # maximum retry count per task
+
+    async def get_tasks(self):
+        """
+        获取 task
+        :return:
+        """
+        # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
+        select_processing_sql = f"""
+            SELECT trace_id, content_status_update_time, process_times
+            FROM {self.article_match_video_table}
+            WHERE content_status = {self.task_processing_status} and process_times <= {self.max_process_times}; 
+        """
+        processing_articles = await self.mysql_client.async_select(select_processing_sql)
+        if processing_articles:
+            processing_list = [
+                {
+                    "trace_id": item[0],
+                    "content_status_update_time": item[1],
+                    "process_times": item[2]
+                }
+                for item in processing_articles[0]
+            ]
+            for obj in processing_list:
+                if int(time.time()) - obj['content_status_update_time'] >= 3600:
+                    # 认为该任务失败
+                    await self.roll_back_content_status_when_fails(
+                        process_times=obj['process_times'] + 1,
+                        trace_id=obj['trace_id']
+                    )
+        # 将  process_times > 3 的任务的状态修改为失败
+        update_status_sql = f"""
+            UPDATE {self.article_match_video_table}
+            SET content_status = %s
+            WHERE process_times > %s;
+        """
+        await self.mysql_client.async_insert(
+            update_status_sql,
+            params=(self.default_status, self.max_process_times)
+        )
+        # 获取  process_times <= 3 且  content_status = 0 的任务
+        select_sql = f"""
+            SELECT trace_id, content_id, flow_pool_level, gh_id, process_times
+            FROM {self.article_match_video_table} 
+            WHERE content_status = {self.default_status} and process_times <= {self.max_process_times}
+            limit {self.spider_coroutines};
+        """
+        tasks = await self.mysql_client.async_select(select_sql)
+        if tasks:
+            return [
+                {
+                    "trace_id": i[0],
+                    "content_id": i[1],
+                    "flow_pool_level": i[2],
+                    "gh_id": i[3],
+                    "process_times": i[4]
+                }
+                for i in tasks
+            ]
+        else:
+            return []
+
+    async def get_video_list(self, content_id):
+        """
+        Check whether this content_id already has enough downloaded videos.
+
+        NOTE(review): despite the name, this returns a bool, not a list;
+        download_status = 2 marks a successfully downloaded video.
+        :param content_id
+        :return: True when at least 3 downloaded videos exist
+        """
+        sql = f"""
+        SELECT id
+        FROM {self.article_crawler_video_table}
+        WHERE content_id = '{content_id}' and download_status = 2;
+        """
+        res_tuple = await self.mysql_client.async_select(sql)
+        if len(res_tuple) >= 3:
+            return True
+        else:
+            return False
+
+    async def update_content_status(self, new_content_status, trace_id, ori_content_status):
+        """
+        Atomically move a task from one content_status to another.
+
+        The WHERE clause also matches the expected current status, making this
+        a compare-and-set: the update is a no-op when another worker has
+        already moved the row on.
+        :param new_content_status: status to write
+        :param trace_id: task identifier
+        :param ori_content_status: expected current status
+        :return:
+        """
+        update_sql = f"""
+                    UPDATE {self.article_match_video_table}
+                    SET content_status = %s, content_status_update_time = %s
+                    WHERE trace_id = %s and content_status = %s;
+                    """
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                new_content_status,
+                int(time.time()),
+                trace_id,
+                ori_content_status
+            )
+        )
+
+    async def roll_back_content_status_when_fails(self, process_times, trace_id):
+        """
+        处理失败,回滚至初始状态,处理次数加 1
+        :param process_times:
+        :param trace_id:
+        :return:
+        """
+        update_article_sql = f"""
+                            UPDATE {self.article_match_video_table}
+                            SET
+                                content_status = %s, 
+                                content_status_update_time = %s,
+                                process_times = %s,
+                            WHERE trace_id = %s and content_status = %s;
+                        """
+        await self.mysql_client.async_insert(
+            sql=update_article_sql,
+            params=(
+                self.default_status,
+                int(time.time()),
+                process_times + 1,
+                trace_id,
+                self.task_processing_status
+            )
+        )
+
+    async def judge_whether_same_content_id_is_processing(self, content_id):
+        """
+        A content_id only needs to be processed once across all its trace_ids.
+        :param content_id:
+        :return: True when any row with this content_id has left the default status
+        """
+        select_sql = f"""
+                   SELECT distinct content_status
+                   FROM {self.article_match_video_table}
+                   WHERE content_id = '{content_id}';
+        """
+        result = await self.mysql_client.async_select(select_sql)
+        if result:
+            for item in result:
+                content_status = item[0]
+                # Any non-default status means the content is being (or was) handled.
+                if content_status != self.default_status:
+                    return True
+            return False
+        else:
+            return False
+
+    async def get_downloaded_videos(self, content_id):
+        """
+        Fetch the successfully downloaded videos (download_status = 2) for a content_id.
+        :param content_id:
+        :return: list of dicts with platform / counts / OSS paths / uid
+        """
+        sql = f"""
+                SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
+                FROM {self.article_crawler_video_table}
+                WHERE content_id = '{content_id}' and download_status = 2;
+                """
+        res_tuple = await self.mysql_client.async_select(sql)
+        return [
+            {
+                "platform": i[0],
+                "play_count": i[1],
+                "like_count": i[2],
+                "video_oss_path": i[3],
+                "cover_oss_path": i[4],
+                "uid": i[5]
+            }
+            for i in res_tuple]
+
+    async def get_kimi_status(self, content_id):
+        """
+        Look up the kimi_status for a content_id.
+        :param content_id:
+        :return: kimi_status value, or the sentinel `article_text_table_error` (4)
+                 when the article_text row does not exist yet
+        """
+        select_sql = f"""
+                    select kimi_status
+                    from {self.article_text_table}
+                    where content_id = '{content_id}';
+                    """
+        response = await self.mysql_client.async_select(select_sql)
+        if response:
+            kimi_status = response[0][0]
+            return kimi_status
+        else:
+            # Row missing: the article_text table has not been synced yet.
+            return self.article_text_table_error
+
+    async def kimi_task(self, params):
+        """
+        Run the kimi step: fetch or generate kimi title / summary / keys.
+
+        Returns a dict (kimi_title, ori_title, kimi_summary, kimi_keys) on
+        success, an empty dict on failure, and implicitly None when the
+        article_text row is missing — the caller treats any falsy value as
+        failure.
+        :param params: task dict (content_id, trace_id, process_times)
+        :return:
+        """
+        kimi_success_status = 1
+        kimi_fail_status = 2
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        process_times = params['process_times']
+        kimi_status_code = await self.get_kimi_status(content_id=content_id)
+        if kimi_status_code == kimi_success_status:
+            # Kimi info already exists: advance the task status and reuse it.
+            await self.update_content_status(
+                new_content_status=kimi_success_status,
+                trace_id=trace_id,
+                ori_content_status=self.default_status
+            )
+            """
+            {
+                    "kimi_title": kimi_title,
+                    "ori_title": article_obj['article_title'],
+                    "kimi_summary": content_title,
+                    "kimi_keys": kimi_info['content_keys']
+                }
+            """
+            get_kimi_sql = f"""
+            SELECT article_title, kimi_title, kimi_summary, kimi_keys
+            FROM {self.article_text_table}
+            WHERE content_id = '{content_id}';
+            """
+            # NOTE(review): assumes the row exists because kimi_status said so;
+            # kimi_info[0] raises IndexError otherwise — confirm.
+            kimi_info = await self.mysql_client.async_select(get_kimi_sql)
+            return {
+                "kimi_title": kimi_info[0][1],
+                "ori_title": kimi_info[0][0],
+                "kimi_summary": kimi_info[0][2],
+                "kimi_keys": json.loads(kimi_info[0][3])
+            }
+        elif kimi_status_code == self.article_text_table_error:
+            """
+            todo: 文章表和匹配表没有同步更新,暂时不处理此次任务
+            """
+            # article_text row not synced yet; skip this round (returns None).
+            print("article_text表还没有更新")
+        else:
+            # Claim the task: move content_status from 0 to 101.
+            await self.update_content_status(
+                new_content_status=self.task_processing_status,
+                trace_id=trace_id,
+                ori_content_status=self.default_status
+            )
+            K = KimiServer()
+            try:
+                select_sql = f"""
+                select article_title, article_text
+                from {self.article_text_table}
+                where content_id = '{content_id}'
+                """
+                res = await self.mysql_client.async_select(select_sql)
+                article_obj = {
+                    "article_title": res[0][0],
+                    "article_text": res[0][1],
+                    "content_id": content_id
+                }
+                kimi_info = await K.search_kimi_schedule(params=article_obj)
+                kimi_title = kimi_info['k_title']
+                # Strip quotes that would break downstream consumers.
+                content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
+                content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
+                update_kimi_sql = f"""
+                        UPDATE {self.article_text_table} 
+                        SET
+                            kimi_title = %s,
+                            kimi_summary = %s,
+                            kimi_keys = %s,
+                            kimi_status = %s
+                        WHERE content_id = %s;"""
+                await self.mysql_client.async_insert(
+                    sql=update_kimi_sql,
+                    params=(kimi_title, content_title, content_keys, kimi_success_status, params['content_id'])
+                )
+                await self.update_content_status(
+                    new_content_status=kimi_success_status,
+                    trace_id=trace_id,
+                    ori_content_status=self.task_processing_status
+                )
+                return {
+                    "kimi_title": kimi_title,
+                    "ori_title": article_obj['article_title'],
+                    "kimi_summary": content_title,
+                    "kimi_keys": kimi_info['content_keys']
+                }
+            except Exception as e:
+                # Kimi step failed: record the failure in the text table.
+                update_kimi_sql = f"""
+                        UPDATE {self.article_text_table}
+                        SET
+                            kimi_status = %s 
+                        WHERE content_id = %s
+                        """
+                await self.mysql_client.async_insert(
+                    sql=update_kimi_sql,
+                    params=(kimi_fail_status, content_id)
+                )
+                # Roll the task status back from 101 to 0.
+                await self.roll_back_content_status_when_fails(
+                    process_times=process_times,
+                    trace_id=trace_id
+                )
+                return {}
+
+    async def spider_task(self, params, kimi_result):
+        """
+        爬虫任务
+        :return:
+        """
+        spider_default_status = 1
+        spider_success_status = 2
+        trace_id = params['trace_id']
+        content_id = params['content_id']
+        process_times = params['process_times']
+        gh_id = params['gh_id']
+        try:
+            # 开始处理,将状态由 1 改成  101
+            await self.update_content_status(
+                new_content_status=self.task_processing_status,
+                ori_content_status=spider_default_status,
+                trace_id=trace_id
+            )
+            search_videos_count = await search_videos_from_web(
+                info={
+                    "ori_title": kimi_result['ori_title'],
+                    "kimi_summary": kimi_result['kimi_summary'],
+                    "kimi_keys": kimi_result['kimi_keys'],
+                    "trace_id": trace_id,
+                    "gh_id": gh_id,
+                    "content_id": content_id,
+                    "crawler_video_table": self.article_crawler_video_table
+                },
+                gh_id_map=self.account_map,
+                db_client=self.mysql_client
+            )
+            if search_videos_count >= 3:
+                # 表示爬虫任务执行成功, 将状态从 101  改未 2
+                await self.update_content_status(
+                    new_content_status=spider_success_status,
+                    trace_id=trace_id,
+                    ori_content_status=self.task_processing_status
+                )
+                return True
+            else:
+                await self.roll_back_content_status_when_fails(
+                    process_times=process_times + 1,
+                    trace_id=trace_id
+                )
+                return False
+        except Exception as e:
+            await self.roll_back_content_status_when_fails(
+                process_times=process_times + 1,
+                trace_id=trace_id
+            )
+            print("爬虫处理失败: {}".format(e))
+            return False
+
+    async def etl_task(self, params):
+        """
+        download && upload videos
+        :param params:
+        :return:
+        """
+        video_download_success_status = 2
+        video_download_fail_status = 3
+        etl_task_default_status = 2
+        etl_task_success_status = 3
+        trace_id = params['trace_id']
+        content_id = params['content_id']
+        # 判断是否有三条已经下载完成的视频
+        select_sql = f"""
+        select count(id) 
+        from {self.article_crawler_video_table} 
+        where content_id = '{content_id}' and download_status = {video_download_success_status};
+        """
+        video_count_tuple = await self.mysql_client.async_select(select_sql)
+        video_count = video_count_tuple[0][0]
+        if video_count > 3:
+            await self.update_content_status(
+                ori_content_status=etl_task_default_status,
+                trace_id=trace_id,
+                new_content_status=etl_task_success_status
+            )
+            return True
+        else:
+            # 开始处理, 将文章状态修改为处理状态
+            await self.update_content_status(
+                ori_content_status=etl_task_default_status,
+                trace_id=trace_id,
+                new_content_status=self.task_processing_status
+            )
+            select_sql = f"""
+                SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
+                FROM {self.article_crawler_video_table}
+                WHERE content_id = '{content_id}' and download_status != {video_download_success_status}
+                ORDER BY score DESC;
+            """
+            videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
+            videos_need_to_download_list = videos_need_to_download_tuple[0]
+            downloaded_count = 0
+            for line in videos_need_to_download_list:
+                params = {
+                    "id": line[0],
+                    "video_id": line[1],
+                    "platform": line[2],
+                    "video_title": line[3],
+                    "video_url": line[4],
+                    "cover_url": line[5],
+                    "user_id": line[6],
+                    "trace_id": line[7]
+                }
+                try:
+                    local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
+                    # download videos
+                    file_path = await download_video(
+                        file_path=local_video_path,
+                        platform=params['platform'],
+                        video_url=params['video_url']
+                    )
+                    # download cover
+                    cover_path = await download_cover(
+                        file_path=local_cover_path,
+                        platform=params['platform'],
+                        cover_url=params['cover_url']
+                    )
+                    oss_video = await upload_to_oss(
+                        local_video_path=file_path,
+                        download_type="video"
+                    )
+                    if cover_path:
+                        oss_cover = await upload_to_oss(
+                            local_video_path=cover_path,
+                            download_type="image"
+                        )
+                    else:
+                        oss_cover = None
+                    update_sql = f"""
+                                    UPDATE {self.article_crawler_video_table}
+                                    SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
+                                    WHERE id = %s;
+                    """
+                    await self.mysql_client.async_insert(
+                        sql=update_sql,
+                        params=(
+                            oss_video,
+                            oss_cover,
+                            video_download_success_status,
+                            params['id']
+                        )
+                    )
+                    downloaded_count += 1
+                except Exception as e:
+                    update_sql = f"""
+                    UPDATE {self.article_crawler_video_table}
+                    SET download_status = %s
+                    WHERE id = %s;
+                    """
+                    await self.mysql_client.async_insert(
+                        sql=update_sql,
+                        params=(video_download_fail_status, params['id'])
+                    )
+            if downloaded_count >= 3:
+                await self.update_content_status(
+                    ori_content_status=self.task_processing_status,
+                    trace_id=trace_id,
+                    new_content_status=etl_task_success_status
+                )
+                return True
+            else:
+                await self.roll_back_content_status_when_fails(
+                    process_times=params['process_times'] + 1,
+                    trace_id=params['trace_id']
+                )
+                return False
+
+    async def publish_task(self, params, kimi_title):
+        """
+        发布任务
+        :param kimi_title:
+        :param params:
+        :return:
+        """
+        publish_default_status = 3
+        publish_success_status = 4
+        gh_id = params['gh_id']
+        flow_pool_level = params['flow_pool_level']
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        process_times = params['process_times']
+        # 开始处理,将状态修改为操作状态
+        await self.update_content_status(
+            ori_content_status=publish_default_status,
+            trace_id=trace_id,
+            new_content_status=self.task_processing_status
+        )
+        try:
+            download_videos = await self.get_video_list(content_id)
+            match flow_pool_level:
+                case "autoArticlePoolLevel4":
+                    # 冷启层, 全量做
+                    video_list = shuffle_list(download_videos)[:3]
+                case "autoArticlePoolLevel3":
+                    if self.gh_id_dict.get(gh_id):
+                        video_list = shuffle_list(download_videos)[:3]
+                    else:
+                        video_list = download_videos[:3]
+                case "autoArticlePoolLevel2":
+                    # 次条,只针对具体账号做
+                    video_list = []
+                case "autoArticlePoolLevel1":
+                    # 头条,先不做
+                    video_list = download_videos[:3]
+                case _:
+                    video_list = download_videos[:3]
+            L = []
+            for video_obj in video_list:
+                params = {
+                    "videoPath": video_obj['video_oss_path'],
+                    "uid": video_obj['uid'],
+                    "title": kimi_title
+                }
+                response = await publish_to_pq(params)
+                time.sleep(2)
+                obj = {
+                    "uid": video_obj['uid'],
+                    "source": video_obj['platform'],
+                    "kimiTitle": kimi_title,
+                    "videoId": response['data']['id'],
+                    "videoCover": response['data']['shareImgPath'],
+                    "videoPath": response['data']['videoPath'],
+                    "videoOss": video_obj['video_oss_path']
+                }
+                L.append(obj)
+            update_sql = f"""
+                    UPDATE {self.article_match_video_table}
+                    SET content_status = %s, response = %s, process_times = %s
+                    WHERE trace_id = %s and content_status = %s;
+                    """
+            # 从操作中状态修改为已发布状态
+            await self.mysql_client.async_insert(
+                sql=update_sql,
+                params=(
+                    publish_success_status,
+                    json.dumps(L, ensure_ascii=False),
+                    process_times + 1,
+                    trace_id,
+                    self.task_processing_status
+                )
+            )
+        except Exception as e:
+            await self.roll_back_content_status_when_fails(
+                process_times=params['process_times'] + 1,
+                trace_id=params['trace_id']
+            )
+            print(e)
+
+    async def start_process(self, params):
+        """
+        Run the full pipeline (kimi -> spider -> etl -> publish) for one article.
+
+        Each step only runs when the previous one succeeded; failures are logged
+        with step-specific codes.
+        :param params: task dict
+        :return:
+        """
+        # step1: run the kimi step
+        kimi_result = await self.kimi_task(params)
+        if kimi_result:
+            # After kimi finishes, run the spider step.
+            spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
+            if spider_flag:
+                # After the spider finishes, run the etl step.
+                etl_flag = await self.etl_task(params)
+                if etl_flag:
+                    # After download/upload finish, run the publish step.
+                    try:
+                        await self.publish_task(params, kimi_result['kimi_title'])
+                    except Exception as e:
+                        logging(
+                            code="9001",
+                            info="publish 失败--{}".format(e),
+                            trace_id=params['trace_id']
+                        )
+                else:
+                    logging(
+                        code="8001",
+                        info="ETL 处理失败",
+                        trace_id=params['trace_id']
+                    )
+            else:
+                logging(
+                    code="7002",
+                    info="爬虫处理失败",
+                    trace_id=params['trace_id']
+                )
+        else:
+            logging(
+                code="6001",
+                info="kimi 处理失败",
+                trace_id=params['trace_id']
+            )
+
+    async def process_task(self, params):
+        """
+        Process one task, skipping content_ids that already have videos or are
+        being handled by another trace.
+        :param params: task dict
+        :return:
+        """
+        content_id = params['content_id']
+        # True when this content_id already has >= 3 downloaded videos.
+        download_videos = await self.get_video_list(content_id)
+        if not download_videos:
+            # Make sure no other trace is already working on the same content_id.
+            processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
+            if processing_flag:
+                logging(
+                    code="9001",
+                    info="该 content id 正在处理中, 跳过此任务"
+                )
+            else:
+                await self.start_process(params=params)
+
+    async def deal(self):
+        """
+        Entry point: fetch the task batch and process it concurrently.
+        :return:
+        """
+        task_list = await self.get_tasks()
+        logging(
+            code="5001",
+            info="Match Task Got {} this time".format(len(task_list)),
+            function="Publish Task"
+        )
+        if task_list:
+            # One coroutine per task; gather runs them concurrently.
+            tasks = [self.process_task(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )

+ 38 - 40
tasks/publish_task.py

@@ -8,8 +8,8 @@ import time
 
 from applications.config import Config
 from applications.log import logging
-from applications.functions.pqFunctions import publishToPQ
-from applications.functions.common import shuffleList
+from applications.functions.pqFunctions import publish_to_pq
+from applications.functions.common import shuffle_list
 
 
 class publishTask(object):
@@ -23,23 +23,24 @@ class publishTask(object):
 
     def __init__(self, mysql_client):
         self.mysql_client = mysql_client
-        self.article_video = Config().articleVideos
-        self.article_text = Config().articleText
-        self.article_crawler_video = Config().articleCrawlerVideos
-        self.gh_id_dict = json.loads(Config().getConfigValue("testAccountLevel2"))
+        self.config = Config()
+        self.article_match_video_table = self.config.article_match_video_table
+        self.article_text_table = self.config.article_text_table
+        self.article_crawler_video_table = self.config.article_crawler_video_table
+        self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
 
-    async def getTasks(self):
+    async def get_tasks(self):
         """
         获取 task
         :return:
         """
         select_sql = f"""
         SELECT trace_id, content_id, flow_pool_level, gh_id
-        FROM {self.article_video} 
-        WHERE content_status = 1
+        FROM {self.article_match_video_table} 
+        WHERE content_status = 3
         limit 10;
         """
-        tasks = await self.mysql_client.asyncSelect(select_sql)
+        tasks = await self.mysql_client.async_select(select_sql)
         if tasks:
             return [
                 {
@@ -53,17 +54,17 @@ class publishTask(object):
         else:
             return []
 
-    async def getVideoList(self, content_id):
+    async def get_video_list(self, content_id):
         """
         content_id
         :return:
         """
         sql = f"""
         SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
-        FROM {self.article_crawler_video}
+        FROM {self.article_crawler_video_table}
         WHERE content_id = '{content_id}' and download_status = 2;
         """
-        res_tuple = await self.mysql_client.asyncSelect(sql)
+        res_tuple = await self.mysql_client.async_select(sql)
         if len(res_tuple) >= 3:
             return [
                 {
@@ -78,22 +79,22 @@ class publishTask(object):
         else:
             return []
 
-    async def getKimiTitle(self, content_id):
+    async def get_kimi_title(self, content_id):
         """
         获取 kimiTitle
         :param content_id:
         :return:
         """
         select_sql = f"""
-        select kimi_title from {self.article_text} where content_id = '{content_id}';
+        select kimi_title from {self.article_text_table} where content_id = '{content_id}';
         """
-        res_tuple = await self.mysql_client.asyncSelect(select_sql)
+        res_tuple = await self.mysql_client.async_select(select_sql)
         if res_tuple:
             return res_tuple[0][0]
         else:
             return False
 
-    async def publishVideosToPq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos):
+    async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos):
         """
         发布至 pq
         :param trace_id:
@@ -103,14 +104,14 @@ class publishTask(object):
         :param flow_pool_level: 流量池层级 ---> str
         :return:
         """
-        # video_list = download_videos[:3]
+        publish_success_status = 4
         match flow_pool_level:
             case "autoArticlePoolLevel4":
                 # 冷启层, 全量做
-                video_list = shuffleList(download_videos)[:3]
+                video_list = shuffle_list(download_videos)[:3]
             case "autoArticlePoolLevel3":
                 if self.gh_id_dict.get(gh_id):
-                    video_list = shuffleList(download_videos)[:3]
+                    video_list = shuffle_list(download_videos)[:3]
                 else:
                     video_list = download_videos[:3]
             case "autoArticlePoolLevel2":
@@ -130,7 +131,7 @@ class publishTask(object):
                 "uid": video_obj['uid'],
                 "title": kimi_title
             }
-            response = await publishToPQ(params)
+            response = await publish_to_pq(params)
             time.sleep(2)
             obj = {
                 "uid": video_obj['uid'],
@@ -139,20 +140,20 @@ class publishTask(object):
                 "videoId": response['data']['id'],
                 "videoCover": response['data']['shareImgPath'],
                 "videoPath": response['data']['videoPath'],
-                "videoOss": video_obj['video_oss_path'].split("/")[-1]
+                "videoOss": video_obj['video_oss_path']
             }
             L.append(obj)
         update_sql = f"""
-        UPDATE {self.article_video}
+        UPDATE {self.article_match_video_table}
         SET content_status = %s, response = %s
         WHERE trace_id = %s;
         """
-        await self.mysql_client.asyncInsert(
+        await self.mysql_client.async_insert(
             sql=update_sql,
-            params=(2, json.dumps(L, ensure_ascii=False), trace_id)
+            params=(publish_success_status, json.dumps(L, ensure_ascii=False), trace_id)
         )
 
-    async def processTask(self, params):
+    async def process_task(self, params):
         """
         处理任务
         :return:
@@ -161,19 +162,16 @@ class publishTask(object):
         flow_pool_level = params['flow_pool_level']
         content_id = params['content_id']
         trace_id = params['trace_id']
-        download_videos = await self.getVideoList(content_id)
+        download_videos = await self.get_video_list(content_id)
         if download_videos:
-            kimi_title = await self.getKimiTitle(content_id)
-            if kimi_title:
-                await self.publishVideosToPq(
-                    flow_pool_level=flow_pool_level,
-                    kimi_title=kimi_title,
-                    gh_id=gh_id,
-                    trace_id=trace_id,
-                    download_videos=download_videos
-                )
-            else:
-                print("Kimi title 生成失败---后续加报警")
+            kimi_title = await self.get_kimi_title(content_id)
+            await self.publish_videos_to_pq(
+                flow_pool_level=flow_pool_level,
+                kimi_title=kimi_title,
+                gh_id=gh_id,
+                trace_id=trace_id,
+                download_videos=download_videos
+            )
         else:
             print("该 content_id还未下载完成")
 
@@ -182,14 +180,14 @@ class publishTask(object):
         function
         :return:
         """
-        task_list = await self.getTasks()
+        task_list = await self.get_tasks()
         logging(
             code="5004",
             info="PublishTask Got {} this time".format(len(task_list)),
             function="Publish Task"
         )
         if task_list:
-            tasks = [self.processTask(params) for params in task_list]
+            tasks = [self.process_task(params) for params in task_list]
             await asyncio.gather(*tasks)
         else:
             logging(

+ 107 - 65
tasks/spider_task.py

@@ -3,42 +3,52 @@
 """
 import asyncio
 import json
+import time
 
 from applications.config import Config
 from applications.log import logging
-from applications.spider import searchVideos
+from applications.spider import search_videos_from_web
 
 
 class spiderTask(object):
     """
     定时执行任务
     """
-    C = Config()
 
     def __init__(self, mysql_client):
         """
         :param mysql_client:
         """
         self.mysql_client = mysql_client
-        self.article_video = self.C.articleVideos
-        self.article_text = self.C.articleText
-        self.article_video_crawler = self.C.articleCrawlerVideos
-        self.spider_coroutines = self.C.getConfigValue("spiderCoroutines")
-        self.gh_id_map = json.loads(self.C.getConfigValue("accountMap"))
+        self.config = Config()
+        self.article_match_video_table = self.config.article_match_video_table
+        self.article_text_table = self.config.article_text_table
+        self.article_crawler_video_table = self.config.article_crawler_video_table
+        self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
+        self.gh_id_map = json.loads(self.config.get_config_value("accountMap"))
 
-    async def getTask(self):
+    async def get_task(self):
         """
-        获取任务
+        获取任务, 查询出 article_match_video_table 中 已经 kimi  执行完成的  content_id
         :return:
         """
         select_sql = f"""
-            SELECT trace_id, content_id, gh_id, process_times
-            FROM {self.article_video}
+            SELECT 
+                amvt.trace_id, 
+                amvt.content_id, 
+                amvt.gh_id, 
+                amvt.process_times
+            FROM {self.article_match_video_table} amvt
+            JOIN (
+                select content_id
+                from {self.article_text_table}
+                where kimi_status != 0
+            ) att on amvt.content_id = att.content_id
             WHERE content_status = 0 and process_times <= 3
             GROUP BY content_id
             LIMIT {self.spider_coroutines};
         """
-        content_id_tuple = await self.mysql_client.asyncSelect(select_sql)
+        content_id_tuple = await self.mysql_client.async_select(select_sql)
         if content_id_tuple:
             content_id_list = [i for i in list(content_id_tuple)]
             task_obj_list = [
@@ -58,7 +68,7 @@ class spiderTask(object):
         else:
             return []
 
-    async def getHistoryVideos(self, content_id):
+    async def get_history_videos(self, content_id):
         """
         check whether the contents videos exists
         :param content_id:
@@ -66,58 +76,58 @@ class spiderTask(object):
         """
         select_sql = f"""
             SELECT count(1)
-            FROM {self.article_video_crawler}
+            FROM {self.article_crawler_video_table}
             where content_id = '{content_id}' and download_status = 2;
         """
-        content_videos = await self.mysql_client.asyncSelect(select_sql)
+        content_videos = await self.mysql_client.async_select(select_sql)
         videos_count = content_videos[0][0]
         if videos_count >= 3:
             return True
         else:
             return False
 
-    async def judgeContentProcessing(self, content_id):
+    async def judge_content_processing(self, content_id):
         """
-        判断该content_id是否在处理中
+        判断该 content_id 是否在处理中
         :param content_id:
         :return:
         """
         select_sql = f"""
                        SELECT trace_id, content_status
-                       FROM {self.article_video}
+                       FROM {self.article_match_video_table}
                        WHERE content_id = '{content_id}'
                        ORDER BY id DESC;
                    """
-        result = await self.mysql_client.asyncSelect(select_sql)
+        result = await self.mysql_client.async_select(select_sql)
         if result:
             for item in result:
                 trace_id, content_status = item
-                if content_status == 1:
+                if content_status != 0:
                     return False
             return True
         else:
             return True
 
-    async def getKimiResult(self, content_id):
+    async def get_kimi_result(self, content_id):
         """
         通过 content_id 获取kimi info
         :return:
         """
         select_sql = f"""
         select article_title, kimi_title, kimi_summary, kimi_keys, kimi_status
-        from {self.article_text}
+        from {self.article_text_table}
         where content_id = '{content_id}';
         """
-        response = await self.mysql_client.asyncSelect(select_sql)
+        response = await self.mysql_client.async_select(select_sql)
         if response:
             article_detail = response[0]
             if article_detail[4] == 1:
                 result = {
-                    "oriTitle": article_detail[0],
-                    "kimiTitle": article_detail[1],
-                    "kimiSummary": article_detail[2],
-                    "kimiKeys": json.loads(article_detail[3]),
-                    "kimiStatus": article_detail[4]
+                    "ori_title": article_detail[0],
+                    "kimi_title": article_detail[1],
+                    "kimi_summary": article_detail[2],
+                    "kimi_keys": json.loads(article_detail[3]),
+                    "kimi_status": article_detail[4]
                 }
             else:
                 result = {
@@ -127,76 +137,108 @@ class spiderTask(object):
         else:
             return
 
-    async def startProcess(self, params):
+    async def start_process(self, params):
         """
         开始处理
         :param params:
         :return:
         """
-        # 更新文章contentId为1, 说明该文章正在处理中
-        kimi_result = await self.getKimiResult(content_id=params['content_id'])
-        kimi_status = kimi_result['kimiStatus']
+        defeat_status = 99
+        finish_kimi_status = 1
+        finish_spider_status = 2
+        kimi_result = await self.get_kimi_result(content_id=params['content_id'])
+        kimi_status = kimi_result['kimi_status']
         match kimi_status:
             case 1:
                 update_process_times_sql = f"""
-                            UPDATE {self.article_video}
-                            SET process_times = %s, content_status = %s
+                            UPDATE {self.article_match_video_table}
+                            SET process_times = %s, content_status = %s, content_status_update_time = %s
                             WHERE trace_id = %s;
                             """
-                await self.mysql_client.asyncInsert(
+                await self.mysql_client.async_insert(
                     sql=update_process_times_sql,
                     params=(
                         params['process_times'] + 1,
-                        1,
+                        finish_kimi_status,
+                        int(time.time()),
                         params['trace_id']
                     )
                 )
                 try:
-                    await searchVideos(
+                    search_videos_count = await search_videos_from_web(
                         info={
-                            "oriTitle": kimi_result['oriTitle'],
-                            "kimiSummary": kimi_result['kimiSummary'],
-                            "kimiKeys": kimi_result['kimiKeys'],
-                            "traceId": params['trace_id'],
-                            "ghId": params['gh_id'],
-                            "contentId": params['content_id'],
-                            "spider": self.article_video_crawler
+                            "ori_title": kimi_result['ori_title'],
+                            "kimi_summary": kimi_result['kimi_summary'],
+                            "kimi_keys": kimi_result['kimi_keys'],
+                            "trace_id": params['trace_id'],
+                            "gh_id": params['gh_id'],
+                            "content_id": params['content_id'],
+                            "crawler_video_table": self.article_crawler_video_table
                         },
-                        ghIdMap=self.gh_id_map,
-                        dbClient=self.mysql_client
+                        gh_id_map=self.gh_id_map,
+                        db_client=self.mysql_client
                     )
+                    if search_videos_count > 3:
+                        update_process_times_sql = f"""
+                                                    UPDATE {self.article_match_video_table}
+                                                    SET process_times = %s, content_status = %s, content_status_update_time = %s
+                                                    WHERE trace_id = %s;
+                                                    """
+                        await self.mysql_client.async_insert(
+                            sql=update_process_times_sql,
+                            params=(
+                                params['process_times'] + 1,
+                                finish_spider_status,
+                                int(time.time()),
+                                params['trace_id']
+                            )
+                        )
+                    else:
+                        roll_back_status = f"""
+                                                UPDATE {self.article_match_video_table}
+                                                SET process_times = %s, content_status_update_time = %s
+                                                WHERE trace_id = %s;
+                                            """
+                        await self.mysql_client.async_insert(
+                            sql=roll_back_status,
+                            params=(
+                                params['process_times'] + 1,
+                                int(time.time()),
+                                params['trace_id']
+                            )
+                        )
                 except Exception as e:
                     roll_back_status = f"""
-                    UPDATE {self.article_video}
-                    SET content_status = %s
-                    WHERE trace_id = %s;
+                                    UPDATE {self.article_match_video_table}
+                                    SET process_times = %s, content_status_update_time = %s
+                                    WHERE trace_id = %s;
                     """
-                    await self.mysql_client.asyncInsert(
+                    await self.mysql_client.async_insert(
                         sql=roll_back_status,
                         params=(
-                            0,
+                            params['process_times'] + 1,
+                            int(time.time()),
                             params['trace_id']
                         )
                     )
-                    print("处理失败,回退状态为 0")
+                    print("爬虫处理失败: {}".format(e))
             case 2:
                 update_process_times_sql = f"""
-                            UPDATE {self.article_video}
-                            SET process_times = %s, content_status = %s
+                            UPDATE {self.article_match_video_table}
+                            SET process_times = %s, content_status = %s, content_status_update_time = %s
                             WHERE trace_id = %s;
                             """
-                await self.mysql_client.asyncInsert(
+                await self.mysql_client.async_insert(
                     sql=update_process_times_sql,
                     params=(
                         params['process_times'] + 1,
-                        3,
-                        params['trace_id']
+                        defeat_status,
+                        int(time.time()),
+                        params['trace_id'],
                     )
                 )
-            case 0:
-                print("kimi not ready")
 
-    async def processTask(self, params):
+    async def process_task(self, params):
         """
         异步执行
         :param params:
@@ -204,7 +246,7 @@ class spiderTask(object):
         """
         content_id = params['content_id']
         trace_id = params['trace_id']
-        video_id_list = await self.getHistoryVideos(content_id=content_id)
+        video_id_list = await self.get_history_videos(content_id=content_id)
         if video_id_list:
             # 说明已经存在了结果, 将该条记录下的video_id拿出来
             logging(
@@ -213,14 +255,14 @@ class spiderTask(object):
                 trace_id=trace_id
             )
         else:
-            flag = await self.judgeContentProcessing(content_id)
+            flag = await self.judge_content_processing(content_id)
             if flag:
                 logging(
                     code="9004",
                     info="无正在处理的文章ID, 开始处理",
                     trace_id=trace_id
                 )
-                await self.startProcess(params=params)
+                await self.start_process(params=params)
             else:
                 logging(
                     code="9003",
@@ -233,14 +275,14 @@ class spiderTask(object):
         处理
         :return:
         """
-        task_list = await self.getTask()
+        task_list = await self.get_task()
         logging(
             code="5005",
             info="Spider Task Got {} this time".format(len(task_list)),
             function="Spider Task"
         )
         if task_list:
-            tasks = [self.processTask(params) for params in task_list]
+            tasks = [self.process_task(params) for params in task_list]
             await asyncio.gather(*tasks)
         else:
             print("没有新的爬虫请求")