Переглянути джерело

2024-09-25
发布新版本文章匹配小程序服务

罗俊辉 7 місяців тому
батько
коміт
5cd3bbe92c

+ 8 - 5
app.py

@@ -2,13 +2,16 @@
 @author: luojunhui
 """
 from quart import Quart
-from applications.routes import Routes
-from applications.functions.async_mysql import AsyncMySQLClient
+from server.routes import Routes
+from applications.db import AsyncMySQLClient
+from applications.config import Config
 
 # 初始化 App
 app = Quart(__name__, static_folder='applications/static')
+config = Config()
+# 注册连接池
 AsyncMySQL = AsyncMySQLClient(app)
-app_routes = Routes(AsyncMySQL)
+app_routes = Routes(AsyncMySQL, config)
 app.register_blueprint(app_routes)
 
 
@@ -18,7 +21,7 @@ async def init_db():
     初始化
     :return:
     """
-    await AsyncMySQL.init_pool()
+    await AsyncMySQL.initPool()
 
 
 @app.after_serving
@@ -27,7 +30,7 @@ async def close_db():
     关闭连接
     :return:
     """
-    await AsyncMySQL.close_pool()
+    await AsyncMySQL.closePool()
 
 
 if __name__ == '__main__':

+ 57 - 0
applications/config/__init__.py

@@ -0,0 +1,57 @@
+"""
+@author: luojunhui
+"""
+import json
+
+import pyapollos
+
+
+class Config(object):
+    """
+    apolloConfig
+    """
+
+    def __init__(self, env="pre"):
+        """
+        :param env:
+        """
+        match env:
+            case "prod":
+                self.apolloConnection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="https://apolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+                self.articleVideos = "long_articles_match_videos"
+                self.articleText = "long_articles_text"
+                self.articleCrawlerVideos = "long_articles_crawler_videos"
+                self.rootSourceIdTable = "long_articles_root_source_id"
+            case "dev":
+                self.apolloConnection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="https://devapolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+                self.articleVideos = "long_articles_match_videos_dev"
+                self.articleText = "long_articles_text_dev"
+                self.articleCrawlerVideos = "long_articles_crawler_videos_dev"
+                self.rootSourceIdTable = "long_articles_root_source_id_dev"
+            case "pre":
+                self.articleVideos = "long_articles_match_videos"
+                self.articleText = "long_articles_text"
+                self.articleCrawlerVideos = "long_articles_crawler_videos"
+                self.rootSourceIdTable = "long_articles_root_source_id"
+                self.apolloConnection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="http://preapolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+
+    def getConfigValue(self, key):
+        """
+        通过 key 获取配置的 Config
+        :param key:
+        :return:
+        """
+        response = self.apolloConnection.get_value(key)
+        return response

+ 137 - 0
applications/db/__init__.py

@@ -0,0 +1,137 @@
+"""
+@author: luojunhui
+self.app.mysql_pool = await aiomysql.create_pool(
+            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+            port=3306,
+            user='crawler',
+            password='crawler123456@',
+            db='piaoquan-crawler',
+            charset='utf8mb4',
+            connect_timeout=120,
+        )
+"""
+import aiomysql
+
+
+class AsyncMySQLClient(object):
+    """
+    Async MySQL
+    """
+
+    def __init__(self, app):
+        self.app = app
+
+    async def initPool(self):
+        """
+        初始化连接
+        host='',
+        port=3306,
+        user='changwen_admin',
+        password='changwen@123456',
+        db='long_articles',
+        charset='utf8mb4'
+        :return:
+        """
+        self.app.mysql_pool = await aiomysql.create_pool(
+            host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
+            port=3306,
+            user='changwen_admin',
+            password='changwen@123456',
+            db='long_articles',
+            charset='utf8mb4',
+            connect_timeout=120,
+        )
+        print("mysql init successfully")
+
+    async def closePool(self):
+        """
+        关闭 mysql 连接
+        :return:
+        """
+        self.app.mysql_pool.close()
+        await self.app.mysql_pool.wait_closed()
+
+    async def asyncSelect(self, sql):
+        """
+        select method
+        :param sql:
+        :return:
+        """
+        async with self.app.mysql_pool.acquire() as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(sql)
+                result = await cursor.fetchall()
+                return result
+
+    async def asyncInsert(self, sql, params):
+        """
+        insert and update method
+        :param params:
+        :param sql:
+        :return:
+        """
+        async with self.app.mysql_pool.acquire() as coon:
+            async with coon.cursor() as cursor:
+                try:
+                    await cursor.execute(sql, params)
+                    await coon.commit()
+                except Exception as e:
+                    await coon.rollback()
+                    raise
+
+
+class TaskMySQLClient(object):
+    """
+    Async MySQL
+    """
+
+    def __init__(self):
+        self.mysql_pool = None
+
+    async def init_pool(self):
+        """
+        初始化连接
+        :return:
+        """
+        self.mysql_pool = await aiomysql.create_pool(
+            host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
+            port=3306,
+            user='changwen_admin',
+            password='changwen@123456',
+            db='long_articles',
+            charset='utf8mb4',
+            connect_timeout=120
+        )
+        print("mysql init successfully")
+
+    async def close_pool(self):
+        """
+        关闭 mysql 连接
+        :return:
+        """
+        self.mysql_pool.close()
+        await self.mysql_pool.wait_closed()
+
+    async def asyncSelect(self, sql):
+        """
+        select method
+        :param sql:
+        :return:
+        """
+        async with self.mysql_pool.acquire() as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(sql)
+                result = await cursor.fetchall()
+                return result
+
+    async def asyncInsert(self, sql, params):
+        """
+        insert and update method
+        :param params:
+        :param sql:
+        :return:
+        """
+        async with self.mysql_pool.acquire() as coon:
+            async with coon.cursor() as cursor:
+                await cursor.execute(sql, params)
+                await coon.commit()

+ 0 - 2
applications/deal/getOffVideos.py

@@ -3,8 +3,6 @@
 """
 import time
 
-from static.config import db_article
-
 
 class GetOffVideos(object):
     """

+ 0 - 61
applications/functions/async_mysql.py

@@ -1,61 +0,0 @@
-"""
-@author: luojunhui
-"""
-import aiomysql
-
-
-class AsyncMySQLClient(object):
-    """
-    Async MySQL
-    """
-
-    def __init__(self, app):
-        self.app = app
-
-    async def init_pool(self):
-        """
-        初始化连接
-        :return:
-        """
-        self.app.mysql_pool = await aiomysql.create_pool(
-            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
-            port=3306,
-            user='crawler',
-            password='crawler123456@',
-            db='piaoquan-crawler',
-            charset='utf8mb4',
-            connect_timeout=120,
-        )
-        print("mysql init successfully")
-
-    async def close_pool(self):
-        """
-        关闭 mysql 连接
-        :return:
-        """
-        self.app.mysql_pool.close()
-        await self.app.mysql_pool.wait_closed()
-
-    async def async_select(self, sql):
-        """
-        select method
-        :param sql:
-        :return:
-        """
-        async with self.app.mysql_pool.acquire() as conn:
-            async with conn.cursor() as cursor:
-                await cursor.execute(sql)
-                result = await cursor.fetchall()
-                return result
-
-    async def async_insert(self, sql, params):
-        """
-        insert and update method
-        :param params:
-        :param sql:
-        :return:
-        """
-        async with self.app.mysql_pool.acquire() as coon:
-            async with coon.cursor() as cursor:
-                await cursor.execute(sql, params)
-                await coon.commit()

+ 3 - 3
applications/functions/kimi.py

@@ -21,9 +21,9 @@ class KimiServer(object):
         :param params:
         :return:
         """
-        title = params['title'].split("@@")[-1]
-        contents = params['content']
-        trace_id = params['trace_id']
+        title = params['articleTitle'].split("@@")[-1]
+        contents = params['articleText']
+        trace_id = params['contentId']
         try:
             kimi_title = await cls.kimi_title(title)
         except Exception as e:

+ 1 - 6
applications/functions/pqFunctions.py

@@ -43,13 +43,8 @@ async def publishToPQ(video_obj):
         "appType": 888880,
         "repeatStatus": 1
     }
-    # response = requests.post(
-    #     url=url,
-    #     headers=headers,
-    #     data=payload,
-    # )
-    # return response.json()
     response = await async_post(url, headers, payload)
+    print(json.dumps(response, ensure_ascii=False, indent=4))
     return response
 
 

+ 13 - 13
applications/functions/video_item.py

@@ -74,7 +74,7 @@ class VideoItem(object):
         must_keys = [
             "video_id",
             "user_id",
-            "user_name",
+            # "user_name",
             "out_video_id",
             "session",
             "video_url",
@@ -126,8 +126,8 @@ class VideoProducer(object):
         platform = "weixin_search"
         publish_time_stamp = int(video_obj['pubTime'])
         item = VideoItem()
-        item.add_video_info("user_id", user["uid"])
-        item.add_video_info("user_name", user["nick_name"])
+        item.add_video_info("user_id", user)
+        # item.add_video_info("user_name", user["nick_name"])
         item.add_video_info("video_id", video_obj['hashDocID'])
         item.add_video_info("video_title", trace_id)
         item.add_video_info("publish_time_stamp", int(publish_time_stamp))
@@ -153,10 +153,10 @@ class VideoProducer(object):
         platform = "baidu_search"
         publish_time_stamp = int(video_obj['publish_time'])
         item = VideoItem()
-        print("baidu")
-        print(json.dumps(video_obj, ensure_ascii=False, indent=4))
-        item.add_video_info("user_id", user["uid"])
-        item.add_video_info("user_name", user["nick_name"])
+        # print("baidu")
+        # print(json.dumps(video_obj, ensure_ascii=False, indent=4))
+        item.add_video_info("user_id", user)
+        # item.add_video_info("user_name", user["nick_name"])
         item.add_video_info("video_id", video_obj['id'])
         item.add_video_info("video_title", video_obj['title'])
         item.add_video_info("publish_time_stamp", publish_time_stamp)
@@ -185,8 +185,8 @@ class VideoProducer(object):
         platform = "xg_search"
         publish_time_stamp = int(video_obj['publish_time'])
         item = VideoItem()
-        item.add_video_info("user_id", user["uid"])
-        item.add_video_info("user_name", user["nick_name"])
+        item.add_video_info("user_id", user)
+        # item.add_video_info("user_name", user["nick_name"])
         item.add_video_info("video_id", video_obj['video_id'])
         item.add_video_info("video_title", video_obj.get('video_title'))
         item.add_video_info("publish_time_stamp", int(publish_time_stamp))
@@ -214,10 +214,10 @@ class VideoProducer(object):
         platform = "dy_search"
         publish_time_stamp = int(video_obj['publish_timestamp'] / 1000)
         item = VideoItem()
-        print("douyin")
-        print(json.dumps(video_obj, ensure_ascii=False, indent=4))
-        item.add_video_info("user_id", user["uid"])
-        item.add_video_info("user_name", user["nick_name"])
+        # print("douyin")
+        # print(json.dumps(video_obj, ensure_ascii=False, indent=4))
+        item.add_video_info("user_id", user)
+        # item.add_video_info("user_name", user["nick_name"])
         item.add_video_info("video_id", video_obj['channel_content_id'])
         item.add_video_info("video_title", video_obj['title'])
         item.add_video_info("publish_time_stamp", int(publish_time_stamp))

+ 116 - 0
applications/spider/__init__.py

@@ -0,0 +1,116 @@
+"""
+@author: luojunhui
+"""
+from datetime import datetime
+
+from applications.functions.video_item import VideoProducer
+from applications.functions.log import logging
+from applications.match_algorithm.rank import title_similarity_rank
+from .spiderAB import SearchABTest
+from .spiderSchedule import SearchMethod
+
+
+async def videoSender(video_obj, user, trace_id, platform, content_id, table, dbClient):
+    """
+    异步处理微信 video_obj
+    公众号和站内账号一一对应
+    :param dbClient:
+    :param table:
+    :param content_id:
+    :param platform:
+    :param user:
+    :param trace_id:
+    :param video_obj:
+    :return:
+    """
+    Video = VideoProducer()
+    if platform == "xg_search":
+        mq_obj = Video.xg_video_producer(
+            video_obj=video_obj,
+            user=user,
+            trace_id=trace_id,
+        )
+    elif platform == "baidu_search":
+        mq_obj = Video.baidu_video_producer(
+            video_obj=video_obj,
+            user=user,
+            trace_id=trace_id,
+        )
+    elif platform == "wx_search":
+        mq_obj = Video.wx_video_producer(
+            video_obj=video_obj,
+            user=user,
+            trace_id=trace_id,
+        )
+    elif platform == "dy_search":
+        mq_obj = Video.dy_video_producer(
+            video_obj=video_obj,
+            user=user,
+            trace_id=trace_id,
+        )
+    else:
+        mq_obj = {}
+    mq_obj['trace_id'] = trace_id
+    mq_obj['content_id'] = content_id
+    insert_sql = f"""
+    INSERT INTO {table}
+    (content_id, out_video_id, platform, video_title, play_count, like_count, publish_time, crawler_time, duration, video_url, cover_url, user_id, trace_id)
+    values 
+    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+    """
+    await dbClient.asyncInsert(
+        sql=insert_sql,
+        params=(
+            content_id,
+            mq_obj['video_id'],
+            platform,
+            mq_obj['video_title'],
+            mq_obj['play_cnt'],
+            mq_obj['like_cnt'],
+            datetime.fromtimestamp(mq_obj['publish_time_stamp']).strftime('%Y-%m-%d %H:%M:%S'),
+            datetime.now().__str__(),
+            mq_obj['duration'],
+            mq_obj['video_url'],
+            mq_obj['cover_url'],
+            mq_obj['user_id'],
+            trace_id
+        )
+    )
+
+
+async def searchVideos(info, ghIdMap, dbClient):
+    """
+    search and send msg to ETL
+    :param dbClient:
+    :param ghIdMap:
+    :param info:
+    :return:
+    """
+    SearchAB = SearchABTest(info=info, searchMethod=SearchMethod())
+    # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
+    trace_id = info['traceId']
+    gh_id = info['ghId']
+    content_id = info['contentId']
+    recall_list = await SearchAB.ab_5()
+    logging(
+        code="1006",
+        info="搜索到{}条视频".format(len(recall_list)),
+        data=recall_list,
+        trace_id=info['traceId']
+    )
+    # 按照标题相似度排序
+    ranked_list = title_similarity_rank(content_title=info['oriTitle'].split("@@")[-1], recall_list=recall_list)
+    for recall_obj in ranked_list:
+        if recall_obj:
+            platform = recall_obj['platform']
+            recall_video = recall_obj['result']
+            if recall_video:
+                await videoSender(
+                    video_obj=recall_video,
+                    user=ghIdMap.get(gh_id, 69637498),
+                    trace_id=trace_id,
+                    platform=platform,
+                    content_id=content_id,
+                    table=info['spider'],
+                    dbClient=dbClient
+                )

+ 169 - 0
applications/spider/spiderAB.py

@@ -0,0 +1,169 @@
+"""
+@author: luojunhui
+"""
+
+
+class SearchABTest(object):
+    """
+    搜索策略实验方案
+    """
+
+    ori_title = None
+    article_summary = None
+    article_keys = None
+    trace_id = None
+
+    def __init__(self, info, searchMethod):
+        SearchABTest.set_class_properties(info, searchMethod)
+
+    @classmethod
+    def set_class_properties(cls, info, searchMethod):
+        """
+        初始化搜索策略实验类
+        :param searchMethod:
+        :param info: kimi 挖掘的基本信息
+        :return:
+        """
+        cls.ori_title = info["oriTitle"]
+        cls.article_summary = info["kimiSummary"]
+        cls.article_keys = info["kimiKeys"]
+        cls.trace_id = info["traceId"]
+        cls.searchMethod = searchMethod
+
+    @classmethod
+    async def base_line(cls):
+        """
+        兜底策略
+        """
+        result = await cls.searchMethod.search_v1(
+            text=cls.article_keys[0],
+            trace_id=cls.trace_id
+        )
+        if result:
+            return result
+        else:
+            sub_result = await cls.searchMethod.search_v1(
+                text=cls.article_keys[1],
+                trace_id=cls.trace_id)
+            if sub_result:
+                return sub_result
+            else:
+                return await cls.searchMethod.search_v1(
+                    text=cls.article_keys[2],
+                    trace_id=cls.trace_id
+                )
+
+    @classmethod
+    async def ab_0(cls):
+        """
+        默认原标题搜索
+        :return:
+        """
+        search_result = await cls.searchMethod.search_v1(
+            text=cls.ori_title,
+            trace_id=cls.trace_id
+        )
+        if search_result:
+            return search_result
+        else:
+            return await cls.base_line()
+
+    @classmethod
+    async def ab_1(cls):
+        """
+        使用 content_summary搜索
+        :return:
+        """
+        search_result = await cls.searchMethod.search_v1(
+            text=cls.article_summary,
+            trace_id=cls.trace_id
+        )
+        if search_result:
+            return search_result
+        else:
+            return await cls.ab_0()
+
+    @classmethod
+    async def ab_2(cls):
+        """
+        使用文本关键词搜索
+        :return:
+        """
+        search_result = await cls.searchMethod.search_v1(
+            text=cls.article_keys[0],
+            trace_id=cls.trace_id
+        )
+        if search_result:
+            return search_result
+        else:
+            return await cls.base_line()
+
+    @classmethod
+    async def ab_3(cls):
+        """
+        使用文本关键词搜索
+        :return:
+        """
+        search_result = await cls.searchMethod.search_v1(
+            text=cls.article_keys[1],
+            trace_id=cls.trace_id
+        )
+        if search_result:
+            return search_result
+        else:
+            return await cls.base_line()
+
+    @classmethod
+    async def ab_4(cls):
+        """
+        使用文本关键词搜索
+        :return:
+        """
+        search_result = await cls.searchMethod.search_v1(
+            text=cls.article_keys[2],
+            trace_id=cls.trace_id
+        )
+        if search_result:
+            return search_result
+        else:
+            return await cls.base_line()
+
+    @classmethod
+    async def ab_5(cls):
+        """
+        增量搜索, 返回result_list
+        :return:
+        """
+        result_list = await cls.searchMethod.search_v2(
+            text=cls.article_summary[:15],
+            trace_id=cls.trace_id
+        )
+        if len(result_list) > 3:
+            return result_list
+        else:
+            result_list += await cls.searchMethod.search_v2(
+                text=cls.ori_title[:15],
+                trace_id=cls.trace_id
+            )
+            if len(result_list) > 3:
+                return result_list
+            else:
+                result_list += await cls.searchMethod.search_v2(
+                    text=cls.article_keys[0],
+                    trace_id=cls.trace_id
+                )
+                if len(result_list) > 3:
+                    return result_list
+                else:
+                    result_list += await cls.searchMethod.search_v2(
+                        text=cls.article_keys[1],
+                        trace_id=cls.trace_id
+                    )
+                    if result_list:
+                        return result_list
+                    else:
+                        result_list += await cls.searchMethod.search_v2(
+                            text=cls.article_keys[2],
+                            trace_id=cls.trace_id
+                        )
+                        return result_list

+ 58 - 0
applications/spider/spiderSchedule.py

@@ -0,0 +1,58 @@
+"""
+@author: luojunhui
+"""
+import time
+from applications.search import *
+
+
+class SearchMethod(object):
+    """
+    搜索召回模式
+    """
+    s_words = []
+
+    @classmethod
+    async def search_v1(cls, text, trace_id):
+        """
+        dy ---> baidu ---> xigua
+        :param text:
+        :param trace_id:
+        :return:
+        """
+        douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
+        if douyin_result:
+            return {"platform": "dy_search", "result": douyin_result[0]}
+        else:
+            time.sleep(1)
+            baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
+            if baidu_result:
+                return {"platform": "baidu_search", "result": baidu_result[0]}
+            else:
+                xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
+                if xigua_result:
+                    return {"platform": "xg_search", "result": xigua_result[0]}
+                else:
+                    return None
+
+    @classmethod
+    async def search_v2(cls, text, trace_id):
+        """
+        dy ---> baidu ---> xigua
+        :param trace_id:
+        :param text:
+        :return:
+        """
+        L = []
+        douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
+        for vid_obj in douyin_result:
+            L.append({"platform": "dy_search", "result": vid_obj})
+        if len(L) >= 3:
+            return L
+        else:
+            baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
+            if baidu_result:
+                L.append({"platform": "baidu_search", "result": baidu_result[0]})
+            xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
+            if xigua_result:
+                L.append({"platform": "xg_search", "result": xigua_result[0]})
+            return L

+ 27 - 0
etlTask.py

@@ -0,0 +1,27 @@
+"""
+@author: luojunhui
+"""
+import time
+import asyncio
+import datetime
+from tasks.etl_task import AsyncETL
+from applications.db import TaskMySQLClient
+
+
+async def main():
+    """
+    main job
+    :return:
+    """
+    TMC = TaskMySQLClient()
+    await TMC.init_pool()
+    PD = AsyncETL(TMC)
+    await PD.deal()
+
+
+if __name__ == '__main__':
+    while True:
+        asyncio.run(main())
+        now_str = datetime.datetime.now().__str__()
+        print("{}    请求执行完成, 等待10s".format(now_str))
+        time.sleep(10)

+ 27 - 0
historyTask.py

@@ -0,0 +1,27 @@
+"""
+@author: luojunhui
+"""
+import time
+import asyncio
+import datetime
+from tasks.history_task import historyContentIdTask
+from applications.db import TaskMySQLClient
+
+
+async def main():
+    """
+    main job
+    :return:
+    """
+    TMC = TaskMySQLClient()
+    await TMC.init_pool()
+    PD = historyContentIdTask(TMC)
+    await PD.deal()
+
+
+if __name__ == '__main__':
+    while True:
+        asyncio.run(main())
+        now_str = datetime.datetime.now().__str__()
+        print("{}    请求执行完成, 等待10s".format(now_str))
+        time.sleep(10)

+ 24 - 0
kimiTask.py

@@ -0,0 +1,24 @@
+import time
+import asyncio
+import datetime
+from tasks.kimi_task import KimiTask
+from applications.db import TaskMySQLClient
+
+
+async def main():
+    """
+    main job
+    :return:
+    """
+    TMC = TaskMySQLClient()
+    await TMC.init_pool()
+    PD = KimiTask(TMC)
+    await PD.deal()
+
+
+if __name__ == '__main__':
+    while True:
+        asyncio.run(main())
+        now_str = datetime.datetime.now().__str__()
+        print("{}    请求执行完成, 等待10s".format(now_str))
+        time.sleep(10)

+ 0 - 86
matchVideoFromHistoryArticleASC.py

@@ -1,86 +0,0 @@
-"""
-@author: luojunhui
-"""
-import datetime
-import time
-
-import aiomysql
-import asyncio
-
-from tasks import MatchTask3
-
-
-class TaskMySQLClient(object):
-    """
-    Async MySQL
-    """
-
-    def __init__(self):
-        self.mysql_pool = None
-
-    async def init_pool(self):
-        """
-        初始化连接
-        :return:
-        """
-        self.mysql_pool = await aiomysql.create_pool(
-            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
-            port=3306,
-            user='crawler',
-            password='crawler123456@',
-            db='piaoquan-crawler',
-            charset='utf8mb4',
-            connect_timeout=120,
-        )
-        print("mysql init successfully")
-
-    async def close_pool(self):
-        """
-        关闭 mysql 连接
-        :return:
-        """
-        self.mysql_pool.close()
-        await self.mysql_pool.wait_closed()
-
-    async def async_select(self, sql):
-        """
-        select method
-        :param sql:
-        :return:
-        """
-        async with self.mysql_pool.acquire() as conn:
-            async with conn.cursor() as cursor:
-                await cursor.execute(sql)
-                result = await cursor.fetchall()
-                return result
-
-    async def async_insert(self, sql, params):
-        """
-        insert and update method
-        :param params:
-        :param sql:
-        :return:
-        """
-        async with self.mysql_pool.acquire() as coon:
-            async with coon.cursor() as cursor:
-                await cursor.execute(sql, params)
-                await coon.commit()
-
-
-async def main():
-    """
-    main job
-    :return:
-    """
-    TMC = TaskMySQLClient()
-    await TMC.init_pool()
-    PD = MatchTask3(TMC)
-    await PD.deal()
-    now_str = datetime.datetime.now().__str__()
-    print("{}    请求执行完成, 等待1分钟".format(now_str))
-    await asyncio.sleep(1 * 60)
-
-
-if __name__ == '__main__':
-    while True:
-        asyncio.run(main())

+ 0 - 84
matchVideoFromHistoryArticleDESC.py

@@ -1,84 +0,0 @@
-"""
-@author: luojunhui
-"""
-import datetime
-import aiomysql
-import asyncio
-
-from tasks import MatchTask4
-
-
-class TaskMySQLClient(object):
-    """
-    Async MySQL
-    """
-
-    def __init__(self):
-        self.mysql_pool = None
-
-    async def init_pool(self):
-        """
-        初始化连接
-        :return:
-        """
-        self.mysql_pool = await aiomysql.create_pool(
-            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
-            port=3306,
-            user='crawler',
-            password='crawler123456@',
-            db='piaoquan-crawler',
-            charset='utf8mb4',
-            connect_timeout=120,
-        )
-        print("mysql init successfully")
-
-    async def close_pool(self):
-        """
-        关闭 mysql 连接
-        :return:
-        """
-        self.mysql_pool.close()
-        await self.mysql_pool.wait_closed()
-
-    async def async_select(self, sql):
-        """
-        select method
-        :param sql:
-        :return:
-        """
-        async with self.mysql_pool.acquire() as conn:
-            async with conn.cursor() as cursor:
-                await cursor.execute(sql)
-                result = await cursor.fetchall()
-                return result
-
-    async def async_insert(self, sql, params):
-        """
-        insert and update method
-        :param params:
-        :param sql:
-        :return:
-        """
-        async with self.mysql_pool.acquire() as coon:
-            async with coon.cursor() as cursor:
-                await cursor.execute(sql, params)
-                await coon.commit()
-
-
-async def main():
-    """
-    main2
-    :return:
-    """
-    TMC = TaskMySQLClient()
-    await TMC.init_pool()
-    PD = MatchTask4(TMC)
-    await PD.deal()
-    now_str = datetime.datetime.now().__str__()
-    print("{}    请求执行完成, 等待10s".format(now_str))
-    await asyncio.sleep(10)
-
-
-if __name__ == '__main__':
-    while True:
-        asyncio.run(main())

+ 0 - 86
match_video_task.py

@@ -1,86 +0,0 @@
-"""
-@author: luojunhui
-"""
-import time
-import datetime
-import asyncio
-
-import aiomysql
-from tasks import MatchTask1
-
-
-class TaskMySQLClient(object):
-    """
-    Async MySQL
-    """
-
-    def __init__(self):
-        self.mysql_pool = None
-
-    async def init_pool(self):
-        """
-        初始化连接
-        :return:
-        """
-        self.mysql_pool = await aiomysql.create_pool(
-            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
-            port=3306,
-            user='crawler',
-            password='crawler123456@',
-            db='piaoquan-crawler',
-            charset='utf8mb4',
-            connect_timeout=120,
-        )
-        print("mysql init successfully")
-
-    async def close_pool(self):
-        """
-        关闭 mysql 连接
-        :return:
-        """
-        self.mysql_pool.close()
-        await self.mysql_pool.wait_closed()
-
-    async def async_select(self, sql):
-        """
-        select method
-        :param sql:
-        :return:
-        """
-        async with self.mysql_pool.acquire() as conn:
-            async with conn.cursor() as cursor:
-                await cursor.execute(sql)
-                result = await cursor.fetchall()
-                return result
-
-    async def async_insert(self, sql, params):
-        """
-        insert and update method
-        :param params:
-        :param sql:
-        :return:
-        """
-        async with self.mysql_pool.acquire() as coon:
-            async with coon.cursor() as cursor:
-                await cursor.execute(sql, params)
-                await coon.commit()
-
-
-async def main():
-    """
-    main job
-    :return:
-    """
-    TMC = TaskMySQLClient()
-    await TMC.init_pool()
-    PD = MatchTask1(TMC)
-    await PD.deal()
-
-
-if __name__ == '__main__':
-    while True:
-        asyncio.run(main())
-        now_str = datetime.datetime.now().__str__()
-        print("{}    请求执行完成, 等待10s".format(now_str))
-        time.sleep(10)
-

+ 27 - 0
publishtask.py

@@ -0,0 +1,27 @@
+"""
+@author: luojunhui
+"""
+import time
+import asyncio
+import datetime
+from tasks.publish_task import publishTask
+from applications.db import TaskMySQLClient
+
+
+async def main():
+    """
+    main job
+    :return:
+    """
+    TMC = TaskMySQLClient()
+    await TMC.init_pool()
+    PD = publishTask(TMC)
+    await PD.deal()
+
+
+if __name__ == '__main__':
+    while True:
+        asyncio.run(main())
+        now_str = datetime.datetime.now().__str__()
+        print("{}    请求执行完成, 等待10s".format(now_str))
+        time.sleep(10)

+ 10 - 106
requirements.txt

@@ -1,107 +1,11 @@
-aiofiles==23.2.1
-aiohttp==3.9.3
-aiosignal==1.3.1
-alembic==1.11.1
-aliyun-log-python-sdk==0.9.1
-aliyun-python-sdk-core==2.15.1
-aliyun-python-sdk-kms==2.16.2
-annotated-types==0.6.0
-anyio==4.3.0
-APScheduler==3.10.1
-async-timeout==4.0.3
-attrs==23.2.0
-backports.zoneinfo==0.2.1
-black==24.3.0
-blinker==1.6.2
-cachelib==0.12.0
-certifi==2024.2.2
-cffi==1.15.1
-charset-normalizer==3.3.2
-click==8.1.6
-colorama==0.4.6
-crcmod==1.7
-cryptography==41.0.2
-dateparser==1.2.0
-decorator==5.1.1
-distro==1.9.0
-elastic-transport==8.13.0
-elasticsearch==8.13.0
-exceptiongroup==1.2.0
-Flask==3.0.3
-Flask-APScheduler==1.12.4
-Flask-Login==0.6.2
-Flask-Mail==0.9.1
-flask-marshmallow==0.15.0
-Flask-Migrate==4.0.4
-Flask-Reuploaded==1.3.0
-Flask-Session==0.5.0
-Flask-SQLAlchemy==3.1.1
-Flask-WTF==1.2.1
-frozenlist==1.4.1
-greenlet==2.0.2
-h11==0.14.0
-h2==4.1.0
-hpack==4.0.0
-httpcore==1.0.4
-httpx==0.27.0
-Hypercorn==0.16.0
-hyperframe==6.0.1
-idna==3.6
-importlib-metadata==6.8.0
-importlib_resources==6.1.2
-itsdangerous==2.1.2
-Jinja2==3.1.2
-jmespath==0.10.0
-Mako==1.2.4
-MarkupSafe==2.1.3
-marshmallow==3.20.1
-marshmallow-sqlalchemy==0.29.0
-mq-http-sdk==1.0.3
-multidict==6.0.5
-mypy-extensions==1.0.0
-numpy==1.24.4
-odps==3.5.1
-openai==1.21.2
-oss2==2.18.4
-packaging==23.1
-pandas==2.0.3
-pathspec==0.12.1
-Pillow==10.0.0
-platformdirs==4.2.0
-priority==2.0.0
-protobuf==3.20.3
-psutil==5.9.5
-pyarrow==15.0.2
-pycparser==2.21
-pycryptodome==3.20.0
-pydantic==2.6.4
-pydantic_core==2.16.3
-PyMySQL==1.1.0
-pyodps==0.11.6
-python-dateutil==2.8.2
-pytz==2023.3
-Quart==0.19.5
-regex==2024.4.16
-requests==2.31.0
-schedule==1.2.1
-six==1.16.0
-sniffio==1.3.1
-SQLAlchemy==2.0.21
-style==1.1.0
-taskgroup==0.0.0a4
-tomli==2.0.1
-tqdm==4.66.2
-typing_extensions==4.7.1
-tzdata==2023.3
-tzlocal==5.0.1
-update==0.0.1
-urllib3==2.2.1
-validators==0.20.0
-Werkzeug==3.0.2
-wsproto==1.2.0
-WTForms==3.1.2
-yarl==1.9.4
-zipp==3.16.2
-
-lxml~=5.2.1
+aiofiles
+aiohttp~=3.10.4
+aliyun-log-python-sdk
+aliyun-python-sdk-core
+aliyun-python-sdk-kms
+quart~=0.19.6
+requests~=2.32.3
+tqdm~=4.66.5
+pymysql~=1.1.1
+pyapollos~=0.1.5
 aiomysql~=0.2.0

+ 7 - 0
server/api/__init__.py

@@ -0,0 +1,7 @@
+"""
+@author: luojunhui
+"""
+from .get_off_videos import GetOffVideos
+from .minigram import Minigram
+from .response import Response
+from .record import Record

+ 78 - 0
server/api/get_off_videos.py

@@ -0,0 +1,78 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+
+
+class GetOffVideos(object):
+    """
+    下架视频
+    """
+
+    def __init__(self, params, mysql_client, config):
+        self.params = params
+        self.mysql_client = mysql_client
+        self.articles_video = config.articleVideos
+        self.trace_id = None
+
+    def checkParams(self):
+        """
+
+        :return:
+        """
+        try:
+            self.trace_id = self.params['traceId']
+            return None
+        except Exception as e:
+            response = {
+                "error": "params error",
+                "info": str(e),
+                "data": self.params
+            }
+            return response
+
+    async def pushVideoIntoQueue(self):
+        """
+        将视频id记录到待下架表中
+        :return:
+        """
+        select_sql = f"""
+        select response from {self.articles_video} where trace_id = '{self.trace_id}';
+        """
+        result = await self.mysql_client.asyncSelect(sql=select_sql)
+        if result:
+            video_list = json.loads(result[0][0])
+            for video in video_list:
+                video_id = video['videoId']
+                try:
+                    update_sql = f"""
+                    INSERT INTO get_off_videos
+                    (video_id, publish_time, video_status, trace_id)
+                    values 
+                    (%s, %s, %s, %s);
+                    """
+                    await self.mysql_client.asyncInsert(
+                        sql=update_sql,
+                        params=(video_id, int(time.time()), 1, self.trace_id)
+                    )
+                except Exception as e:
+                    print(e)
+        else:
+            print("该 trace_id不存在")
+
+    async def deal(self):
+        """
+
+        :return:
+        """
+        params_error = self.checkParams()
+        if params_error:
+            return params_error
+        else:
+            await self.pushVideoIntoQueue()
+            response = {
+                "status": "success",
+                "traceId": self.trace_id
+            }
+            return response

+ 110 - 0
server/api/minigram.py

@@ -0,0 +1,110 @@
+"""
+@author: luojunhui
+"""
+minigram_map = {
+    1: {
+        # 25: {
+        #     "avatar": "https://rescdn.yishihui.com/0temp/ttmhzfsh.png",
+        #     "id": "wx0b7d95eb293b783b",
+        #     "name": "天天美好祝福生活",
+        #     "index": 25
+        # },
+        25: {
+                "avatar": "https://rescdn.yishihui.com/0temp/pqsp.png",
+                "id": "wxbdd2a2e93d9a6e25",
+                "name": "票圈视频",
+                "index": 25
+            },
+        29: {
+            "avatar": "https://rescdn.yishihui.com/0temp/cyfyld.png",
+            "id": "wx65c76bb4c67934db",
+            "name": "财运福运来到",
+            "index": 29
+        },
+        31: {
+            "avatar": "https://rescdn.yishihui.com/0temp/mhzfshxf2.png",
+            "id": "wx2e4478b1641b3b15",
+            "name": "美好祝福生活幸福",
+            "index": 31
+        }
+    },
+    2: {
+        36: {
+            "avatar": "https://rescdn.yishihui.com/0temp/zfyfyc.jpeg",
+            "id": "wxcddf231abd0dabdc",
+            "name": "祝福有福有财",
+            "index": 36
+        },
+        33: {
+            "avatar": "https://rescdn.yishihui.com/0temp/pqsp.png",
+            "id": "wxbdd2a2e93d9a6e25",
+            "name": "票圈视频",
+            "index": 33
+            }
+    },
+    3: {
+        27: {
+            "avatar": "https://rescdn.yishihui.com/0temp/xymhfqdd.png",
+            "id": "wx7187c217efef24a7",
+            "name": "幸运美好福气多多",
+            "index": 27
+        }
+    }
+}
+
+
+class Minigram(object):
+    """
+    小程序卡片
+    """
+
+    def __init__(self, params):
+        self.params = params
+        self.business_type = None
+        self.mini_code = None
+        self.trace_id = None
+
+    def check_params(self):
+        """
+        校验参数
+        :return:
+        """
+        try:
+            self.business_type = self.params['businessType']
+            self.mini_code = self.params['miniCode']
+            self.trace_id = self.params['traceId']
+            return None
+        except Exception as e:
+            response = {
+                "status": "fail",
+                "code": 1,
+                "message": str(e),
+                "info": "params check error"
+            }
+            return response
+
+    def choose_minigram(self):
+        """
+        分配小程序卡片
+        :return:
+        """
+        try:
+            minigram = minigram_map.get(self.business_type).get(self.mini_code)
+            response = {
+                "programAvatar": minigram['avatar'],
+                "programId": minigram['id'],
+                "programName": minigram['name'],
+                "trace_id": self.trace_id
+            }
+        except Exception as e:
+            response = {
+                "error": "invalid params",
+                "msg": str(e)
+            }
+        return response
+
+    async def deal(self):
+        """
+        :return:
+        """
+        return self.check_params() if self.check_params() else self.choose_minigram()

+ 129 - 0
server/api/record.py

@@ -0,0 +1,129 @@
+import time
+
+from uuid import uuid4
+
+from applications.functions.log import logging
+
+
+class Record(object):
+    """
+    搜索接口处理逻辑
+    """
+
+    def __init__(self, params, mysql_client, config):
+        self.flow_pool_level = None
+        self.content_id = None
+        self.account_name = None
+        self.contents = None
+        self.title = None
+        self.gh_id = None
+        self.params = params
+        self.mysql_client = mysql_client
+        self.article_videos = config.articleVideos
+        self.article_text = config.articleText
+        self.trace_id = "search-{}-{}".format(str(uuid4()), str(int(time.time())))
+
+    def checkParams(self):
+        """
+        检查请求params
+        :return:
+        """
+        try:
+            self.gh_id = self.params['ghId']
+            self.title = self.params['title'].split("@@")[-1].replace("'", "")
+            self.contents = self.params['content'].replace("'", "")
+            self.account_name = self.params['accountName'].replace("'", "")
+            self.content_id = self.params['articleId']
+            self.flow_pool_level = self.params['flowPoolLevelTag']
+            logging(
+                code="1001",
+                info="搜索视频内容接口请求成功, 参数校验成功",
+                port="title_to_search",
+                trace_id=self.trace_id,
+                data=self.params
+            )
+            return None
+        except Exception as e:
+            result = {
+                "status": "fail",
+                "code": 1,
+                "message": str(e),
+                "info": "params check error"
+            }
+            logging(
+                code="4001",
+                info="搜索视频内容接口请求成功, 参数校验失败",
+                port="title_to_search",
+                trace_id=self.trace_id,
+                data=self.params
+            )
+            return result
+
+    async def inputIntoArticleVideos(self):
+        """
+        把数据插入待处理队列
+        :return:
+        """
+        request_time = int(time.time())
+        insert_sql = f"""
+            INSERT INTO {self.article_videos}
+                (trace_id, content_id, flow_pool_level, gh_id, account_name, request_timestamp)
+            VALUES 
+                (%s, %s, %s, %s, %s, %s);
+            """
+        await self.mysql_client.asyncInsert(
+            sql=insert_sql,
+            params=(
+                self.trace_id,
+                self.content_id,
+                self.flow_pool_level,
+                self.gh_id,
+                self.account_name,
+                request_time
+            )
+        )
+        logging(
+            code="1002",
+            info="成功记录请求数据到mysql中",
+            trace_id=self.trace_id
+        )
+
+    async def inputIntoArticleText(self):
+        """
+
+        :return:
+        """
+        insert_sql = f"""
+        INSERT INTO {self.article_text} (content_id, article_title, article_text)
+        values (%s, %s, %s);
+        """
+        try:
+            await self.mysql_client.asyncInsert(
+                sql=insert_sql,
+                params=(
+                    self.content_id,
+                    self.title,
+                    self.contents
+                )
+            )
+        except Exception as e:
+            print(e)
+
+    async def deal(self):
+        """
+        deal
+        :return:
+        """
+        params_error = self.checkParams()
+        if params_error:
+            return params_error
+        else:
+            # 记录数据
+            await self.inputIntoArticleVideos()
+            await self.inputIntoArticleText()
+            res = {
+                "status": "success input to article queue",
+                "code": 0,
+                "traceId": self.trace_id
+            }
+            return res

+ 242 - 0
server/api/response.py

@@ -0,0 +1,242 @@
+"""
+@author: luojunhui
+"""
+import json
+import uuid
+import time
+import random
+import hashlib
+import urllib.parse
+
+from applications.functions.log import logging
+
+
+class Response(object):
+    """
+    Response
+    """
+
+    def __init__(self, params, mysql_client, config):
+        """
+        Response 接口
+        """
+        self.trace_id = None
+        self.mini_program_type = None
+        self.mysql_client = mysql_client
+        self.params = params
+        self.article_videos = config.articleVideos
+        self.mini_map = json.loads(config.getConfigValue("miniMap"))
+
+    def checkParams(self):
+        """
+        请求参数校验
+        :return:
+        """
+        try:
+            self.mini_program_type = self.params['miniprogramUseType']
+            self.trace_id = self.params['traceId']
+            return None
+        except Exception as e:
+            return {
+                "error": "params error",
+                "msg": str(e),
+                "info": self.params
+            }
+
+    async def getVideosResult(self):
+        """
+        获取结果
+        :return:
+        """
+        select_sql = f"""
+        SELECT gh_id, content_status, response, process_times
+        FROM {self.article_videos}
+        WHERE trace_id = '{self.trace_id}';
+        """
+        info_tuple = await self.mysql_client.asyncSelect(select_sql)
+        gh_id, content_status, response, process_times = info_tuple[0]
+        return {
+            "ghId": gh_id,
+            "contentStatus": content_status,
+            "response": json.loads(response),
+            "processTimes": process_times
+        }
+
+    def createGzhPath(self, video_id, shared_uid, gh_id):
+        """
+        :param gh_id: 公众号账号的gh_id
+        :param video_id: 视频 id
+        :param shared_uid: 分享 id
+        """
+
+        def generate_source_id():
+            """
+            generate_source_id
+            :return:
+            """
+            timestamp = str(int(time.time() * 1000))
+            random_str = str(random.randint(1000, 9999))
+            hash_input = f"{timestamp}-{random_str}"
+            return hashlib.md5(hash_input.encode()).hexdigest()
+
+        root_share_id = str(uuid.uuid4())
+        if self.mini_program_type == 2:
+            source_id = (
+                    "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id()
+            )
+        elif self.mini_program_type == 1:
+            source_id = "longArticles_" + generate_source_id()
+        elif self.mini_program_type == 3:
+            source_id = "WeCom_" + generate_source_id()
+        else:
+            source_id = "Error mini_program_type {}".format(self.mini_program_type)
+        url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
+        # 自动把 root_share_id 加入到白名单
+        # auto_white(root_share_id)
+        return (
+            root_share_id,
+            source_id,
+            f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
+        )
+
+    async def generateCard(self, index, gh_id, mini_id, item):
+        """
+        生成单个分享卡片
+        :param item: 单个视频结果
+        :param mini_id:  小程序 appType
+        :param gh_id: 公众号 id
+        :param index: 视频位置
+        :return:
+        """
+        str_mini_id = str(mini_id)
+        mini_info = self.mini_map[str_mini_id]
+        avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name']
+        root_share_id, root_source_id, production_path = self.createGzhPath(
+            video_id=item['videoId'],
+            shared_uid=item['uid'],
+            gh_id=gh_id
+        )
+        logging(
+            code="1002",
+            info="root_share_id --{}, productionPath -- {}".format(
+                root_share_id, production_path
+            ),
+            function="process",
+            trace_id=self.trace_id,
+        )
+        result = {
+            "productionCover": item['videoCover'],
+            "productionName": item['kimiTitle'],
+            "programAvatar": avatar,
+            "programId": app_id,
+            "programName": app_name,
+            "source": item['source'],
+            "rootShareId": root_share_id,
+            "productionPath": production_path,
+            "videoUrl": item['videoPath'],
+            "mini_id": mini_id,
+            "paragraphPosition": index * 0.25
+        }
+        if index == 1:
+            result['paragraphPosition'] = 0.01
+        item['rootSourceId'] = root_source_id
+        return result, item
+
+    async def generateCards(self, result):
+        """
+        生成返回卡片
+        :return:
+        """
+        gh_id = result['ghId']
+        response = result['response']
+        match self.mini_program_type:
+            case 1:
+                L = []
+                new_item_list = []
+                for index, item in enumerate(response, 1):
+                    random_num = random.randint(1, 10)
+                    if random_num in [1, 2, 3, 4, 5, 6]:
+                        mini_id = 25
+                    elif random_num in [7, 8]:
+                        mini_id = 29
+                    else:
+                        mini_id = 31
+                    card, new_item = await self.generateCard(index, gh_id, mini_id, item)
+                    L.append(card)
+                    new_item_list.append(new_item)
+                return L, new_item_list
+            case 2:
+                L = []
+                new_item_list = []
+                for index, item in enumerate(response, 1):
+                    card, new_item = await self.generateCard(index, gh_id, 33, item)
+                    L.append(card)
+                    new_item_list.append(new_item)
+                return L, new_item_list
+            case 3:
+                L = []
+                new_item_list = []
+                for index, item in enumerate(response, 1):
+                    card, new_item = await self.generateCard(index, gh_id, 27, item)
+                    L.append(card)
+                    new_item_list.append(card)
+                return L, new_item_list
+
+    async def job(self):
+        """
+        执行方法
+        :return:
+        """
+        response = await self.getVideosResult()
+        status_code = response.get('contentStatus')
+        process_times = response.get('processTimes')
+        match status_code:
+            case 0:
+                if process_times > 3:
+                    result = {
+                        "traceId": self.trace_id,
+                        "code": 0,
+                        "error": "匹配失败,处理超过 3 次"
+                    }
+                else:
+                    result = {
+                        "traceId": self.trace_id,
+                        "code": 0,
+                        "Message": "该请求还没处理"
+                    }
+                return result
+            case 1:
+                return {
+                    "traceId": self.trace_id,
+                    "code": 1,
+                    "Message": "该请求正在处理中"
+                }
+            case 2:
+                card_list, new_items = await self.generateCards(result=response)
+                update_sql = f"""
+                UPDATE {self.article_videos}
+                SET response = %s, success_status = %s
+                WHERE trace_id = %s;
+                """
+                await self.mysql_client.asyncInsert(
+                    sql=update_sql,
+                    params=(json.dumps(new_items, ensure_ascii=False), 1, self.trace_id)
+                )
+                return {"traceId": self.trace_id, "miniprogramList": card_list}
+            case 3:
+                return {
+                    "traceId": self.trace_id,
+                    "code": 3,
+                    "error": "匹配失败,超过三次"
+                }
+
+    async def deal(self):
+        """
+        api process starts from here
+        :return:
+        """
+        params_error = self.checkParams()
+        if params_error:
+            return params_error
+        else:
+            return await self.job()

+ 11 - 9
applications/routes.py → server/routes.py

@@ -3,12 +3,12 @@
 """
 from quart import Blueprint, jsonify, request
 
-from applications.deal import Response, Record, Minigram, GetOffVideos
+from server.api import Response, Record, Minigram, GetOffVideos
 
-my_blueprint = Blueprint('LongArticles', __name__)
+my_blueprint = Blueprint('LongArticlesMatchServer', __name__)
 
 
-def Routes(mysql_client):
+def Routes(mysql_client, config):
     """
     路由代码
     """
@@ -24,11 +24,11 @@ def Routes(mysql_client):
     @my_blueprint.route('/search_videos', methods=['POST'])
     async def search_videos_from_the_web():
         """
-        从web 搜索视频并且存储到票圈的视频库中
+        record Data
         :return:
         """
         params = await request.get_json()
-        SD = Record(params=params, mysql_client=mysql_client)
+        SD = Record(params=params, mysql_client=mysql_client, config=config)
         result = await SD.deal()
         return jsonify(result)
 
@@ -39,9 +39,11 @@ def Routes(mysql_client):
         :return:
         """
         data = await request.get_json()
-        trace_id = data['traceId']
-        minigram_type = data['miniprogramUseType']
-        RD = Response(trace_id=trace_id, mini_program_type=minigram_type, mysql_client=mysql_client)
+        RD = Response(
+            params=data,
+            mysql_client=mysql_client,
+            config=config
+        )
         response = await RD.deal()
         return jsonify(response)
 
@@ -63,7 +65,7 @@ def Routes(mysql_client):
         :return:
         """
         data = await request.get_json()
-        GOV = GetOffVideos(params=data, mysql_client=mysql_client)
+        GOV = GetOffVideos(params=data, mysql_client=mysql_client, config=config)
         response = await GOV.deal()
         return jsonify(response)
 

+ 29 - 0
spiderTask.py

@@ -0,0 +1,29 @@
+"""
+@author: luojunhui
+"""
+import time
+import datetime
+import asyncio
+
+from tasks.spider_task import spiderTask
+from applications.db import TaskMySQLClient
+
+
+async def main():
+    """
+    main job
+    :return:
+    """
+    TMC = TaskMySQLClient()
+    await TMC.init_pool()
+    PD = spiderTask(TMC)
+    await PD.deal()
+
+
+if __name__ == '__main__':
+    while True:
+        asyncio.run(main())
+        now_str = datetime.datetime.now().__str__()
+        print("{}    请求执行完成, 等待60s".format(now_str))
+        time.sleep(60)
+

+ 0 - 993
static/config.py

@@ -1,993 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-gh_id_dict = {
-    "gh_008ef23062ee": {
-        "uid": 71582074,
-        "nick_name": "北至"
-    },
-    "gh_01a417472fdd": {
-        "uid": 72459350,
-        "nick_name": "婪"
-    },
-    "gh_01f8afd03366": {
-        "uid": 69637520,
-        "nick_name": "非亲非故"
-    },
-    "gh_036ab9bb685a": {
-        "uid": 72088957,
-        "nick_name": "不知春秋"
-    },
-    "gh_0394dd7e37e2": {
-        "uid": 73723034,
-        "nick_name": "上瘾"
-    },
-    "gh_03d32e83122f": {
-        "uid": 71881045,
-        "nick_name": "安抚"
-    },
-    "gh_03d45c260115": {
-        "uid": 72385751,
-        "nick_name": "少女提到战情场"
-    },
-    "gh_058e41145a0c": {
-        "uid": 69637476,
-        "nick_name": "甜腻梦话"
-    },
-    "gh_080bb43aa0dc": {
-        "uid": 70731106,
-        "nick_name": "那一树的嫣红"
-    },
-    "gh_084a485e859a": {
-        "uid": 69637472,
-        "nick_name": "梦星月"
-    },
-    "gh_0921c03402cd": {
-        "uid": 69637531,
-        "nick_name": "你的女友"
-    },
-    "gh_0b29b081f237": {
-        "uid": 72088986,
-        "nick_name": "顾然"
-    },
-    "gh_0bedf49a6a7e": {
-        "uid": 72088983,
-        "nick_name": "少女酒馆"
-    },
-    "gh_0c89e11f8bf3": {
-        "uid": 69637508,
-        "nick_name": "粟米"
-    },
-    "gh_0d8a65926cdf": {
-        "uid": 71353811,
-        "nick_name": "寻歡猫"
-    },
-    "gh_0dea45f2342a": {
-        "uid": 72088958,
-        "nick_name": "抚琴浅唱入君心"
-    },
-    "gh_0e4fd9e88386": {
-        "uid": 71881046,
-        "nick_name": "白玖逸"
-    },
-    "gh_156c66ac3e37": {
-        "uid": 73858591,
-        "nick_name": "微醉阳光"
-    },
-    "gh_1686250f15b6": {
-        "uid": 72385753,
-        "nick_name": "遥西江月"
-    },
-    "gh_171cec079b2a": {
-        "uid": 69637501,
-        "nick_name": "海上"
-    },
-    "gh_183d80deffb8": {
-        "uid": 69637491,
-        "nick_name": "论趣"
-    },
-    "gh_18e87b85e90c": {
-        "uid": 72459372,
-        "nick_name": "想你成瘾"
-    },
-    "gh_192c9cf58b13": {
-        "uid": 70731102,
-        "nick_name": "久亿"
-    },
-    "gh_199f74839fb3": {
-        "uid": 72770994,
-        "nick_name": "殇"
-    },
-    "gh_19daa341c8f9": {
-        "uid": 71353816,
-        "nick_name": "美"
-    },
-    "gh_1b27dd1beeca": {
-        "uid": 71021080,
-        "nick_name": "聚散常迷"
-    },
-    "gh_1c7e15e6ac9e": {
-        "uid": 72088971,
-        "nick_name": "我在等你"
-    },
-    "gh_1d10403eb554": {
-        "uid": 71353815,
-        "nick_name": "听闻那年"
-    },
-    "gh_1d887d61088c": {
-        "uid": 71021081,
-        "nick_name": "明月不知沟渠心"
-    },
-    "gh_1e83780cc5a8": {
-        "uid": 71353810,
-        "nick_name": "眉眼"
-    },
-    "gh_1ee2e1b39ccf": {
-        "uid": 69637473,
-        "nick_name": "纵有疾风起"
-    },
-    "gh_2046389c46d3": {
-        "uid": 72459359,
-        "nick_name": "北街九命猫南巷七秒鱼"
-    },
-    "gh_2111a67800ba": {
-        "uid": 74162341,
-        "nick_name": "夜半呻吟"
-    },
-    "gh_234ef02cdee5": {
-        "uid": 69637513,
-        "nick_name": "夹逼"
-    },
-    "gh_2370907fc8d6": {
-        "uid": 71051365,
-        "nick_name": "吹长笛"
-    },
-    "gh_243829b5ff02": {
-        "uid": 74162336,
-        "nick_name": "难做"
-    },
-    "gh_24aa1e4bf177": {
-        "uid": 71353820,
-        "nick_name": "翁思"
-    },
-    "gh_25c925cf5d8a": {
-        "uid": 74162347,
-        "nick_name": "谦谦公子"
-    },
-    "gh_26a307578776": {
-        "uid": 69637490,
-        "nick_name": "最宝贝的宝贝"
-    },
-    "gh_29074b51f2b7": {
-        "uid": 69637530,
-        "nick_name": "沉舸"
-    },
-    "gh_291ec369f017": {
-        "uid": 74162334,
-        "nick_name": "执炬逆风"
-    },
-    "gh_2abee512312a": {
-        "uid": 70731103,
-        "nick_name": "舍我"
-    },
-    "gh_2b8c6aa035ae": {
-        "uid": 69637470,
-        "nick_name": "懶得取名"
-    },
-    "gh_2e0c4609839f": {
-        "uid": 74162332,
-        "nick_name": "神君"
-    },
-    "gh_2e615fa75ffb": {
-        "uid": 72683739,
-        "nick_name": "空巢老人家"
-    },
-    "gh_2fde0fd0ad79": {
-        "uid": 74162335,
-        "nick_name": "城之"
-    },
-    "gh_30816d8adb52": {
-        "uid": 71616016,
-        "nick_name": "鹤绝"
-    },
-    "gh_3365a6f8b17f": {
-        "uid": 72459349,
-        "nick_name": "最好不过明天见"
-    },
-    "gh_34318194fd0e": {
-        "uid": 69637517,
-        "nick_name": "徒四壁"
-    },
-    "gh_37b084e62f1c": {
-        "uid": 72459374,
-        "nick_name": "讨人厌"
-    },
-    "gh_3845af6945d0": {
-        "uid": 69637545,
-        "nick_name": "秋水娉婷"
-    },
-    "gh_39218d3a3ec1": {
-        "uid": 74162342,
-        "nick_name": "人生如梦"
-    },
-    "gh_3ac6d7208961": {
-        "uid": 69637497,
-        "nick_name": "小熊的少女梦"
-    },
-    "gh_3c7d38636846": {
-        "uid": 69637519,
-        "nick_name": "油腻腻"
-    },
-    "gh_3ce2fa1956ea": {
-        "uid": 74162320,
-        "nick_name": "疑心病"
-    },
-    "gh_3df10391639c": {
-        "uid": 69637541,
-        "nick_name": "六郎娇面"
-    },
-    "gh_3e91f0624545": {
-        "uid": 71616018,
-        "nick_name": "青衫故人"
-    },
-    "gh_3ed305b5817f": {
-        "uid": 71021082,
-        "nick_name": "醉色染红颜"
-    },
-    "gh_3f4e2c890272": {
-        "uid": 73858595,
-        "nick_name": "海棠"
-    },
-    "gh_40a0ad154478": {
-        "uid": 69637516,
-        "nick_name": "禁止"
-    },
-    "gh_40a2ead40083": {
-        "uid": 72459348,
-        "nick_name": "两袖"
-    },
-    "gh_40fa65a44aa1": {
-        "uid": 72558479,
-        "nick_name": "南七夏"
-    },
-    "gh_424c8eeabced": {
-        "uid": 69637522,
-        "nick_name": "认命"
-    },
-    "gh_44127c197525": {
-        "uid": 74162321,
-        "nick_name": "青昧"
-    },
-    "gh_442a2c336dd8": {
-        "uid": 72459352,
-        "nick_name": "彻夜"
-    },
-    "gh_4568b5a7e2fe": {
-        "uid": 69637482,
-        "nick_name": "香腮"
-    },
-    "gh_45beb952dc74": {
-        "uid": 69637488,
-        "nick_name": "毋庸"
-    },
-    "gh_46879953339f": {
-        "uid": 73723019,
-        "nick_name": "天生童真"
-    },
-    "gh_484de412b0ef": {
-        "uid": 69637481,
-        "nick_name": "婪"
-    },
-    "gh_4a05bb40ab9b": {
-        "uid": 72459370,
-        "nick_name": "忧欢"
-    },
-    "gh_4af3ce7f9a85": {
-        "uid": 72459373,
-        "nick_name": "相衬"
-    },
-    "gh_4c058673c07e": {
-        "uid": 69637474,
-        "nick_name": "影帝"
-    },
-    "gh_4c21daadd79f": {
-        "uid": 74162330,
-        "nick_name": "想被宠一次"
-    },
-    "gh_4f6bfd731ac8": {
-        "uid": 74162328,
-        "nick_name": "浅梦"
-    },
-    "gh_50a7969695f6": {
-        "uid": 73723024,
-        "nick_name": "日日思君"
-    },
-    "gh_50c78060c5f2": {
-        "uid": 72088980,
-        "nick_name": "在乎"
-    },
-    "gh_5198e38df4c0": {
-        "uid": 74162331,
-        "nick_name": "饮长风"
-    },
-    "gh_51e4ad40466d": {
-        "uid": 71616015,
-        "nick_name": "吃甜"
-    },
-    "gh_531881b6d1ad": {
-        "uid": 74162346,
-        "nick_name": "如同昨日"
-    },
-    "gh_538f78f9d3aa": {
-        "uid": 69637478,
-        "nick_name": "伤痕"
-    },
-    "gh_53cb7afbb2df": {
-        "uid": 72459360,
-        "nick_name": "卷子泛滥成灾"
-    },
-    "gh_55866e76fc28": {
-        "uid": 74162343,
-        "nick_name": "少走感情路"
-    },
-    "gh_56a6765df869": {
-        "uid": 69637514,
-        "nick_name": "风月"
-    },
-    "gh_56ca3dae948c": {
-        "uid": 69637538,
-        "nick_name": "留下太多回忆"
-    },
-    "gh_57573f01b2ee": {
-        "uid": 71582075,
-        "nick_name": "醉色染红颜"
-    },
-    "gh_5765f834684c": {
-        "uid": 74162324,
-        "nick_name": "一笑百媚生"
-    },
-    "gh_57c9e8babea7": {
-        "uid": 72683735,
-        "nick_name": "离去"
-    },
-    "gh_5887a9154605": {
-        "uid": 71353814,
-        "nick_name": "寄许"
-    },
-    "gh_58d75d978f31": {
-        "uid": 72047581,
-        "nick_name": "经历风雨"
-    },
-    "gh_58fa9df7733a": {
-        "uid": 73858593,
-        "nick_name": "一身宠爱"
-    },
-    "gh_59b9f8ef99a4": {
-        "uid": 71051360,
-        "nick_name": "小編最可愛"
-    },
-    "gh_5ae65db96cb7": {
-        "uid": 70731108,
-        "nick_name": "喘声娇息"
-    },
-    "gh_5c841d031d42": {
-        "uid": 72459356,
-        "nick_name": "香肩"
-    },
-    "gh_5d68fee2093f": {
-        "uid": 72088974,
-        "nick_name": "足够体面"
-    },
-    "gh_5e1464b76ff6": {
-        "uid": 74162323,
-        "nick_name": "不走为尽"
-    },
-    "gh_5e543853d8f0": {
-        "uid": 69637543,
-        "nick_name": "不知春秋"
-    },
-    "gh_5ff48e9fb9ef": {
-        "uid": 69637494,
-        "nick_name": "寻她找他"
-    },
-    "gh_6503fd75c35e": {
-        "uid": 72459358,
-        "nick_name": "一口甜"
-    },
-    "gh_660afe87b6fd": {
-        "uid": 72385752,
-        "nick_name": "青涩迷人"
-    },
-    "gh_671f460c856c": {
-        "uid": 69637523,
-        "nick_name": "绝不改悔"
-    },
-    "gh_67776b73f896": {
-        "uid": 73723032,
-        "nick_name": "风过长街"
-    },
-    "gh_68e7fdc09fe4": {
-        "uid": 71371653,
-        "nick_name": "红尘客"
-    },
-    "gh_69f4300b4cda": {
-        "uid": 72459366,
-        "nick_name": "仙气儿"
-    },
-    "gh_6b7c2a257263": {
-        "uid": 69637528,
-        "nick_name": "奶牙"
-    },
-    "gh_6cfd1132df94": {
-        "uid": 70731104,
-        "nick_name": "中指灵活"
-    },
-    "gh_6d205db62f04": {
-        "uid": 69637509,
-        "nick_name": "怕羞"
-    },
-    "gh_6d3aa9d13402": {
-        "uid": 72683738,
-        "nick_name": "四月的荒原"
-    },
-    "gh_6d9f36e3a7be": {
-        "uid": 69637498,
-        "nick_name": "望长安"
-    },
-    "gh_6da12d2660ba": {
-        "uid": 72088985,
-        "nick_name": "你妈biu"
-    },
-    "gh_6e75b9dcde6b": {
-        "uid": 72770995,
-        "nick_name": "宠儿"
-    },
-    "gh_6f5221bf1801": {
-        "uid": 72459351,
-        "nick_name": "顾词"
-    },
-    "gh_6ff82851890a": {
-        "uid": 72088976,
-        "nick_name": "世俗"
-    },
-    "gh_7208b813f16d": {
-        "uid": 74162329,
-        "nick_name": "数流年"
-    },
-    "gh_72bace6b3059": {
-        "uid": 70731109,
-        "nick_name": "万物生息"
-    },
-    "gh_73be0287bb94": {
-        "uid": 69637537,
-        "nick_name": "戏剧"
-    },
-    "gh_744cb16f6e16": {
-        "uid": 69637505,
-        "nick_name": "反駁"
-    },
-    "gh_749271f1ccd5": {
-        "uid": 72125026,
-        "nick_name": "天下第一"
-    },
-    "gh_759ace9d4567": {
-        "uid": 71353812,
-        "nick_name": "青丝与白猫"
-    },
-    "gh_77f36c109fb1": {
-        "uid": 71569299,
-        "nick_name": "轻盈"
-    },
-    "gh_78640efc49bf": {
-        "uid": 73723020,
-        "nick_name": "打更人"
-    },
-    "gh_789a40fe7935": {
-        "uid": 71616017,
-        "nick_name": "杀手也动情"
-    },
-    "gh_7adb9417f845": {
-        "uid": 72088960,
-        "nick_name": "南音雨阁"
-    },
-    "gh_7b4a5f86d68c": {
-        "uid": 69637477,
-        "nick_name": "我很想你"
-    },
-    "gh_7bca1c99aea0": {
-        "uid": 69637511,
-        "nick_name": "从小就很傲"
-    },
-    "gh_7c66e0dbd2cf": {
-        "uid": 72125025,
-        "nick_name": "烟波明灭"
-    },
-    "gh_7e5818b2dd83": {
-        "uid": 69637532,
-        "nick_name": "二八佳人"
-    },
-    "gh_7f5075624a50": {
-        "uid": 70731113,
-        "nick_name": "浮現"
-    },
-    "gh_845f84745a80": {
-        "uid": 74162327,
-        "nick_name": "可鄙"
-    },
-    "gh_87df74d068d5": {
-        "uid": 72459347,
-        "nick_name": "教养"
-    },
-    "gh_89d00dca4896": {
-        "uid": 72088978,
-        "nick_name": "知己"
-    },
-    "gh_89ef4798d3ea": {
-        "uid": 69637533,
-        "nick_name": "彼岸花"
-    },
-    "gh_8a783ca03d5e": {
-        "uid": 72459353,
-        "nick_name": "菁华浮梦"
-    },
-    "gh_8eaa863bc40e": {
-        "uid": 71353813,
-        "nick_name": "花费时间"
-    },
-    "gh_8f9bd3d100d3": {
-        "uid": 72459357,
-        "nick_name": "浮念"
-    },
-    "gh_901b0d722749": {
-        "uid": 69637518,
-        "nick_name": "深情不为我"
-    },
-    "gh_9161517e5676": {
-        "uid": 69637495,
-        "nick_name": "折磨"
-    },
-    "gh_91abdbc32d5f": {
-        "uid": 71353822,
-        "nick_name": "一往情深"
-    },
-    "gh_92da3c574f82": {
-        "uid": 73723018,
-        "nick_name": "蝶无需花恋"
-    },
-    "gh_93e00e187787": {
-        "uid": 69637504,
-        "nick_name": "理会"
-    },
-    "gh_947785cd2d97": {
-        "uid": 73723022,
-        "nick_name": "含笑"
-    },
-    "gh_949bf0195759": {
-        "uid": 72088981,
-        "nick_name": "解脱"
-    },
-    "gh_95ed5ecf9363": {
-        "uid": 71881047,
-        "nick_name": "路途"
-    },
-    "gh_969f5ea5fee1": {
-        "uid": 71582077,
-        "nick_name": "仁至义尽"
-    },
-    "gh_97034d655595": {
-        "uid": 72459363,
-        "nick_name": "萌懂"
-    },
-    "gh_970460d9ccec": {
-        "uid": 71881048,
-        "nick_name": "青丝"
-    },
-    "gh_9743cdc25c97": {
-        "uid": 72459368,
-        "nick_name": "一刀两断"
-    },
-    "gh_9782c8a85bce": {
-        "uid": 72047582,
-        "nick_name": "尢物少女"
-    },
-    "gh_9877c8541764": {
-        "uid": 69637506,
-        "nick_name": "我沿着悲伤"
-    },
-    "gh_98a84818284c": {
-        "uid": 72088972,
-        "nick_name": "就是昏君"
-    },
-    "gh_98ec0ffe69b3": {
-        "uid": 72683736,
-        "nick_name": "苦衷"
-    },
-    "gh_9904c57b243e": {
-        "uid": 72088965,
-        "nick_name": "意中人"
-    },
-    "gh_9a1e71e3460d": {
-        "uid": 72459355,
-        "nick_name": "多少画"
-    },
-    "gh_9cf3b7ff486b": {
-        "uid": 69637492,
-        "nick_name": "hoit"
-    },
-    "gh_9e559b3b94ca": {
-        "uid": 69637471,
-        "nick_name": "我与你相遇"
-    },
-    "gh_9ee24345c6ce": {
-        "uid": 71353818,
-        "nick_name": "爱隔山河"
-    },
-    "gh_9eef14ad6c16": {
-        "uid": 70731110,
-        "nick_name": "抱一抱"
-    },
-    "gh_9f8dc5b0c74e": {
-        "uid": 69637496,
-        "nick_name": "港口"
-    },
-    "gh_a172a6bf7dc5": {
-        "uid": 72088963,
-        "nick_name": "Thorns"
-    },
-    "gh_a182cfc94dad": {
-        "uid": 69637539,
-        "nick_name": "四海八荒"
-    },
-    "gh_a1b5a2142e11": {
-        "uid": 72088970,
-        "nick_name": "唯壹的執著"
-    },
-    "gh_a221d1a952aa": {
-        "uid": 73858592,
-        "nick_name": "凉薄少年葬空城"
-    },
-    "gh_a2901d34f75b": {
-        "uid": 69637535,
-        "nick_name": "听腻了谎话"
-    },
-    "gh_a29af5b297b1": {
-        "uid": 74162338,
-        "nick_name": "凑巧"
-    },
-    "gh_a307072c04b9": {
-        "uid": 69637521,
-        "nick_name": "踏步"
-    },
-    "gh_a57a534ec05c": {
-        "uid": 72459364,
-        "nick_name": "想和你同床"
-    },
-    "gh_a5c534db075f": {
-        "uid": 74162333,
-        "nick_name": "养猪仙人"
-    },
-    "gh_a5e71438865e": {
-        "uid": 72088977,
-        "nick_name": "同类"
-    },
-    "gh_a6351b447819": {
-        "uid": 69637540,
-        "nick_name": "七猫酒馆"
-    },
-    "gh_abc6794e6996": {
-        "uid": 73723021,
-        "nick_name": "娇娘"
-    },
-    "gh_abd3aef4debe": {
-        "uid": 72088956,
-        "nick_name": "弱鸡"
-    },
-    "gh_ac43e43b253b": {
-        "uid": 69637499,
-        "nick_name": "一厢情愿"
-    },
-    "gh_ac43eb24376d": {
-        "uid": 71582078,
-        "nick_name": "亏心者"
-    },
-    "gh_ad7b26ee9e17": {
-        "uid": 71353809,
-        "nick_name": "最宝贝的宝贝"
-    },
-    "gh_adca24a8f429": {
-        "uid": 69637483,
-        "nick_name": "对你何止一句喜欢"
-    },
-    "gh_aed298be263a": {
-        "uid": 73723029,
-        "nick_name": "毒瘤"
-    },
-    "gh_af559c13a06e": {
-        "uid": 73723031,
-        "nick_name": "气质"
-    },
-    "gh_b15de7c99912": {
-        "uid": 69637536,
-        "nick_name": "糖炒板栗"
-    },
-    "gh_b181786a6c8c": {
-        "uid": 72385754,
-        "nick_name": "暴露人性"
-    },
-    "gh_b24476d5090c": {
-        "uid": 72088967,
-        "nick_name": "挽救"
-    },
-    "gh_b32125c73861": {
-        "uid": 69637493,
-        "nick_name": "发尾"
-    },
-    "gh_b3f26b9dccbd": {
-        "uid": 72125024,
-        "nick_name": "苍天"
-    },
-    "gh_b3ffc1ca3a04": {
-        "uid": 69637546,
-        "nick_name": "主宰你心"
-    },
-    "gh_b676b7ad9b74": {
-        "uid": 71021086,
-        "nick_name": "枕畔红冰薄"
-    },
-    "gh_b6f2c5332c72": {
-        "uid": 71021088,
-        "nick_name": "斗篷"
-    },
-    "gh_b721056f294a": {
-        "uid": 72459371,
-        "nick_name": "些许几多"
-    },
-    "gh_b7f7144a96c5": {
-        "uid": 72088962,
-        "nick_name": "纵情荒唐"
-    },
-    "gh_b8baac4296cb": {
-        "uid": 69637489,
-        "nick_name": "生性"
-    },
-    "gh_b9b99173ff8a": {
-        "uid": 69637524,
-        "nick_name": "养一只月亮"
-    },
-    "gh_ba6e1e4f2de0": {
-        "uid": 71353817,
-        "nick_name": "余欢"
-    },
-    "gh_bd57b6978e06": {
-        "uid": 69637527,
-        "nick_name": "厌遇"
-    },
-    "gh_be505c7d28ac": {
-        "uid": 71353823,
-        "nick_name": "遥远的她"
-    },
-    "gh_be8c29139989": {
-        "uid": 69637502,
-        "nick_name": "不负"
-    },
-    "gh_bfe5b705324a": {
-        "uid": 69637529,
-        "nick_name": "乐极"
-    },
-    "gh_bfea052b5baa": {
-        "uid": 72683733,
-        "nick_name": "手背"
-    },
-    "gh_bff0bcb0694a": {
-        "uid": 69637534,
-        "nick_name": "简迷离"
-    },
-    "gh_c484cea5ce0b": {
-        "uid": 72088968,
-        "nick_name": "猫咚"
-    },
-    "gh_c5cdf60d9ab4": {
-        "uid": 70731111,
-        "nick_name": "定酌"
-    },
-    "gh_c69776baf2cd": {
-        "uid": 69637512,
-        "nick_name": "骄纵"
-    },
-    "gh_c794770120dc": {
-        "uid": 71353819,
-        "nick_name": "以笑相迎"
-    },
-    "gh_c7c9929bdfc4": {
-        "uid": 72088969,
-        "nick_name": "草莓酸奶"
-    },
-    "gh_c91b42649690": {
-        "uid": 69637503,
-        "nick_name": "荟萃"
-    },
-    "gh_cd82bb10dd98": {
-        "uid": 72459354,
-        "nick_name": "著迷動心"
-    },
-    "gh_d2cc901deca7": {
-        "uid": 69637487,
-        "nick_name": "恶意调笑"
-    },
-    "gh_d367454ca4d7": {
-        "uid": 72088961,
-        "nick_name": "不负虚设"
-    },
-    "gh_d49df5e974ca": {
-        "uid": 70731107,
-        "nick_name": "没你"
-    },
-    "gh_d4dffc34ac39": {
-        "uid": 70744072,
-        "nick_name": "驯课"
-    },
-    "gh_d5f935d0d1f2": {
-        "uid": 69637500,
-        "nick_name": "青少年哪吒"
-    },
-    "gh_d8c215687f02": {
-        "uid": 73723033,
-        "nick_name": "看淡"
-    },
-    "gh_d9de0e9b70e9": {
-        "uid": 72088973,
-        "nick_name": "枝头月"
-    },
-    "gh_da2732f4518b": {
-        "uid": 74162340,
-        "nick_name": "小城管小商贩"
-    },
-    "gh_da44c409ec0f": {
-        "uid": 73723025,
-        "nick_name": "红眼"
-    },
-    "gh_da76772d8d15": {
-        "uid": 69637526,
-        "nick_name": "独揽风月"
-    },
-    "gh_dd4c857bbb36": {
-        "uid": 71021083,
-        "nick_name": "过分着迷"
-    },
-    "gh_ddafea4bcc29": {
-        "uid": 72410372,
-        "nick_name": "枯燥"
-    },
-    "gh_ddf6ec0104d0": {
-        "uid": 73858596,
-        "nick_name": "诗情"
-    },
-    "gh_de2c9fefe715": {
-        "uid": 72088982,
-        "nick_name": "旧心"
-    },
-    "gh_de9f9ebc976b": {
-        "uid": 69637475,
-        "nick_name": "剑出鞘恩怨了"
-    },
-    "gh_e0eb490115f5": {
-        "uid": 69637486,
-        "nick_name": "赋别"
-    },
-    "gh_e24da99dc899": {
-        "uid": 69637484,
-        "nick_name": "恋雨夏季"
-    },
-    "gh_e2576b7181c6": {
-        "uid": 69637515,
-        "nick_name": "满天星"
-    },
-    "gh_e3a8e14013cd": {
-        "uid": 73723030,
-        "nick_name": "城堡"
-    },
-    "gh_e4506a7ce46f": {
-        "uid": 74162322,
-        "nick_name": "肥球"
-    },
-    "gh_e56ddf195d91": {
-        "uid": 74162345,
-        "nick_name": "煽情"
-    },
-    "gh_e6be5a12e83c": {
-        "uid": 74162348,
-        "nick_name": "争霸"
-    },
-    "gh_e75dbdc73d80": {
-        "uid": 69637542,
-        "nick_name": "情战"
-    },
-    "gh_e9d819f9e147": {
-        "uid": 69637525,
-        "nick_name": "与卿"
-    },
-    "gh_eb1f9880beb7": {
-        "uid": 73723026,
-        "nick_name": "清辰"
-    },
-    "gh_ebbba8a46bbc": {
-        "uid": 74162319,
-        "nick_name": "朝筠"
-    },
-    "gh_ede868a2e656": {
-        "uid": 73858594,
-        "nick_name": "晚安"
-    },
-    "gh_ee78360d06f5": {
-        "uid": 71021089,
-        "nick_name": "思绪"
-    },
-    "gh_efaf7da157f5": {
-        "uid": 69637547,
-        "nick_name": "心野性子浪"
-    },
-    "gh_efd90dcf48ac": {
-        "uid": 74162316,
-        "nick_name": "动心"
-    },
-    "gh_f1122b34f1f3": {
-        "uid": 74162337,
-        "nick_name": "该怎样"
-    },
-    "gh_f25b5fb01977": {
-        "uid": 70731105,
-        "nick_name": "山色空濛"
-    },
-    "gh_f2bc589c78eb": {
-        "uid": 73723028,
-        "nick_name": "與妳穿過風"
-    },
-    "gh_f321695e3983": {
-        "uid": 72459375,
-        "nick_name": "阁"
-    },
-    "gh_f4594783f5b8": {
-        "uid": 69637544,
-        "nick_name": "自缚"
-    },
-    "gh_f74ca3104604": {
-        "uid": 74162325,
-        "nick_name": "鬼面書生"
-    },
-    "gh_f902cea89e48": {
-        "uid": 71021085,
-        "nick_name": "晚春的树"
-    },
-    "gh_fa0b85528a93": {
-        "uid": 72459365,
-        "nick_name": "永不言弃"
-    },
-    "gh_fb8031aca805": {
-        "uid": 72088959,
-        "nick_name": "惊醒"
-    },
-    "gh_fe6ef3a65a48": {
-        "uid": 69637480,
-        "nick_name": "风间"
-    },
-    "gh_ff487cb5dab3": {
-        "uid": 70744071,
-        "nick_name": "决然"
-    }
-}
-
-
-# prod
-db_article = "long_articles_video"
-db_video = "article_match_videos"
-
-# dev
-# db_article = "long_articles_video_dev"
-# db_video = "article_match_videos_dev"
-
-# spider coroutines
-spider_coroutines = 10
-
-# mysql coroutines
-mysql_coroutines = 100

BIN
static/logo.png


+ 0 - 3
static/official_accounts

@@ -1,3 +0,0 @@
-天天美好祝福生活
-wx0b7d95eb293b783b
-https://rescdn.yishihui.com/0temp/ttmhzfsh.png

+ 5 - 4
tasks/__init__.py

@@ -2,7 +2,8 @@
 @author: luojunhui
 定时任务
 """
-from .task1 import MatchTask1
-from .task2 import MatchTask2
-from .task3 import MatchTask3
-from .task4 import MatchTask4
+from .etl_task import AsyncETL
+from .kimi_task import KimiTask
+from .spider_task import spiderTask
+from .publish_task import publishTask
+from .history_task import historyContentIdTask

+ 0 - 150
tasks/chadui.py

@@ -1,150 +0,0 @@
-"""
-@author: luojunhui
-"""
-import asyncio
-
-from static.config import db_article, db_video
-from applications.functions.log import logging
-from static.config import mysql_coroutines
-
-
-class MatchTask5(object):
-    """
-    定时执行任务
-    """
-
-    def __init__(self, mysql_client):
-        """
-        :param mysql_client:
-        """
-        self.mysql_client = mysql_client
-
-    async def get_task(self):
-        """
-        获取任务
-        :return:
-        """
-        select_sql = f"""
-            SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
-            FROM {db_article} 
-            WHERE content_status = 0 and process_times <= 5 and account_name = '万事如意一家子'
-            ORDER BY request_time_stamp
-            DESC
-            LIMIT {mysql_coroutines};
-        """
-        task_list = await self.mysql_client.async_select(sql=select_sql)
-        task_obj_list = [
-            {
-                "trace_id": item[0],
-                "content_id": item[1],
-                "gh_id": item[2],
-                "title": item[3],
-                "text": item[4],
-                "content_status": item[5],
-                "process_times": item[6]
-            } for item in task_list
-        ]
-        print("本次任务获取到 {} 条视频".format(len(task_obj_list)))
-        # logging(
-        #     code="9001",
-        #     info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
-        #     data=task_obj_list
-        # )
-        return task_obj_list
-
-    async def get_history_videos(self, content_id):
-        """
-        check whether the contents videos exists
-        :param content_id:
-        :return:
-        """
-        select_sql = f"""
-            SELECT video_id
-            FROM {db_video}
-            where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
-        """
-        content_videos = await self.mysql_client.async_select(select_sql)
-        videos = [vid for vid in content_videos]
-        print(len(videos))
-        if len(videos) >= 3:
-            return videos
-        else:
-            return None
-
-    async def use_exists_contents_videos(self, video_id_list, params):
-        """
-        使用已经存在的视频id
-        :return:
-        """
-        trace_id = params['trace_id']
-        content_id = params['content_id']
-        select_sql = f"""
-            SELECT kimi_title
-            FROM {db_article}
-            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
-        """
-        info = await self.mysql_client.async_select(sql=select_sql)
-        kimi_title = info[0]
-        update_sql = f"""
-            UPDATE {db_article}
-            SET 
-                kimi_title=%s,
-                recall_video_id1=%s, 
-                recall_video_id2=%s, 
-                recall_video_id3=%s,
-                content_status=%s,
-                process_times = %s
-            WHERE  trace_id = %s
-        """
-        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(
-                kimi_title,
-                video_id_list[0],
-                "NULL" if vid2 is None else vid2,
-                "NULL" if vid3 is None else vid3,
-                2,
-                int(params['process_times']) + 1,
-                trace_id
-            )
-        )
-        logging(
-            code="9002",
-            info="已从历史文章更新,文章id: {}".format(content_id),
-            trace_id=trace_id
-        )
-
-    async def process_task(self, params):
-        """
-        异步执行
-        :param params:
-        :return:
-        """
-        content_id = params['content_id']
-        print(content_id)
-        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
-        video_id_list = await self.get_history_videos(content_id=content_id)
-        print(video_id_list)
-        if video_id_list:
-            # 说明已经存在了结果, 将该条记录下的video_id拿出来
-            print("存在历史文章")
-            await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
-        else:
-            pass
-
-    async def deal(self):
-        """
-        处理
-        :return:
-        """
-        task_list = await self.get_task()
-        print(len(task_list))
-        if task_list:
-            tasks = [self.process_task(params) for params in task_list]
-            await asyncio.gather(*tasks)
-        else:
-            logging(
-                code="9008",
-                info="没有要处理的请求"
-            )

+ 300 - 0
tasks/etl_task.py

@@ -0,0 +1,300 @@
+"""
+@author: luojunhui
+"""
+import os
+
+import oss2
+import aiohttp
+import aiofiles
+import asyncio
+from hashlib import md5
+from uuid import uuid4
+
+import requests
+from fake_useragent import FakeUserAgent
+from applications.config import Config
+
+
+async def downloadCover(file_path, platform, cover_url):
+    """
+    下载视频封面
+    :param platform:
+    :param cover_url:
+    :param file_path:
+    :return:
+    """
+    headers = requestHeader(platform=platform, url=cover_url, download_type="cover")
+    response = requests.get(url=cover_url, headers=headers)
+    if b"<html>" in response.content:
+        return None
+    elif response.status_code != 200:
+        return None
+    else:
+        with open(file_path, "wb") as f:
+            f.write(response.content)
+        return file_path
+
+
+def requestHeader(platform, url, download_type="video"):
+    """
+    请求头
+    :return:
+    """
+    if platform == "xg_search":
+        if "v9-xg-web-pc.ixigua.com" in url:
+            headers = {
+                "Accept": "*/*",
+                "Accept-Language": "zh-CN,zh;q=0.9",
+                "Host": "v9-xg-web-pc.ixigua.com",
+                "User-Agent": FakeUserAgent().chrome,
+                "Origin": "https://www.ixigua.com/",
+                "Referer": "https://www.ixigua.com/"
+            }
+        elif "v3-xg-web-pc.ixigua.com" in url:
+            headers = {
+                "Accept": "*/*",
+                "Accept-Language": "zh-CN,zh;q=0.9",
+                "Host": "v3-xg-web-pc.ixigua.com",
+                "User-Agent": FakeUserAgent().chrome,
+                "Origin": "https://www.ixigua.com/",
+                "Referer": "https://www.ixigua.com/"
+            }
+        elif download_type == "cover":
+            headers = {
+                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
+                'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
+                'Cache-Control': 'max-age=0',
+                'Proxy-Connection': 'keep-alive',
+                'Upgrade-Insecure-Requests': '1',
+                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
+            }
+        else:
+            headers = {
+                "Accept": "*/*",
+                "Accept-Language": "zh-CN,zh;q=0.9",
+                "Host": "v3-xg-web-pc.ixigua.com",
+                "User-Agent": FakeUserAgent().chrome,
+                "Origin": "https://www.ixigua.com/",
+                "Referer": "https://www.ixigua.com/"
+            }
+    elif platform == "baidu_search":
+        headers = {
+            "Accept": "*/*",
+            "Accept-Language": "zh-CN,zh;q=0.9",
+            "User-Agent": FakeUserAgent().chrome,
+        }
+    elif platform == "wx_search":
+        headers = {
+            "Accept": "*/*",
+            "Accept-Language": "zh-CN,zh;q=0.9",
+            "User-Agent": FakeUserAgent().chrome,
+            "Origin": "https://mp.weixin.qq.com",
+            "Referer": "https://mp.weixin.qq.com"
+        }
+    elif platform == "dy_search":
+        headers = {
+            'accept': '*/*',
+            'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
+            'priority': 'i',
+            'range': 'bytes=0-',
+            'referer': 'https://v11-coldf.douyinvod.com/',
+            'user-agent': FakeUserAgent().chrome
+        }
+    else:
+        headers = {}
+    return headers
+
+
+async def downloadVideo(file_path, platform, video_url, download_type="video"):
+    """
+    :param download_type:
+    :param video_url:
+    :param platform:
+    :param file_path:
+    :return:
+    """
+    headers = requestHeader(platform=platform, url=video_url, download_type=download_type)
+    if os.path.exists(file_path):
+        file_size = os.path.getsize(file_path)
+        headers["Range"] = f"bytes={file_size}-"
+    else:
+        file_size = 0
+    async with aiohttp.ClientSession() as session:
+        async with session.get(video_url, headers=headers) as response:
+            if response.status in [200, 206]:
+                if file_size > 0:
+                    async with aiofiles.open(file_path, "ab+") as f:
+                        # 以1MB为单位分块下载
+                        async for chunk in response.content.iter_chunked(1024 * 1024):
+                            await f.write(chunk)
+                else:
+                    async with aiofiles.open(file_path, "wb") as f:
+                        # 以1MB为单位分块下载
+                        async for chunk in response.content.iter_chunked(1024 * 1024):
+                            await f.write(chunk)
+
+            else:
+                print(response.status)
+    return file_path
+
+
+def generateVideoPath(platform, video_id):
+    """
+    通过视频信息生成唯一视频地址
+    :return:
+    """
+    index = "{}-{}-{}".format(platform, video_id, uuid4())
+    index = md5(index.encode()).hexdigest()
+    file_name = "{}.mp4".format(index)
+    cover_name = "{}.png".format(index)
+    file_path = os.path.join(os.getcwd(), "static", file_name)
+    cover_path = os.path.join(os.getcwd(), "static", cover_name)
+    return file_path, cover_path
+
+
+async def uploadToOss(local_video_path, download_type):
+    """
+    把视频上传到 oss
+    :return:
+    """
+    oss_video_key = "long_articles/{}/".format(download_type) + str(uuid4())
+    access_key_id = "LTAIP6x1l3DXfSxm"
+    access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
+    endpoint = "oss-cn-hangzhou.aliyuncs.com"
+    bucket_name = "art-pubbucket"
+    bucket = oss2.Bucket(
+        oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
+    )
+    bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
+    return oss_video_key
+
+
+class AsyncETL(object):
+    """
+    视频下载功能
+    """
+
+    def __init__(self, mysql_client):
+        # self.proxy = {
+        #     "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
+        #     "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
+        # }
+        self.max_retry = 5
+        self.mysql_client = mysql_client
+        self.article_crawler_videos = Config().articleCrawlerVideos
+
+    async def getTasks(self):
+        """
+        获取视频 id
+        :return:
+        """
+        select_sql = f"""
+        SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id
+        FROM {self.article_crawler_videos}
+        WHERE download_status = 0
+        ORDER BY id
+        LIMIT 10;
+        """
+        result = await self.mysql_client.asyncSelect(select_sql)
+        if result:
+            tasks = [
+                {
+                    "id": line[0],
+                    "video_id": line[1],
+                    "platform": line[2],
+                    "video_title": line[3],
+                    "video_url": line[4],
+                    "cover_url": line[5],
+                    "user_id": line[6]
+                }
+                for line in result
+            ]
+            return tasks
+        else:
+            return []
+
+    async def processTask(self, params):
+        """
+        处理 task
+        :return:
+        {
+                    "id": line[0],
+                    "video_id": line[1],
+                    "platform": line[2],
+                    "video_title": line[3],
+                    "video_url": line[4],
+                    "cover_url": line[5],
+                    "user_id": line[6]
+                }
+        """
+        update_sql_0 = f"""
+                    UPDATE {self.article_crawler_videos}
+                    SET download_status = %s
+                    WHERE id = %s;
+                    """
+        await self.mysql_client.asyncInsert(
+            sql=update_sql_0,
+            params=(1, params['id'])
+        )
+        try:
+            local_video_path, local_cover_path = generateVideoPath(params['platform'], params['video_id'])
+            # download videos
+            file_path = await downloadVideo(
+                file_path=local_video_path,
+                platform=params['platform'],
+                video_url=params['video_url']
+            )
+            # download cover
+            cover_path = await downloadCover(
+                file_path=local_cover_path,
+                platform=params['platform'],
+                cover_url=params['cover_url']
+            )
+            oss_video = await uploadToOss(
+                local_video_path=file_path,
+                download_type="video"
+            )
+            if cover_path:
+                oss_cover = await uploadToOss(
+                    local_video_path=cover_path,
+                    download_type="image"
+                )
+            else:
+                oss_cover = None
+            update_sql = f"""
+            UPDATE {self.article_crawler_videos}
+            SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
+            WHERE id = %s;
+            """
+            await self.mysql_client.asyncInsert(
+                sql=update_sql,
+                params=(
+                    oss_video,
+                    oss_cover,
+                    2,
+                    params['id']
+                )
+            )
+        except Exception as e:
+            print("failed", e)
+            update_sql = f"""
+            UPDATE {self.article_crawler_videos}
+            SET download_status = %s
+            WHERE id = %s;
+            """
+            await self.mysql_client.asyncInsert(
+                sql=update_sql,
+                params=(3, params['id'])
+            )
+
+    async def deal(self):
+        """
+        ETL Deal Task
+        :return:
+        """
+        task_list = await self.getTasks()
+        if task_list:
+            tasks = [self.processTask(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            print("No spider tasks")

+ 215 - 0
tasks/history_task.py

@@ -0,0 +1,215 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+import asyncio
+
+from applications.config import Config
+from applications.functions.log import logging
+from applications.functions.pqFunctions import publishToPQ
+
+
+class historyContentIdTask(object):
+    """
+    处理已经匹配过小程序的文章
+    """
+
+    def __init__(self, mysql_client):
+        """
+        :param mysql_client:
+        """
+        self.mysql_client = mysql_client
+        self.article_text = Config().articleText
+        self.article_video = Config().articleVideos
+        self.article_crawler_video = Config().articleCrawlerVideos
+        self.history_coroutines = Config().getConfigValue("historyArticleCoroutines")
+
+    async def getTaskList(self):
+        """
+        获取任务
+        :return:
+        """
+        select_sql1 = f"""
+            SELECT 
+                ART.trace_id, 
+                ART.content_id, 
+                ART.flow_pool_level, 
+                ART.gh_id,
+                ART.process_times
+            FROM {self.article_video} ART
+            JOIN (
+                select content_id, count(1) as cnt 
+                from {self.article_crawler_video}
+                where download_status = 2
+                group by content_id
+            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
+            WHERE ART.content_status = 0 and ART.process_times <= 3
+            ORDER BY request_timestamp
+            LIMIT {self.history_coroutines};
+        """
+        tasks = await self.mysql_client.asyncSelect(sql=select_sql1)
+        task_obj_list = [
+            {
+                "trace_id": item[0],
+                "content_id": item[1],
+                "flow_pool_level": item[2],
+                "gh_id": item[3],
+                "process_times": item[4]
+            } for item in tasks
+        ]
+        logging(
+            code="9001",
+            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
+            data=task_obj_list
+        )
+        return task_obj_list
+
+    async def getVideoList(self, content_id):
+        """
+        content_id
+        :return:
+        """
+        sql = f"""
+        SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
+        FROM {self.article_crawler_video}
+        WHERE content_id = '{content_id}' and download_status = 2;
+        """
+        res_tuple = await self.mysql_client.asyncSelect(sql)
+        if len(res_tuple) >= 3:
+            return [
+                {
+                    "platform": i[0],
+                    "play_count": i[1],
+                    "like_count": i[2],
+                    "video_oss_path": i[3],
+                    "cover_oss_path": i[4],
+                    "uid": i[5]
+                }
+                for i in res_tuple]
+        else:
+            return []
+
+    async def getKimiTitle(self, content_id):
+        """
+        获取 kimiTitle
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+        select kimi_title from {self.article_text} where content_id = '{content_id}';
+        """
+        res_tuple = await self.mysql_client.asyncSelect(select_sql)
+        if res_tuple:
+            return res_tuple[0][0]
+        else:
+            return False
+
+    async def publishVideosToPq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
+        """
+        发布至 pq
+        :param process_times:
+        :param trace_id:
+        :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]
+        :param gh_id: 公众号 id ---> str
+        :param kimi_title: kimi 标题 ---> str
+        :param flow_pool_level: 流量池层级 ---> str
+        :return:
+        """
+        video_list = download_videos[:3]
+        match flow_pool_level:
+            case "autoArticlePoolLevel4":
+                print("冷启层")
+                video_list = []
+            case "autoArticlePoolLevel3":
+                print("暂时未知层")
+                video_list = []
+            case "autoArticlePoolLevel2":
+                print("次条层")
+                video_list = []
+            case "autoArticlePoolLevel1":
+                print("头条层")
+                video_list = []
+        L = []
+        for video_obj in video_list:
+            params = {
+                "videoPath": video_obj['video_oss_path'],
+                "uid": video_obj['uid'],
+                "title": kimi_title
+            }
+            response = await publishToPQ(params)
+            # time.sleep(2)
+            obj = {
+                "uid": video_obj['uid'],
+                "source": video_obj['platform'],
+                "kimiTitle": kimi_title,
+                "videoId": response['data']['id'],
+                "videoCover": response['data']['shareImgPath'],
+                "videoPath": response['data']['videoPath'],
+                "videoOss": video_obj['video_oss_path'].split("/")[-1]
+            }
+            L.append(obj)
+        update_sql = f"""
+           UPDATE {self.article_video}
+           SET content_status = %s, response = %s, process_times = %s
+           WHERE trace_id = %s;
+           """
+        await self.mysql_client.asyncInsert(
+            sql=update_sql,
+            params=(2, json.dumps(L, ensure_ascii=False), process_times + 1, trace_id)
+        )
+
+    async def processTask(self, params):
+        """
+        异步执行
+        :param params:
+        :return:
+        """
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        flow_pool_level = params['flow_pool_level'],
+        gh_id = params['gh_id']
+        process_times = params['process_times']
+        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
+        download_videos = await self.getVideoList(content_id=content_id)
+        if download_videos:
+            # 把状态修改为 4
+            update_sql = f"""
+            UPDATE {self.article_video}
+            SET content_status = %s 
+            WHERE trace_id = %s;
+            """
+            await self.mysql_client.asyncInsert(
+                sql=update_sql,
+                params=(4, trace_id)
+            )
+
+            kimi_title = await self.getKimiTitle(content_id)
+            if kimi_title:
+                await self.publishVideosToPq(
+                    flow_pool_level=flow_pool_level,
+                    kimi_title=kimi_title,
+                    gh_id=gh_id,
+                    trace_id=trace_id,
+                    download_videos=download_videos,
+                    process_times=process_times
+                )
+            else:
+                print("Kimi title 生成失败---后续加报警")
+        else:
+            pass
+
+    async def deal(self):
+        """
+        处理
+        :return:
+        """
+        task_list = await self.getTaskList()
+        if task_list:
+            tasks = [self.processTask(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )

+ 98 - 0
tasks/kimi_task.py

@@ -0,0 +1,98 @@
+"""
+@author: luojunhui
+"""
+import json
+import asyncio
+from applications.functions.kimi import KimiServer
+from applications.functions.log import logging
+from applications.config import Config
+
+
+class KimiTask(object):
+    """
+    KIMI task
+    """
+
+    def __init__(self, mysql_client):
+        """
+
+        :param mysql_client:
+        """
+        self.mysql_client = mysql_client
+        self.config = Config()
+
+    async def getTasks(self):
+        """
+        获取 tasks
+        :return:
+        """
+        sql = f"""
+        SELECT content_id, article_title, article_text
+        FROM {self.config.articleText}
+        WHERE kimi_status = 0
+        limit 5;
+        """
+        content_list = await self.mysql_client.asyncSelect(sql)
+        if content_list:
+            task_list = [
+                {
+                    "contentId": i[0],
+                    "articleTitle": i[1],
+                    "articleText": i[2]
+                } for i in content_list
+            ]
+            return task_list
+        else:
+            return []
+
+    async def processTask(self, params):
+        """
+        do something
+        :return:
+        """
+        K = KimiServer()
+        try:
+            kimi_info = await K.search_kimi_schedule(params=params)
+            kimi_title = kimi_info['k_title']
+            content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
+            content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
+            update_kimi_sql = f"""
+            UPDATE {self.config.articleText} 
+            SET
+                kimi_title = %s,
+                kimi_summary = %s,
+                kimi_keys = %s,
+                kimi_status = %s
+            WHERE content_id = %s;
+                                    """
+            await self.mysql_client.asyncInsert(
+                sql=update_kimi_sql,
+                params=(kimi_title, content_title, content_keys, 1, params['contentId'])
+            )
+        except Exception as e:
+            update_kimi_sql = f"""
+            UPDATE {self.config.articleText}
+            SET
+                kimi_status = %s 
+            WHERE content_id = %s
+            """
+            await self.mysql_client.asyncInsert(
+                sql=update_kimi_sql,
+                params=(2, params['contentId'])
+            )
+            print("kimi error--{}".format(e))
+
+    async def deal(self):
+        """
+        deal function
+        :return:
+        """
+        task_list = await self.getTasks()
+        if task_list:
+            tasks = [self.processTask(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="没有要处理的 kimi 任务"
+            )

+ 186 - 0
tasks/publish_task.py

@@ -0,0 +1,186 @@
+"""
+@author: luojunhui
+发布到 pq 获取视频 id
+"""
+import asyncio
+import json
+import time
+
+from applications.config import Config
+from applications.functions.log import logging
+from applications.functions.pqFunctions import publishToPQ
+
+
+class publishTask(object):
+    """
+    在 match_videos 表中, 获取 content_status = 1 的 content_id
+    用  content_id 在 crawler_videos 表, 查询 download_status为 2 的视频,表示该 content_id 已经匹配完的视频
+    通过 流量池tag  逻辑
+    把 crawler_videos 中的视频路径发布至 pq, 获得 videoId
+    match_videos表将 content_status 修改为 2,response中记录 videoId && ossName等信息
+    """
+
+    def __init__(self, mysql_client):
+        self.mysql_client = mysql_client
+        self.article_video = Config().articleVideos
+        self.article_text = Config().articleText
+        self.article_crawler_video = Config().articleCrawlerVideos
+
+    async def getTasks(self):
+        """
+        获取 task
+        :return:
+        """
+        select_sql = f"""
+        SELECT trace_id, content_id, flow_pool_level, gh_id
+        FROM {self.article_video} 
+        WHERE content_status = 1
+        limit 10;
+        """
+        tasks = await self.mysql_client.asyncSelect(select_sql)
+        if tasks:
+            return [
+                {
+                    "trace_id": i[0],
+                    "content_id": i[1],
+                    "flow_pool_level": i[2],
+                    "gh_id": i[3]
+                }
+                for i in tasks
+            ]
+        else:
+            return []
+
+    async def getVideoList(self, content_id):
+        """
+        content_id
+        :return:
+        """
+        sql = f"""
+        SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
+        FROM {self.article_crawler_video}
+        WHERE content_id = '{content_id}' and download_status = 2;
+        """
+        res_tuple = await self.mysql_client.asyncSelect(sql)
+        if len(res_tuple) >= 3:
+            return [
+                {
+                    "platform": i[0],
+                    "play_count": i[1],
+                    "like_count": i[2],
+                    "video_oss_path": i[3],
+                    "cover_oss_path": i[4],
+                    "uid": i[5]
+                }
+                for i in res_tuple]
+        else:
+            return []
+
+    async def getKimiTitle(self, content_id):
+        """
+        获取 kimiTitle
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+        select kimi_title from {self.article_text} where content_id = '{content_id}';
+        """
+        res_tuple = await self.mysql_client.asyncSelect(select_sql)
+        if res_tuple:
+            return res_tuple[0][0]
+        else:
+            return False
+
+    async def publishVideosToPq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos):
+        """
+        发布至 pq
+        :param trace_id:
+        :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]
+        :param gh_id: 公众号 id ---> str
+        :param kimi_title: kimi 标题 ---> str
+        :param flow_pool_level: 流量池层级 ---> str
+        :return:
+        """
+        video_list = download_videos[:3]
+        match flow_pool_level:
+            case "autoArticlePoolLevel4":
+                print("冷启层")
+                video_list = []
+            case "autoArticlePoolLevel3":
+                print("暂时未知层")
+                video_list = []
+            case "autoArticlePoolLevel2":
+                print("次条层")
+                video_list = []
+            case "autoArticlePoolLevel1":
+                print("头条层")
+                video_list = []
+        L = []
+        for video_obj in video_list:
+            params = {
+                "videoPath": video_obj['video_oss_path'],
+                "uid": video_obj['uid'],
+                "title": kimi_title
+            }
+            response = await publishToPQ(params)
+            print(response)
+            time.sleep(2)
+            obj = {
+                "uid": video_obj['uid'],
+                "source": video_obj['platform'],
+                "kimiTitle": kimi_title,
+                "videoId": response['data']['id'],
+                "videoCover": response['data']['shareImgPath'],
+                "videoPath": response['data']['videoPath'],
+                "videoOss": video_obj['video_oss_path'].split("/")[-1]
+            }
+            L.append(obj)
+        update_sql = f"""
+        UPDATE {self.article_video}
+        SET content_status = %s, response = %s
+        WHERE trace_id = %s;
+        """
+        await self.mysql_client.asyncInsert(
+            sql=update_sql,
+            params=(2, json.dumps(L, ensure_ascii=False), trace_id)
+        )
+
+    async def processTask(self, params):
+        """
+        处理任务
+        :return:
+        """
+        gh_id = params['gh_id']
+        flow_pool_level = params['flow_pool_level']
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        download_videos = await self.getVideoList(content_id)
+        if download_videos:
+            kimi_title = await self.getKimiTitle(content_id)
+            if kimi_title:
+                await self.publishVideosToPq(
+                    flow_pool_level=flow_pool_level,
+                    kimi_title=kimi_title,
+                    gh_id=gh_id,
+                    trace_id=trace_id,
+                    download_videos=download_videos
+                )
+            else:
+                print("Kimi title 生成失败---后续加报警")
+        else:
+            print("该 content_id还未下载完成")
+
+    async def deal(self):
+        """
+        function
+        :return:
+        """
+        task_list = await self.getTasks()
+        if task_list:
+            tasks = [self.processTask(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )

+ 244 - 0
tasks/spider_task.py

@@ -0,0 +1,244 @@
+"""
+@author: luojunhui
+"""
+import asyncio
+import json
+
+from applications.config import Config
+from applications.functions.log import logging
+from applications.spider import searchVideos
+
+
+class spiderTask(object):
+    """
+    定时执行任务
+    """
+    C = Config()
+
+    def __init__(self, mysql_client):
+        """
+        :param mysql_client:
+        """
+        self.mysql_client = mysql_client
+        self.article_video = self.C.articleVideos
+        self.article_text = self.C.articleText
+        self.article_video_crawler = self.C.articleCrawlerVideos
+        self.spider_coroutines = self.C.getConfigValue("spiderCoroutines")
+        self.gh_id_map = json.loads(self.C.getConfigValue("accountMap"))
+
+    async def getTask(self):
+        """
+        获取任务
+        :return:
+        """
+        select_sql = f"""
+            SELECT trace_id, content_id, gh_id, process_times
+            FROM {self.article_video}
+            WHERE content_status = 0 and process_times <= 3
+            GROUP BY content_id
+            LIMIT {self.spider_coroutines};
+        """
+        content_id_tuple = await self.mysql_client.asyncSelect(select_sql)
+        if content_id_tuple:
+            content_id_list = [i for i in list(content_id_tuple)]
+            task_obj_list = [
+                {
+                    "trace_id": item[0],
+                    "content_id": item[1],
+                    "gh_id": item[2],
+                    "process_times": item[3]
+                } for item in content_id_list
+            ]
+            logging(
+                code="9001",
+                info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
+                data=task_obj_list
+            )
+            return task_obj_list
+        else:
+            return []
+
+    async def getHistoryVideos(self, content_id):
+        """
+        check whether the contents videos exists
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+            SELECT count(1)
+            FROM {self.article_video_crawler}
+            where content_id = '{content_id}' and download_status = 2;
+        """
+        content_videos = await self.mysql_client.asyncSelect(select_sql)
+        videos_count = content_videos[0][0]
+        if videos_count >= 3:
+            return True
+        else:
+            return False
+
+    async def judgeContentProcessing(self, content_id):
+        """
+        判断该content_id是否在处理中
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+                       SELECT trace_id, content_status
+                       FROM {self.article_video}
+                       WHERE content_id = '{content_id}'
+                       ORDER BY id DESC;
+                   """
+        result = await self.mysql_client.asyncSelect(select_sql)
+        if result:
+            for item in result:
+                trace_id, content_status = item
+                if content_status == 1:
+                    return False
+            return True
+        else:
+            return True
+
+    async def getKimiResult(self, content_id):
+        """
+        通过 content_id 获取kimi info
+        :return:
+        """
+        select_sql = f"""
+        select article_title, kimi_title, kimi_summary, kimi_keys, kimi_status
+        from {self.article_text}
+        where content_id = '{content_id}';
+        """
+        response = await self.mysql_client.asyncSelect(select_sql)
+        if response:
+            article_detail = response[0]
+            if article_detail[4] == 1:
+                result = {
+                    "oriTitle": article_detail[0],
+                    "kimiTitle": article_detail[1],
+                    "kimiSummary": article_detail[2],
+                    "kimiKeys": json.loads(article_detail[3]),
+                    "kimiStatus": article_detail[4]
+                }
+            else:
+                result = {
+                    "kimiStatus": article_detail[4]
+                }
+            return result
+        else:
+            return
+
+    async def startProcess(self, params):
+        """
+        开始处理
+        :param params:
+        :return:
+        """
+        # 更新文章contentId为1, 说明该文章正在处理中
+        kimi_result = await self.getKimiResult(content_id=params['content_id'])
+        kimi_status = kimi_result['kimiStatus']
+        match kimi_status:
+            case 1:
+                update_process_times_sql = f"""
+                            UPDATE {self.article_video}
+                            SET process_times = %s, content_status = %s
+                            WHERE trace_id = %s;
+                            """
+                await self.mysql_client.asyncInsert(
+                    sql=update_process_times_sql,
+                    params=(
+                        params['process_times'] + 1,
+                        1,
+                        params['trace_id']
+                    )
+                )
+                try:
+                    await searchVideos(
+                        info={
+                            "oriTitle": kimi_result['oriTitle'],
+                            "kimiSummary": kimi_result['kimiSummary'],
+                            "kimiKeys": kimi_result['kimiKeys'],
+                            "traceId": params['trace_id'],
+                            "ghId": params['gh_id'],
+                            "contentId": params['content_id'],
+                            "spider": self.article_video_crawler
+                        },
+                        ghIdMap=self.gh_id_map,
+                        dbClient=self.mysql_client
+                    )
+                except Exception as e:
+                    roll_back_status = f"""
+                    UPDATE {self.article_video}
+                    SET content_status = %s
+                    WHERE trace_id = %s;
+                    """
+                    await self.mysql_client.asyncInsert(
+                        sql=roll_back_status,
+                        params=(
+                            0,
+                            params['trace_id']
+                        )
+                    )
+                    print("处理失败,回退状态为 0")
+            case 2:
+                update_process_times_sql = f"""
+                            UPDATE {self.article_video}
+                            SET process_times = %s, content_status = %s
+                            WHERE trace_id = %s;
+                            """
+                await self.mysql_client.asyncInsert(
+                    sql=update_process_times_sql,
+                    params=(
+                        params['process_times'] + 1,
+                        3,
+                        params['trace_id']
+                    )
+                )
+            case 0:
+                print("kimi not ready")
+
+    async def processTask(self, params):
+        """
+        异步执行
+        :param params:
+        :return:
+        """
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        video_id_list = await self.getHistoryVideos(content_id=content_id)
+        if video_id_list:
+            # 说明已经存在了结果, 将该条记录下的video_id拿出来
+            logging(
+                code="9001",
+                info="存在历史文章",
+                trace_id=trace_id
+            )
+        else:
+            flag = await self.judgeContentProcessing(content_id)
+            if flag:
+                logging(
+                    code="9004",
+                    info="无正在处理的文章ID, 开始处理",
+                    trace_id=trace_id
+                )
+                await self.startProcess(params=params)
+            else:
+                logging(
+                    code="9003",
+                    info="该文章ID正在请求--文章ID {}".format(content_id),
+                    trace_id=trace_id
+                )
+
+    async def deal(self):
+        """
+        处理
+        :return:
+        """
+        task_list = await self.getTask()
+        if task_list:
+            tasks = [self.processTask(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="爬虫池没有要处理的请求"
+            )

+ 0 - 347
tasks/task1.py

@@ -1,347 +0,0 @@
-"""
-@author: luojunhui
-"""
-import asyncio
-
-from static.config import db_article, db_video
-from applications.schedule import search_videos
-from applications.functions.log import logging
-from static.config import spider_coroutines
-
-
-class MatchTask1(object):
-    """
-    定时执行任务
-    """
-
-    def __init__(self, mysql_client):
-        """
-        :param mysql_client:
-        """
-        self.mysql_client = mysql_client
-
-    async def get_task(self):
-        """
-        获取任务
-        :return:
-        """
-        select_sql1 = f"""
-            SELECT DISTINCT (content_id)       
-            FROM {db_article} 
-            WHERE content_status = 0 and process_times <= 3
-            ORDER BY request_time_stamp
-            ASC
-            LIMIT {spider_coroutines};
-        """
-        content_ids = await self.mysql_client.async_select(select_sql1)
-        cil = []
-        for content_id in content_ids:
-            cil.append(content_id[0])
-        content_ids_tuple = str(cil).replace("[", "(").replace("]", ")")
-        if len(content_ids_tuple) > 3:
-            select_sql = f"""
-                SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
-                FROM {db_article} 
-                WHERE content_id in {content_ids_tuple} and process_times <= 3 
-                ORDER BY request_time_stamp
-                ASC;
-            """
-            task_list = await self.mysql_client.async_select(sql=select_sql)
-            task_obj_list = [
-                {
-                    "trace_id": item[0],
-                    "content_id": item[1],
-                    "gh_id": item[2],
-                    "title": item[3],
-                    "text": item[4],
-                    "content_status": item[5],
-                    "process_times": item[6]
-                } for item in task_list
-            ]
-            logging(
-                code="9001",
-                info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
-                data=task_obj_list
-            )
-            return task_obj_list
-        else:
-            return []
-
-    async def get_history_videos(self, content_id):
-        """
-        check whether the contents videos exists
-        :param content_id:
-        :return:
-        """
-        select_sql = f"""
-            SELECT video_id
-            FROM {db_video}
-            where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
-        """
-        content_videos = await self.mysql_client.async_select(select_sql)
-        videos = [vid for vid in content_videos]
-        if len(videos) >= 3:
-            return videos
-        else:
-            return None
-
-    async def judge_content_processing(self, content_id):
-        """
-        判断该content_id是否在处理中
-        :param content_id:
-        :return:
-        """
-        select_sql = f"""
-                       SELECT trace_id, content_status
-                       FROM {db_article}
-                       WHERE content_id = '{content_id}'
-                       ORDER BY id DESC;
-                   """
-        result = await self.mysql_client.async_select(select_sql)
-        if result:
-            for item in result:
-                trace_id, content_status = item
-                if content_status == 1:
-                    return False
-            return True
-        else:
-            return True
-
-    async def use_exists_contents_videos(self, video_id_list, params):
-        """
-        使用已经存在的视频id
-        :return:
-        """
-        trace_id = params['trace_id']
-        content_id = params['content_id']
-        select_sql = f"""
-            SELECT kimi_title
-            FROM {db_article}
-            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
-        """
-        info = await self.mysql_client.async_select(sql=select_sql)
-        kimi_title = info[0]
-        update_sql = f"""
-            UPDATE {db_article}
-            SET 
-                kimi_title=%s,
-                recall_video_id1=%s, 
-                recall_video_id2=%s, 
-                recall_video_id3=%s,
-                content_status=%s,
-                process_times = %s
-            WHERE  trace_id = %s
-        """
-        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(
-                kimi_title,
-                video_id_list[0],
-                "NULL" if vid2 is None else vid2,
-                "NULL" if vid3 is None else vid3,
-                2,
-                int(params['process_times']) + 1,
-                trace_id
-            )
-        )
-        logging(
-            code="9002",
-            info="已从历史文章更新,文章id: {}".format(content_id),
-            trace_id=trace_id
-        )
-
-    async def start_process(self, params):
-        """
-        开始处理
-        :param params:
-        :return:
-        """
-        # 更新文章contentId为1, 说明该文章正在处理中
-        update_sql = f"""
-            UPDATE {db_article}
-            SET 
-                content_status = %s
-            WHERE 
-                trace_id = %s;
-        """
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(
-                1, params['trace_id']
-            )
-        )
-        try:
-            video_count = await search_videos(
-                params={
-                    "title": params['title'],
-                    "content": params['text'],
-                    "trace_id": params['trace_id'],
-                    "content_id": params['content_id']
-                },
-                trace_id=params['trace_id'],
-                gh_id=params['gh_id'],
-                mysql_client=self.mysql_client
-            )
-            select_sql = f"""
-                SELECT video_id
-                FROM {db_video}
-                WHERE content_id = '{params['content_id']}'
-            """
-            result = await self.mysql_client.async_select(sql=select_sql)
-            vid1, vid2, vid3 = result[0], result[1], result[2]
-            if vid1 or vid2 or vid3:
-                update_sql2 = f"""
-                    UPDATE {db_article}
-                    SET
-                        recall_video_id1 = %s,
-                        recall_video_id2 = %s,
-                        recall_video_id3 = %s,
-                        content_status = %s,
-                        process_times = %s
-                        WHERE trace_id = %s;
-                """
-                await self.mysql_client.async_insert(
-                    sql=update_sql2,
-                    params=(
-                        vid1 if vid1 else "NULL",
-                        vid2 if vid2 else "NULL",
-                        vid3 if vid3 else "NULL",
-                        2,
-                        {int(params['process_times']) + 1},
-                        params['trace_id']
-                    )
-                )
-                logging(
-                    code="9008",
-                    info="视频搜索成功, 状态修改为2",
-                    trace_id=params['trace_id']
-                )
-            else:
-                if int(params['process_times']) < 3:
-                    update_sql3 = f"""
-                        UPDATE {db_article}
-                        SET 
-                           content_status = %s,
-                           process_times = %s
-                        WHERE trace_id = %s;
-                                    """
-                    await self.mysql_client.async_insert(
-                        sql=update_sql3,
-                        params=(0, int(params['process_times']) + 1, params['trace_id'])
-                    )
-                    logging(
-                        code="9018",
-                        info="视频搜索失败,回退状态为0",
-                        trace_id=params['trace_id']
-                    )
-                else:
-                    update_sql3 = f"""
-                        UPDATE {db_article}
-                        SET 
-                           content_status = %s,
-                           process_times = %s
-                        WHERE trace_id = %s;
-                                    """
-                    await self.mysql_client.async_insert(
-                        sql=update_sql3,
-                        params=(3, int(params['process_times']) + 1, params['trace_id'])
-                    )
-                    logging(
-                        code="9019",
-                        info="视频多次搜索失败,状态修改为3",
-                        trace_id=params['trace_id']
-                    )
-        except Exception as e:
-            if int(params['process_times']) < 3:
-                logging(
-                    code="9018",
-                    info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e),
-                    trace_id=params['trace_id']
-                )
-                update_sql4 = f"""
-                    UPDATE {db_article}
-                    SET
-                       content_status = %s,
-                       process_times = %s
-                    WHERE trace_id = %s;
-                """
-                await self.mysql_client.async_insert(
-                    sql=update_sql4,
-                    params=(0, int(params['process_times']) + 1, params['trace_id'])
-                )
-            else:
-                logging(
-                    code="9019",
-                    info="{}异常错误:{}, 状态修改为3".format(params['trace_id'], e),
-                    trace_id=params['trace_id']
-                )
-                update_sql4 = f"""
-                                    UPDATE {db_article}
-                                    SET
-                                       content_status = %s,
-                                       process_times = %s
-                                    WHERE trace_id = %s;
-                                """
-                await self.mysql_client.async_insert(
-                    sql=update_sql4,
-                    params=(3, int(params['process_times']) + 1, params['trace_id'])
-                )
-
-    async def process_task(self, params):
-        """
-        异步执行
-        :param params:
-        :return:
-        """
-        content_id = params['content_id']
-        trace_id = params['trace_id']
-        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
-        video_id_list = await self.get_history_videos(content_id=content_id)
-        if video_id_list:
-            # 说明已经存在了结果, 将该条记录下的video_id拿出来
-            logging(
-                code="9001",
-                info="存在历史文章",
-                trace_id=trace_id
-            )
-            # await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
-        else:
-            flag = await self.judge_content_processing(content_id)
-            if flag:
-                logging(
-                    code="9004",
-                    info="无正在处理的文章ID, 开始处理",
-                    trace_id=trace_id
-                )
-                await self.start_process(params=params)
-            else:
-                logging(
-                    code="9003",
-                    info="该文章ID正在请求--文章ID {}".format(content_id),
-                    trace_id=trace_id
-                )
-
-    async def deal(self):
-        """
-        处理
-        :return:
-        """
-        task_list = await self.get_task()
-        task_dict = {}
-        for task in task_list:
-            key = task['content_id']
-            task_dict[key] = task
-        process_list = []
-        for item in task_dict:
-            process_list.append(task_dict[item])
-        if process_list:
-            # for task in task_list:
-            #     await self.process_task(task)
-            tasks = [self.process_task(params) for params in process_list]
-            await asyncio.gather(*tasks)
-        else:
-            logging(
-                code="9008",
-                info="没有要处理的请求"
-            )

+ 132 - 0
test_code/SPIDER/baidu.py

@@ -0,0 +1,132 @@
+"""
+@author: luojunhui
+好看视频搜索爬虫
+"""
+import json
+import time
+import base64
+import hashlib
+import requests
+import urllib.parse
+from uuid import uuid4
+from fake_useragent import FakeUserAgent
+
+from applications.functions.common import sensitive_flag
+
+
+def tunnel_proxies():
+    """
+    快代理
+    :return:
+    """
+    # 隧道域名:端口号
+    tunnel = "l901.kdltps.com:15818"
+
+    # 用户名密码方式
+    username = "t11983523373311"
+    password = "mtuhdr2z"
+    proxies = {
+        "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel},
+        "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}
+    }
+    return proxies
+
+
+def get_video_detail(video_id):
+    """
+    获取好看视频的视频链接
+    :param video_id:
+    :return:
+    """
+    url = "https://haokan.baidu.com/v"
+    params = {
+        'vid': video_id,
+        '_format': 'json'
+    }
+
+    base_64_string = base64.b64encode(str(uuid4()).encode()).decode()
+    headers = {
+        'Accept': '*/*',
+        'cookie': "BIDUPSID={}".format(base_64_string),
+        'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
+        'Cache-Control': 'no-cache',
+        'Connection': 'keep-alive',
+        'Content-Type': 'application/x-www-form-urlencoded',
+        'Referer': 'https://haokan.baidu.com',
+        'User-Agent': FakeUserAgent().chrome,
+    }
+    response = requests.request(
+        "GET",
+        url,
+        headers=headers,
+        params=params,
+        proxies=tunnel_proxies()
+    ).json()
+    time.sleep(2)
+    return response['data']['apiData']['curVideoMeta']
+
+
+def hksp_search(key, sensitive_words, trace_id):
+    """
+    好看视频搜索爬虫
+    """
+    timestamp_seconds = time.time()
+    timestamp_milliseconds = int(timestamp_seconds * 1000)
+    url = 'https://haokan.baidu.com/haokan/ui-search/pc/search/video'
+    # 定义请求的参数
+    strings = "{}_{}_{}_{}_{}".format(1, urllib.parse.quote(key), 10, timestamp_milliseconds, 1)
+    sign = hashlib.md5(strings.encode()).hexdigest()
+    params = {
+        'pn': 1,
+        'rn': 10,
+        'type': 'video',
+        'query': key,
+        'sign': sign,
+        'version': 1,
+        'timestamp': timestamp_milliseconds
+    }
+    # 定义请求头
+    base_64_string = base64.b64encode(str(uuid4()).encode()).decode()
+    headers = {
+        'authority': 'haokan.baidu.com',
+        'accept': '*/*',
+        'accept-language': 'zh,en;q=0.9,zh-CN;q=0.8',
+        'cookie': "BIDUPSID={}".format(base_64_string),
+        'user-agent': FakeUserAgent().chrome,
+        'x-requested-with': 'xmlhttprequest',
+    }
+    # 发送GET请求
+    try:
+        response = requests.get(
+            url,
+            headers=headers,
+            params=params,
+            proxies=tunnel_proxies(),
+            timeout=120
+        ).json()
+        data_list = response['data']['list']
+        L = []
+        for data in data_list:
+            try:
+                video_id = data['vid']
+                title = data['title']
+                duration = int(data['duration'].split(":")[0]) * 60 + int(data['duration'].split(":")[1])
+                if sensitive_flag(sensitive_words, title) and int(duration) <= 300:
+                    res = get_video_detail(video_id)
+                    L.append(res)
+                    return L
+                else:
+                    continue
+            except Exception as e:
+                pass
+
+        return L
+    except Exception as e:
+        print(e)
+        return []
+
+
+if __name__ == '__main__':
+    res = hksp_search("90岁上海大爷征婚", sensitive_words=[], trace_id="testId")
+    for item in res:
+        print(json.dumps(item, ensure_ascii=False, indent=4))