Browse Source

Merge branch '2024-0808-NewMatchPro1'

merge in 0905
罗俊辉 7 months ago
parent
commit
a39f3fe118

+ 5 - 6
applications/deal/__init__.py

@@ -1,9 +1,8 @@
 """
 @author: luojunhui
+接口信息
 """
-from .search_deal import SearchDeal
-from .re_search_deal import ReSearchDeal
-from .process_deal import ProcessDeal
-from .search_deal_v2 import SearchDeal2
-from .get_done_content_deal import ProcessDeal2
-from .recall_deal import RecallDeal
+from .record import Record
+from .response import Response
+from .minigram import Minigram
+from .getOffVideos import GetOffVideos

+ 338 - 0
applications/deal/finalResponse.py

@@ -0,0 +1,338 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+import random
+import hashlib
+
+import uuid
+import urllib.parse
+
+from applications.functions.common import async_post
+from applications.functions.log import logging
+from static.config import db_article
+
+
+async def publishToPQ(video_obj):
+    """
+    Publish a single video to the PQ crawler "send" endpoint.
+
+    :param video_obj: mapping providing ``videoPath``, ``uid``, ``title``
+        and ``coverPath``; a missing key raises ``KeyError``.
+    :return: the value returned by ``async_post`` for this request.
+    """
+    # Read every required field up front so a malformed video_obj fails
+    # before any request data is assembled.
+    video_path = video_obj['videoPath']
+    login_uid = video_obj['uid']
+    video_title = video_obj['title']
+    cover_path = video_obj['coverPath']
+
+    # NOTE(review): the cookie, token, device and session identifiers below
+    # are hard-coded literals - consider loading them from configuration.
+    api_token = "524a8bc871dbb0f4d4717895083172ab37c02d2f"
+    session_id = "362290597725ce1fa870d7be4f46dcc2"
+
+    request_headers = {
+        "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
+        "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
+        "referer": "http://appspeed.piaoquantv.com",
+        "token": api_token,
+        "accept-language": "zh-CN,zh-Hans;q=0.9",
+        "Content-Type": "application/x-www-form-urlencoded",
+    }
+    # Keyword form keeps the same key order as a literal dict would.
+    form_fields = dict(
+        coverImgPath=cover_path,
+        deviceToken="9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
+        fileExtensions="MP4",
+        loginUid=login_uid,
+        networkType="Wi-Fi",
+        platform="iOS",
+        requestId="fb972cbd4f390afcfd3da1869cd7d001",
+        sessionId=session_id,
+        subSessionId=session_id,
+        title=video_title,
+        token=api_token,
+        uid=login_uid,
+        versionCode="486",
+        versionName="3.4.12",
+        videoFromScene="1",
+        videoPath=video_path,
+        viewStatus="1",
+    )
+    publish_result = await async_post(
+        "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send",
+        request_headers,
+        form_fields,
+    )
+    print(publish_result)
+    return publish_result
+
+
+async def getPQVideoDetail(video_id):
+    """
+    Fetch the PQ detail record for one video.
+
+    :param video_id: id of the video to look up; it is sent as the only
+        element of ``videoIdList``.
+    :return: the value returned by ``async_post`` for this request.
+    """
+    # The endpoint takes a JSON body, so the payload is serialized here
+    # rather than handed over as a dict.
+    request_body = json.dumps({"videoIdList": [video_id]})
+    detail = await async_post(
+        "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo",
+        {"Content-Type": "application/json"},
+        request_body,
+    )
+    print(detail)
+    return detail
+
+
+class FinalResponse(object):
+    """
+    Publish videos to PQ, collect their video ids and build the
+    mini-program share cards returned to the caller.
+    """
+
+    def __init__(self, params, mysql_client):
+        """
+        :param params: request payload; the fields read from it are listed
+            in ``checkParams``.
+        :param mysql_client: client exposing an awaitable ``async_insert``.
+        """
+        self.kimi_title = None
+        self.video_obj_list = None
+        self.mini_type = None
+        self.gh_id = None
+        self.trace_id = None
+        self.params = params
+        self.mysql_client = mysql_client
+        # Mini-program metadata keyed by mini-program id.
+        self.mini_map = {
+            25: {
+                "avatar": "https://rescdn.yishihui.com/0temp/ttmhzfsh.png",
+                "id": "wx0b7d95eb293b783b",
+                "name": "天天美好祝福生活",
+                "index": 25,
+            },
+            29: {
+                "avatar": "https://rescdn.yishihui.com/0temp/cyfyld.png",
+                "id": "wx65c76bb4c67934db",
+                "name": "财运福运来到",
+                "index": 29,
+            },
+            31: {
+                "avatar": "https://rescdn.yishihui.com/0temp/mhzfshxf2.png",
+                "id": "wx2e4478b1641b3b15",
+                "name": "美好祝福生活幸福",
+                "index": 31,
+            },
+            36: {
+                "avatar": "https://rescdn.yishihui.com/0temp/zfyfyc.jpeg",
+                "id": "wxcddf231abd0dabdc",
+                "name": "祝福有福有财",
+                "index": 36,
+            },
+            27: {
+                "avatar": "https://rescdn.yishihui.com/0temp/xymhfqdd.png",
+                "id": "wx7187c217efef24a7",
+                "name": "幸运美好福气多多",
+                "index": 27,
+            },
+            33: {
+                "avatar": "https://rescdn.yishihui.com/0temp/ssnnfqd.jpeg",
+                "id": "wx5e67713277549b6f",
+                "name": "年年岁岁福气多",
+                "index": 33,
+            },
+        }
+
+    def checkParams(self):
+        """
+        Validate the request parameters and copy them onto the instance.
+
+        :return: ``None`` when every required field is present, otherwise
+            an error dict. ``traceId`` in that dict is ``None`` if the
+            trace id itself was the missing field.
+        """
+        try:
+            self.trace_id = self.params['traceId']
+            self.kimi_title = self.params['kimiTitle']
+            self.gh_id = self.params['ghId']
+            self.mini_type = self.params['miniType']
+            self.video_obj_list = self.params['videoObjList']
+            return None
+        except Exception as e:
+            response = {
+                "traceId": self.trace_id,
+                "error": str(e),
+                "info": "参数校验失败"
+            }
+            return response
+
+    def createGzhPath(self, video_id, shared_uid, gh_id):
+        """
+        Build the mini-program landing path for one video.
+
+        :param video_id: video id
+        :param shared_uid: uid placed in the ``su`` query parameter
+        :param gh_id: gh_id of the official account
+        :return: tuple ``(root_share_id, source_id, page_path)``
+        """
+
+        def generate_source_id():
+            """
+            Return the md5 hex digest of "<millisecond timestamp>-<4 digit
+            random number>".
+            """
+            timestamp = str(int(time.time() * 1000))
+            random_str = str(random.randint(1000, 9999))
+            hash_input = f"{timestamp}-{random_str}"
+            return hashlib.md5(hash_input.encode()).hexdigest()
+
+        root_share_id = str(uuid.uuid4())
+        if self.mini_type == 2:
+            source_id = (
+                    "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id()
+            )
+        elif self.mini_type == 1:
+            source_id = "longArticles_" + generate_source_id()
+        elif self.mini_type == 3:
+            source_id = "WeCom_" + generate_source_id()
+        else:
+            # Unknown types are not rejected; the marker text is embedded
+            # in the path instead.
+            source_id = "Error mini_program_type {}".format(self.mini_type)
+        url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
+        # The inner page URL is fully percent-encoded (safe='') because it
+        # travels as the value of the jumpPage query parameter.
+        return (
+            root_share_id,
+            source_id,
+            f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
+        )
+
+    def chooseMiniProgram(self, mini_id):
+        """
+        Look up the share-card metadata of a mini program.
+
+        :param mini_id: key into ``self.mini_map``; an unknown id raises
+            ``KeyError``.
+        :return: tuple ``(avatar, id, name, index)``
+        """
+        mini_info = self.mini_map[mini_id]
+        return (
+            mini_info["avatar"],
+            mini_info["id"],
+            mini_info["name"],
+            mini_info["index"],
+        )
+
+    async def generateSingleCard(self, video_id, title, index, gh_id, mini_id):
+        """
+        Build one share card, store it in column ``result{index}`` of the
+        article table and return it.
+
+        :param video_id: PQ video id
+        :param title: value used for ``productionName``
+        :param index: 1-based position of the card; selects the result
+            column and scales ``paragraphPosition``
+        :param gh_id: gh_id of the official account
+        :param mini_id: mini-program id used for the card
+        :return: the card dict
+        """
+        # getPQVideoDetail already prints the raw response, so it is not
+        # printed a second time here.
+        response = await getPQVideoDetail(video_id)
+        video_info = response["data"][0]
+        productionCover = video_info["shareImgPath"]
+        videoUrl = video_info["videoPath"]
+        user_id = video_info["user"]["uid"]
+        programAvatar, programId, programName, pqMiniId = self.chooseMiniProgram(mini_id)
+        root_share_id, source_id, productionPath = self.createGzhPath(video_id, user_id, gh_id)
+        logging(
+            code="1002",
+            info="root_share_id --{}, productionPath -- {}".format(
+                root_share_id, productionPath
+            ),
+            function="process",
+            trace_id=self.trace_id,
+        )
+        result = {
+            "productionCover": productionCover,
+            "productionName": title,
+            "programAvatar": programAvatar,
+            "programId": programId,
+            "programName": programName,
+            "source": "web",
+            "rootShareId": root_share_id,
+            "productionPath": productionPath,
+            "videoUrl": videoUrl,
+            "mini_id": mini_id,
+            "paragraphPosition": index * 0.25,
+        }
+        update_result_sql = f"""
+                            UPDATE {db_article}
+                            SET
+                                result{index} = %s,
+                                success = %s
+                            WHERE
+                                trace_id = %s;
+                        """
+        await self.mysql_client.async_insert(
+            sql=update_result_sql,
+            params=(json.dumps(result, ensure_ascii=False), 1, self.trace_id),
+        )
+        logging(
+            code="2000",
+            info="统计 root_share_id && video_id",
+            function="process",
+            trace_id=self.trace_id,
+            data={
+                "rootShareId": root_share_id,
+                "videoId": video_id,
+                "sourceId": source_id,
+            },
+        )
+        return result
+
+    def _pickMiniIds(self, video_count):
+        """
+        Choose one mini-program id per video for the current ``mini_type``.
+
+        :param video_count: number of videos that need a card
+        :return: list of ``video_count`` ids, or an empty list when
+            ``mini_type`` is not 1, 2 or 3
+        """
+        if self.mini_type == 1:
+            candidates = [25, 29, 31]
+            # Sample without replacement while there are enough candidates;
+            # beyond that, cycle through the picks instead of letting
+            # random.sample raise ValueError for an oversized sample.
+            picks = random.sample(candidates, min(video_count, len(candidates)))
+            return [picks[i % len(picks)] for i in range(video_count)]
+        if self.mini_type == 2:
+            return [33] * video_count
+        if self.mini_type == 3:
+            return [27] * video_count
+        return []
+
+    async def generateCards(self, video_list, kimi_title, gh_id):
+        """
+        Build the share cards for a list of videos, one after another.
+
+        :param video_list: PQ video ids
+        :param kimi_title: title used for every card
+        :param gh_id: gh_id of the official account
+        :return: list of card dicts; empty when ``mini_type`` is unknown
+        """
+        mini_ids = self._pickMiniIds(len(video_list))
+        cards = []
+        # Card indexes are 1-based because they select result1, result2, ...
+        for index, (video_id, mini_id) in enumerate(zip(video_list, mini_ids), 1):
+            card = await self.generateSingleCard(
+                video_id=video_id,
+                title=kimi_title,
+                index=index,
+                gh_id=gh_id,
+                mini_id=mini_id,
+            )
+            cards.append(card)
+        return cards
+
+    async def getVideoIdList(self):
+        """
+        Publish every video in ``video_obj_list`` to PQ.
+
+        :return: list of the ids found at ``['data']['id']`` of each
+            publish response, in input order
+        """
+        vid_list = []
+        for video_obj in self.video_obj_list:
+            video_publish_info = await publishToPQ(video_obj=video_obj)
+            vid_list.append(video_publish_info['data']['id'])
+        return vid_list
+
+    async def deal(self):
+        """
+        Entry point: validate, publish the videos, then build the cards.
+
+        :return: the validation error dict, or a dict with ``trace_id``
+            and ``miniprogramList``
+        """
+        params_error = self.checkParams()
+        if params_error:
+            return params_error
+        video_list = await self.getVideoIdList()
+        card_list = await self.generateCards(
+            video_list, self.kimi_title, self.gh_id
+        )
+        return {
+            "trace_id": self.trace_id,
+            "miniprogramList": card_list
+        }
+
+
+

+ 75 - 0
applications/deal/getOffVideos.py

@@ -0,0 +1,75 @@
+"""
+@author: luojunhui
+"""
+import time
+
+from static.config import db_article
+
+
+class GetOffVideos(object):
+    """
+    Record the videos recalled for a trace in the take-down queue table.
+    """
+
+    def __init__(self, params, mysql_client):
+        """
+        :param params: request payload; must contain ``traceId``.
+        :param mysql_client: client exposing awaitable ``async_select``
+            and ``async_insert``.
+        """
+        self.params = params
+        self.mysql_client = mysql_client
+        self.trace_id = None
+
+    def checkParams(self):
+        """
+        Validate the request parameters.
+
+        :return: ``None`` when ``traceId`` is present, otherwise an error
+            dict echoing the received params.
+        """
+        try:
+            self.trace_id = self.params['traceId']
+            return None
+        except Exception as e:
+            response = {
+                "error": "params error",
+                "info": str(e),
+                "data": self.params
+            }
+            return response
+
+    async def pushVideoIntoQueue(self):
+        """
+        Insert the recalled video ids of this trace into ``get_off_videos``.
+
+        :return: ``False`` when no article row matches the trace id,
+            ``True`` otherwise (individual insert failures are only printed).
+        """
+        # NOTE(review): trace_id comes from the request and is interpolated
+        # straight into the SQL text. Whether async_select accepts bound
+        # parameters is not visible here - switch to a parameterized query
+        # if it does.
+        select_sql = f"""
+        select recall_video_id1, recall_video_id2, recall_video_id3 from {db_article}
+        where trace_id = '{self.trace_id}';
+        """
+        recall_video_info = await self.mysql_client.async_select(sql=select_sql)
+        if not recall_video_info:
+            # Unknown trace id: nothing to queue, and indexing [0] would
+            # raise IndexError.
+            return False
+        # The statement is constant, so it is built once outside the loop.
+        insert_sql = """
+                INSERT INTO get_off_videos
+                (video_id, publish_time, video_status, trace_id)
+                values 
+                (%s, %s, %s, %s);
+                """
+        for vid in recall_video_info[0]:
+            if vid is None:
+                # Empty recall slot: there is no video to take down.
+                continue
+            try:
+                await self.mysql_client.async_insert(
+                    sql=insert_sql,
+                    params=(vid, int(time.time()), 1, self.trace_id)
+                )
+            except Exception as e:
+                # Best effort: one failed insert must not block the others.
+                print(e)
+        return True
+
+    async def deal(self):
+        """
+        Entry point: validate the params, then queue the recalled videos.
+
+        :return: the validation error dict, a not-found error dict, or a
+            dict with ``status`` and ``traceId`` on success.
+        """
+        params_error = self.checkParams()
+        if params_error:
+            return params_error
+        found = await self.pushVideoIntoQueue()
+        if not found:
+            return {
+                "error": "trace_id not found",
+                "info": "no article record matches this traceId",
+                "data": self.params
+            }
+        return {
+            "status": "success",
+            "traceId": self.trace_id
+        }

+ 0 - 285
applications/deal/get_done_content_deal.py

@@ -1,285 +0,0 @@
-"""
-@author: luojunhui
-"""
-import asyncio
-
-from applications.static.config import db_article
-from applications.schedule import search_videos
-from applications.functions.log import logging
-from applications.static.config import mysql_coroutines
-
-
-
-class ProcessDeal2(object):
-    """
-    定时执行任务
-    """
-
-    def __init__(self, mysql_client):
-        """
-        :param mysql_client:
-        """
-        self.mysql_client = mysql_client
-
-    async def get_task(self):
-        """
-        获取任务
-        :return:
-        """
-        select_sql = f"""
-            SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
-            FROM {db_article} 
-            WHERE content_status = 0 and process_times <= 5
-            ORDER BY request_time_stamp
-            ASC
-            LIMIT {mysql_coroutines};
-        """
-        task_list = await self.mysql_client.async_select(sql=select_sql)
-        task_obj_list = [
-            {
-                "trace_id": item[0],
-                "content_id": item[1],
-                "gh_id": item[2],
-                "title": item[3],
-                "text": item[4],
-                "content_status": item[5],
-                "process_times": item[6]
-            } for item in task_list
-        ]
-        logging(
-            code="9001",
-            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
-            data=task_obj_list
-        )
-        return task_obj_list
-
-    async def get_history_contents(self, content_id):
-        """
-        check whether the content id exists
-        :return: trace_id or None
-        """
-        select_sql = f"""
-               SELECT trace_id, content_status
-               FROM {db_article}
-               WHERE content_id = '{content_id}'
-               ORDER BY id DESC;
-           """
-        result = await self.mysql_client.async_select(select_sql)
-        if result:
-            for item in result:
-                trace_id, content_status = item
-                if content_status == 2:
-                    return trace_id
-                elif content_status == 3:
-                    update_sql = f"""
-                    UPDATE {db_article}
-                    SET content_status = 3
-                    WHERE trace_id = %s;
-                    """
-                    await self.mysql_client.async_insert(update_sql, trace_id)
-                else:
-                    continue
-            return None
-        else:
-            return None
-
-    async def insert_history_contents_videos(self, history_trace_id, params):
-        """
-        插入历史视频id
-        :return:
-        """
-        select_sql = f"""
-            SELECT kimi_title, recall_video_id1, recall_video_id2, recall_video_id3
-            FROM {db_article}
-            WHERE trace_id = '{history_trace_id}';
-        """
-        info = await self.mysql_client.async_select(sql=select_sql)
-        kimi_title, vid1, vid2, vid3 = info[0]
-        update_sql = f"""
-        UPDATE {db_article}
-        SET 
-            kimi_title=%s,
-            recall_video_id1=%s, 
-            recall_video_id2=%s, 
-            recall_video_id3=%s,
-            content_status=%s,
-            process_times = %s
-        WHERE  trace_id = %s;
-        """
-        await self.mysql_client.async_insert(
-            update_sql,
-            params=(
-                kimi_title,
-                vid1,
-                {"NULL" if vid2 is None else vid2},
-                {"NULL" if vid2 is None else vid3},
-                2,
-                int(params['process_times']) + 1,
-                params['trace_id']
-            )
-        )
-        logging(
-            code="9002",
-            info="已从历史文章更新,历史id: {}".format(history_trace_id),
-            trace_id=params['trace_id']
-        )
-
-    async def process_video_id(self, title, trace_id, process_times):
-        """
-        如果video_id在标题中,则做特殊处理
-        :return:
-        """
-        video_id = title.split("video_id=")[-1]
-        update_sql = f"""
-            UPDATE  
-                {db_article}
-            SET 
-                recall_video_id1 = %s,
-                content_status = %s,
-                process_times = %s
-            WHERE  
-                trace_id = %s;"""
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(video_id, 2, int(process_times) + 1, trace_id)
-        )
-
-    async def start_process(self, params):
-        """
-        开始处理
-        :param params:
-        :return:
-        """
-        # 更新文章contentId为1, 说明该文章正在处理中
-        update_sql = f"""
-            UPDATE {db_article}
-            SET 
-                content_status = %s
-            WHERE 
-                trace_id = %s;
-        """
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(1, params['trace_id'])
-        )
-        try:
-            # 判断标题中是否包含video_id
-            if "video_id=" in params['title']:
-                logging(
-                    code="9006",
-                    info="视频生成文本测试",
-                    trace_id=params['trace_id']
-                )
-                await self.process_video_id(
-                    title=params['title'],
-                    trace_id=params['trace_id'],
-                    process_times=params['process_times']
-                )
-            else:
-                await search_videos(
-                    params={"title": params['title'], "content": params['text'], "trace_id": params['trace_id']},
-                    trace_id=params['trace_id'],
-                    gh_id=params['gh_id'],
-                    mysql_client=self.mysql_client
-                )
-                # 执行完成之后,判断是否存在视频id
-                select_sql = f"""
-                    SELECT recall_video_id1, recall_video_id2, recall_video_id3
-                    FROM {db_article}
-                    WHERE trace_id = '{params["trace_id"]}';
-                """
-                result = await self.mysql_client.async_select(sql=select_sql)
-                vid1, vid2, vid3 = result[0]
-                if vid1:
-                    update_sql2 = f"""
-                        UPDATE {db_article}
-                        SET 
-                           content_status = %s,
-                           process_times = %s
-                           WHERE trace_id = %s;
-                    """
-                    await self.mysql_client.async_insert(
-                        sql=update_sql2,
-                        params=(2, int(params['process_times']) + 1, params['trace_id'])
-                    )
-                    logging(
-                        code="9008",
-                        info="视频搜索成功, 状态修改为2",
-                        trace_id=params['trace_id']
-                    )
-                else:
-                    update_sql3 = f"""
-                        UPDATE {db_article}
-                        SET 
-                           content_status = %s,
-                           process_times = %s
-                        WHERE trace_id = %s;
-                    """
-                    await self.mysql_client.async_insert(
-                        sql=update_sql3,
-                        params=(0, int(params['process_times']) + 1, params["trace_id"])
-                    )
-                    logging(
-                        code="9018",
-                        info="视频搜索失败,回退状态为0",
-                        trace_id=params['trace_id']
-                    )
-        except Exception as e:
-            logging(
-                code="9018",
-                info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e),
-                trace_id=params['trace_id']
-            )
-            update_sql4 = f"""
-                UPDATE {db_article}
-                SET 
-                   content_status = %s,
-                   process_times = %s
-                WHERE trace_id = %s;
-            """
-            await self.mysql_client.async_insert(
-                sql=update_sql4,
-                params=(0, int(params['process_times']) + 1, params["trace_id"])
-            )
-
-    async def process_task(self, params):
-        """
-        异步执行
-        :param params:
-        :return:
-        """
-        content_id = params['content_id']
-        trace_id = params['trace_id']
-        # 判断该文章是否已经生成了
-        history_trace_id = await self.get_history_contents(content_id)
-        if history_trace_id:
-            # 说明已经存在了结果, 将该条记录下的video_id拿出来
-            logging(
-                code="9001",
-                info="存在历史文章",
-                trace_id=trace_id,
-                function="find_history_article"
-            )
-            await self.insert_history_contents_videos(history_trace_id, params)
-        else:
-            logging(
-                code="9003",
-                info="未找到历史文章",
-                trace_id=trace_id,
-                function="find_history_article"
-            )
-
-    async def deal(self):
-        """
-        处理
-        :return:
-        """
-        task_list = await self.get_task()
-        if task_list:
-            tasks = [self.process_task(params) for params in task_list]
-            await asyncio.gather(*tasks)
-        else:
-            logging(
-                code="9008",
-                info="没有要处理的请求"
-            )

+ 110 - 0
applications/deal/minigram.py

@@ -0,0 +1,110 @@
+"""
+@author: luojunhui
+"""
+# Mini-program card metadata, looked up in two steps: the outer key is the
+# business type and the inner key is the mini-program code (this is how
+# Minigram.choose_minigram indexes it). Each entry carries the card avatar
+# URL, the mini-program id, its display name and an "index" that repeats
+# the inner key.
+minigram_map = {
+    1: {
+        # Previous entry for code 25, left commented out; the active entry
+        # below uses the same key.
+        # 25: {
+        #     "avatar": "https://rescdn.yishihui.com/0temp/ttmhzfsh.png",
+        #     "id": "wx0b7d95eb293b783b",
+        #     "name": "天天美好祝福生活",
+        #     "index": 25
+        # },
+        25: {
+                "avatar": "https://rescdn.yishihui.com/0temp/pqsp.png",
+                "id": "wxbdd2a2e93d9a6e25",
+                "name": "票圈视频",
+                "index": 25
+            },
+        29: {
+            "avatar": "https://rescdn.yishihui.com/0temp/cyfyld.png",
+            "id": "wx65c76bb4c67934db",
+            "name": "财运福运来到",
+            "index": 29
+        },
+        31: {
+            "avatar": "https://rescdn.yishihui.com/0temp/mhzfshxf2.png",
+            "id": "wx2e4478b1641b3b15",
+            "name": "美好祝福生活幸福",
+            "index": 31
+        }
+    },
+    2: {
+        36: {
+            "avatar": "https://rescdn.yishihui.com/0temp/zfyfyc.jpeg",
+            "id": "wxcddf231abd0dabdc",
+            "name": "祝福有福有财",
+            "index": 36
+        },
+        # Code 33 points at the same mini program (id, name, avatar) as
+        # code 25 under business type 1.
+        33: {
+            "avatar": "https://rescdn.yishihui.com/0temp/pqsp.png",
+            "id": "wxbdd2a2e93d9a6e25",
+            "name": "票圈视频",
+            "index": 33
+            }
+    },
+    3: {
+        27: {
+            "avatar": "https://rescdn.yishihui.com/0temp/xymhfqdd.png",
+            "id": "wx7187c217efef24a7",
+            "name": "幸运美好福气多多",
+            "index": 27
+        }
+    }
+}
+
+
+class Minigram(object):
+    """
+    Resolve the mini-program card for a business type and mini code.
+    """
+
+    def __init__(self, params):
+        """
+        :param params: request payload; the fields read from it are listed
+            in ``check_params``.
+        """
+        self.params = params
+        self.business_type = None
+        self.mini_code = None
+        self.trace_id = None
+
+    def check_params(self):
+        """
+        Validate the request parameters and copy them onto the instance.
+
+        :return: ``None`` when ``businessType``, ``miniCode`` and
+            ``traceId`` are all present, otherwise an error dict.
+        """
+        try:
+            self.business_type = self.params['businessType']
+            self.mini_code = self.params['miniCode']
+            self.trace_id = self.params['traceId']
+            return None
+        except Exception as e:
+            response = {
+                "status": "fail",
+                "code": 1,
+                "message": str(e),
+                "info": "params check error"
+            }
+            return response
+
+    def choose_minigram(self):
+        """
+        Look up the card for the validated business type and mini code.
+
+        :return: dict with ``programAvatar``, ``programId``,
+            ``programName`` and ``trace_id``; or an error dict when the
+            lookup fails for any reason.
+        """
+        try:
+            # An unknown business type or mini code yields None here; the
+            # resulting exception is reported through the error dict below.
+            minigram = minigram_map.get(self.business_type).get(self.mini_code)
+            response = {
+                "programAvatar": minigram['avatar'],
+                "programId": minigram['id'],
+                "programName": minigram['name'],
+                "trace_id": self.trace_id
+            }
+        except Exception as e:
+            response = {
+                "error": "invalid params",
+                "msg": str(e)
+            }
+        return response
+
+    async def deal(self):
+        """
+        Entry point: validate once, then resolve the card.
+
+        :return: the validation error dict, or the result of
+            ``choose_minigram``.
+        """
+        # Validate a single time; the result is reused instead of calling
+        # check_params() again.
+        params_error = self.check_params()
+        if params_error:
+            return params_error
+        return self.choose_minigram()

+ 0 - 79
applications/deal/re_search_deal.py

@@ -1,79 +0,0 @@
-"""
-@author: luojunhui
-"""
-import json
-import asyncio
-
-from applications.schedule import re_search_videos
-
-
-class ReSearchDeal(object):
-    """
-    重新搜索逻辑
-    obj = {
-        "ori_title": params['title'],
-        "content_title": params['kimi_summary'],
-        "content_keys": params['kimi_keys'],
-        "trace_id": params['trace_id']
-    }
-    """
-
-    def __init__(self, params):
-        self.gh_id = None
-        self.trace_id = None
-        self.kimi_keys = None
-        self.kimi_summary = None
-        self.title = None
-        self.params = params
-
-    def check_params(self):
-        """
-        check params
-        :return:
-        """
-        try:
-            self.title = self.params['title']
-            self.kimi_summary = self.params['kimi_summary']
-            self.kimi_keys = json.loads(self.params['kimi_keys'])
-            self.trace_id = self.params['trace_id']
-            self.gh_id = self.params['ghId']
-            return None
-        except AttributeError as e:
-            result = {
-                "status": "fail",
-                "code": 1,
-                "message": str(e),
-                "info": "params check error"
-            }
-            return result
-        
-    async def deal(self):
-        """
-        research function deal
-        :return: 
-        """
-        params_error = self.check_params()
-        if params_error:
-            return params_error
-        else:
-            try:
-                asyncio.ensure_future(
-                    re_search_videos(
-                        self.params,
-                        self.trace_id,
-                        self.gh_id)
-                )
-                res = {
-                    "status": "success",
-                    "code": 0,
-                    "traceId": self.trace_id
-                }
-                return res
-            except Exception as e:
-                res = {
-                    "status": "fail",
-                    "code": 1,
-                    "message": str(e)
-                }
-                return res
-            

+ 0 - 224
applications/deal/recall_deal.py

@@ -1,224 +0,0 @@
-"""
-@author: luojunhui
-"""
-import json
-import uuid
-import time
-import random
-import hashlib
-import urllib.parse
-
-from applications.functions.log import logging
-from applications.static.config import db_article
-from applications.functions.common import request_for_info
-
-
-class RecallDeal(object):
-    """
-    召回逻辑
-    """
-
-    def __init__(self, trace_id, mysql_client, mini_program_type):
-        self.trace_id = trace_id
-        self.mysql_client = mysql_client
-        self.mini_program_type = mini_program_type
-
-    async def get_result(self):
-        """
-        获取结果
-        :return:
-        """
-        select_sql = f"""
-                SELECT gh_id, recall_video_id1, recall_video_id2, recall_video_id3, kimi_title, content_status, process_times
-                FROM {db_article}
-                WHERE trace_id = '{self.trace_id}';
-        """
-        info_tuple = await self.mysql_client.async_select(select_sql)
-        gh_id, vid1, vid2, vid3, kimi_title, content_status, process_times = info_tuple[0]
-        response = {
-            "gh_id": gh_id,
-            "vid_list": [vid1, vid2, vid3],
-            "kimi_title": kimi_title,
-            "content_status": content_status,
-            "process_times": process_times
-        }
-        return response
-
-    def create_gzh_path(self, video_id, shared_uid, gh_id):
-        """
-        :param gh_id: 公众号账号的gh_id
-        :param video_id: 视频 id
-        :param shared_uid: 分享 id
-        """
-
-        def generate_source_id():
-            """
-            generate_source_id
-            :return:
-            """
-            timestamp = str(int(time.time() * 1000))
-            random_str = str(random.randint(1000, 9999))
-            hash_input = f"{timestamp}-{random_str}"
-            return hashlib.md5(hash_input.encode()).hexdigest()
-
-        root_share_id = str(uuid.uuid4())
-        if self.mini_program_type == 2:
-            source_id = "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id()
-        elif self.mini_program_type == 1:
-            source_id = "longArticles_" + generate_source_id()
-        elif self.mini_program_type == 3:
-            source_id = "WeCom_" + generate_source_id()
-        else:
-            source_id = "Error mini_program_type {}".format(self.mini_program_type)
-        url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
-        # 自动把 root_share_id 加入到白名单
-        # auto_white(root_share_id)
-        return root_share_id, source_id, f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}"
-
-    def choose_mini_program(self):
-        """
-        获取小程序分享卡片
-        :return:
-        """
-        if self.mini_program_type == 1:
-            # 正常长文业务
-            programAvatar = "https://rescdn.yishihui.com/0temp/ssyqsh.png"
-            programId = "wx59d9e2c05f00f880"
-            programName = "刷刷有趣生活"
-        elif self.mini_program_type == 2:
-            # 投流业务
-            programAvatar = "https://rescdn.yishihui.com/0temp/zfyfyc.jpeg"
-            programId = "wxcddf231abd0dabdc"
-            programName = "祝福有福有财"
-        elif self.mini_program_type == 3:
-            # 企业微信
-            programAvatar = "https://rescdn.yishihui.com/0temp/xymhfqdd.png"
-            programId = "wx7187c217efef24a7"
-            programName = "幸运美好福气多多"
-        else:
-            programAvatar = "https://rescdn.yishihui.com/0temp/ssyqsh.png"
-            programId = "wx59d9e2c05f00f880"
-            programName = "刷刷有趣生活"
-        return programAvatar, programId, programName
-
-    async def generate_card(self, video_id, kimi_title, index, gh_id):
-        """
-        生成分享卡片
-        :return:
-        """
-        response = request_for_info(video_id)
-        productionCover = response['data'][0]['shareImgPath']
-        productionName = kimi_title
-        videoUrl = response['data'][0]['videoPath']
-        user_id = response['data'][0]['user']['uid']
-        programAvatar, programId, programName = self.choose_mini_program()
-        root_share_id, source_id, productionPath = self.create_gzh_path(video_id, user_id, gh_id)
-        source = "Web"
-        logging(
-            code="1002",
-            info="root_share_id --{}, productionPath -- {}".format(root_share_id, productionPath),
-            function="process",
-            trace_id=self.trace_id
-        )
-        result = {
-            "productionCover": productionCover,
-            "productionName": productionName,
-            "programAvatar": programAvatar,
-            "programId": programId,
-            "programName": programName,
-            "source": source,
-            "rootShareId": root_share_id,
-            "productionPath": productionPath,
-            "videoUrl": videoUrl,
-            "paragraphPosition": index * 0.25
-        }
-        update_result_sql = f"""
-                            UPDATE {db_article}
-                            SET
-                                result{index} = %s,
-                                success = %s
-                            WHERE
-                                trace_id = %s;
-                        """
-        await self.mysql_client.async_insert(
-            sql=update_result_sql,
-            params=(json.dumps(result, ensure_ascii=False), 1, self.trace_id)
-        )
-        logging(
-            code="2000",
-            info="统计 root_share_id && video_id",
-            function="process",
-            trace_id=self.trace_id,
-            data={
-                "rootShareId": root_share_id,
-                "videoId": video_id,
-                "sourceId": source_id
-            }
-        )
-        return result
-
-    async def deal(self):
-        """
-        Recall Deal
-        :return:
-        """
-        response = await self.get_result()
-        status_code = response.get("content_status")
-        process_times = response.get("process_times")
-        if status_code == 0:
-            if process_times > 5:
-                result = {
-                    "traceId": self.trace_id,
-                    "code": 0,
-                    "error": "匹配失败,处理超过五次,文章敏感"
-                }
-            else:
-                result = {
-                    "traceId": self.trace_id,
-                    "code": 0,
-                    "Message": "该请求还没处理"
-                }
-        elif status_code == 1:
-            result = {
-                "traceId": self.trace_id,
-                "code": 1,
-                "Message": "该请求正在处理中"
-            }
-        elif status_code == 2:
-            L = []
-            unEmptyList = [i for i in response['vid_list'] if i]
-            for index, best_video_id in enumerate(unEmptyList, 1):
-                card = await self.generate_card(best_video_id, response.get("kimi_title"), index, response['gh_id'])
-                L.append(card)
-            if L:
-                result = {
-                    "traceId": self.trace_id,
-                    "miniprogramList": L
-                }
-            else:
-                result = {
-                    "traceId": self.trace_id,
-                    "code": 0,
-                    "error": "匹配失败,视频下载失败返回vid为0"
-                }
-        elif status_code == 3:
-            result = {
-                "traceId": self.trace_id,
-                "code": 0,
-                "error": "匹配失败,处理超过五次,文章敏感"
-            }
-        else:
-            result = {
-                "traceId": self.trace_id,
-                "Message": "UnKnow Error"
-            }
-        logging(
-            code="1002",
-            info="返回结果",
-            function="process",
-            data=result,
-            trace_id=self.trace_id
-        )
-        return result
-
-

+ 2 - 2
applications/deal/search_deal_v2.py → applications/deal/record.py

@@ -6,10 +6,10 @@ import time
 from uuid import uuid4
 
 from applications.functions.log import logging
-from applications.static.config import db_article
+from static.config import db_article
 
 
-class SearchDeal2(object):
+class Record(object):
     """
     搜索接口处理逻辑
     """

+ 329 - 0
applications/deal/response.py

@@ -0,0 +1,329 @@
+"""
+@author: luojunhui
+"""
+
+import json
+import uuid
+import time
+import random
+import hashlib
+import urllib.parse
+
+from applications.functions.log import logging
+from static.config import db_article
+from applications.functions.common import request_for_info
+
+
+class Response(object):
+    """
+    Recall logic: read the match result stored for a trace id and turn the
+    recalled videos into mini-program share cards.
+    """
+
+    def __init__(self, trace_id, mysql_client, mini_program_type):
+        """
+        Mini-program ids handed out per business type (see generate_cards):
+            long articles (mini_program_type == 1): 25, 29, 31
+            paid traffic  (mini_program_type == 2): 33
+            WeCom         (mini_program_type == 3): 27
+        :param trace_id: trace id of the request whose result is read
+        :param mysql_client: client providing async_select / async_insert
+        :param mini_program_type: business type, compared against 1, 2 and 3
+        """
+        self.trace_id = trace_id
+        self.mysql_client = mysql_client
+        self.mini_program_type = mini_program_type
+        # mini-program id -> card metadata returned by choose_mini_program
+        self.mini_map = {
+            # Earlier entry for 25, currently disabled:
+            # 25: {
+            #     "avatar": "https://rescdn.yishihui.com/0temp/ttmhzfsh.png",
+            #     "id": "wx0b7d95eb293b783b",
+            #     "name": "天天美好祝福生活",
+            #     "index": 25,
+            # },
+            25: {
+                "avatar": "https://rescdn.yishihui.com/0temp/pqsp.png",
+                "id": "wxbdd2a2e93d9a6e25",
+                "name": "票圈视频",
+                "index": 25
+            },
+            29: {
+                "avatar": "https://rescdn.yishihui.com/0temp/cyfyld.png",
+                "id": "wx65c76bb4c67934db",
+                "name": "财运福运来到",
+                "index": 29,
+            },
+            31: {
+                "avatar": "https://rescdn.yishihui.com/0temp/mhzfshxf2.png",
+                "id": "wx2e4478b1641b3b15",
+                "name": "美好祝福生活幸福",
+                "index": 31,
+            },
+            # NOTE(review): 36 is defined here but never selected by generate_cards.
+            36: {
+                "avatar": "https://rescdn.yishihui.com/0temp/zfyfyc.jpeg",
+                "id": "wxcddf231abd0dabdc",
+                "name": "祝福有福有财",
+                "index": 36,
+            },
+            27: {
+                "avatar": "https://rescdn.yishihui.com/0temp/xymhfqdd.png",
+                "id": "wx7187c217efef24a7",
+                "name": "幸运美好福气多多",
+                "index": 27,
+            },
+            # Earlier entry for 33, currently disabled:
+            # 33: {
+            #     "avatar": "https://rescdn.yishihui.com/0temp/ssnnfqd.jpeg",
+            #     "id": "wx5e67713277549b6f",
+            #     "name": "年年岁岁福气多",
+            #     "index": 33,
+            # },
+            33: {
+                "avatar": "https://rescdn.yishihui.com/0temp/pqsp.png",
+                "id": "wxbdd2a2e93d9a6e25",
+                "name": "票圈视频",
+                "index": 33
+            }
+        }
+
+    async def get_result(self):
+        """
+        Fetch the stored match result for self.trace_id.
+        The first returned row is used unconditionally, so a trace id with no
+        matching row makes the `[0]` lookup fail.
+        :return: dict with gh_id, vid_list, kimi_title, content_status, process_times
+        """
+        # NOTE(review): trace_id is interpolated straight into the SQL text.
+        # Switch to bound parameters if async_select supports them -- confirm
+        # against the mysql client before changing.
+        select_sql = f"""
+                SELECT gh_id, recall_video_id1, recall_video_id2, recall_video_id3, kimi_title, content_status, process_times
+                FROM {db_article}
+                WHERE trace_id = '{self.trace_id}';
+        """
+        info_tuple = await self.mysql_client.async_select(select_sql)
+        gh_id, vid1, vid2, vid3, kimi_title, content_status, process_times = info_tuple[
+            0
+        ]
+        response = {
+            "gh_id": gh_id,
+            "vid_list": [vid1, vid2, vid3],
+            "kimi_title": kimi_title,
+            "content_status": content_status,
+            "process_times": process_times,
+        }
+        return response
+
+    def create_gzh_path(self, video_id, shared_uid, gh_id):
+        """
+        Build the mini-program jump path for one video.
+        :param gh_id: gh_id of the official account
+        :param video_id: video id
+        :param shared_uid: uid used as the sharer (`su`) in the path
+        :return: (root_share_id, source_id, url-quoted jump path)
+        """
+
+        def generate_source_id():
+            """
+            Build an md5 hex digest from the current millisecond timestamp
+            and a random four-digit number.
+            :return: 32-character hex string
+            """
+            timestamp = str(int(time.time() * 1000))
+            random_str = str(random.randint(1000, 9999))
+            hash_input = f"{timestamp}-{random_str}"
+            return hashlib.md5(hash_input.encode()).hexdigest()
+
+        root_share_id = str(uuid.uuid4())
+        if self.mini_program_type == 2:
+            source_id = (
+                "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id()
+            )
+        elif self.mini_program_type == 1:
+            source_id = "longArticles_" + generate_source_id()
+        elif self.mini_program_type == 3:
+            source_id = "WeCom_" + generate_source_id()
+        else:
+            # unknown type: the error text itself is used as the source id
+            source_id = "Error mini_program_type {}".format(self.mini_program_type)
+        url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
+        # Automatically add root_share_id to the white list (currently disabled)
+        # auto_white(root_share_id)
+        return (
+            root_share_id,
+            source_id,
+            f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
+        )
+
+    def choose_mini_program(self, mini_id):
+        """
+        Look up the card metadata of a mini program.
+        :param mini_id: key of self.mini_map
+        :return: (avatar, id, name, index)
+        :raises KeyError: when mini_id is not in self.mini_map
+        """
+        mini_info = self.mini_map[mini_id]
+        return (
+            mini_info["avatar"],
+            mini_info["id"],
+            mini_info["name"],
+            mini_info["index"],
+        )
+
+    async def generate_card(self, video_id, kimi_title, index, gh_id, mini_id):
+        """
+        Build one share card, store it in the result{index} column and log it.
+        :param video_id: video the card points to
+        :param kimi_title: title shown on the card
+        :param index: 1-based position; selects the result column and the
+            paragraph position (index * 0.25)
+        :param gh_id: gh_id of the official account
+        :param mini_id: key of self.mini_map
+        :return: the card dict
+        """
+        response = request_for_info(video_id)
+        video_info = response["data"][0]
+        productionCover = video_info["shareImgPath"]
+        productionName = kimi_title
+        videoUrl = video_info["videoPath"]
+        user_id = video_info["user"]["uid"]
+        # the fourth element (the map's "index") is not needed for the card
+        programAvatar, programId, programName, _ = self.choose_mini_program(
+            mini_id
+        )
+        root_share_id, source_id, productionPath = self.create_gzh_path(
+            video_id, user_id, gh_id
+        )
+        source = "Web"
+        logging(
+            code="1002",
+            info="root_share_id --{}, productionPath -- {}".format(
+                root_share_id, productionPath
+            ),
+            function="process",
+            trace_id=self.trace_id,
+        )
+        result = {
+            "productionCover": productionCover,
+            "productionName": productionName,
+            "programAvatar": programAvatar,
+            "programId": programId,
+            "programName": programName,
+            "source": source,
+            "rootShareId": root_share_id,
+            "productionPath": productionPath,
+            "videoUrl": videoUrl,
+            "mini_id": mini_id,
+            "paragraphPosition": index * 0.25,
+        }
+        update_result_sql = f"""
+                            UPDATE {db_article}
+                            SET
+                                result{index} = %s,
+                                success = %s
+                            WHERE
+                                trace_id = %s;
+                        """
+        await self.mysql_client.async_insert(
+            sql=update_result_sql,
+            params=(json.dumps(result, ensure_ascii=False), 1, self.trace_id),
+        )
+        logging(
+            code="2000",
+            info="统计 root_share_id && video_id",
+            function="process",
+            trace_id=self.trace_id,
+            data={
+                "rootShareId": root_share_id,
+                "videoId": video_id,
+                "sourceId": source_id,
+            },
+        )
+        return result
+
+    def _pick_mini_id(self):
+        """
+        Choose the mini-program id for the next card of a supported type.
+        Type 1 draws randomly: 25 for 1-6, 29 for 7-8, 31 for 9-10 out of 10.
+        Type 2 always uses 33, type 3 always uses 27.
+        """
+        if self.mini_program_type == 1:
+            random_num = random.randint(1, 10)
+            if random_num in [1, 2, 3, 4, 5, 6]:
+                return 25
+            if random_num in [7, 8]:
+                return 29
+            return 31
+        if self.mini_program_type == 2:
+            return 33
+        return 27
+
+    async def generate_cards(self, video_list, kimi_title, gh_id):
+        """
+        Build one card per video, numbering them from 1.
+        :param video_list: video ids to build cards for
+        :param kimi_title: title shown on every card
+        :param gh_id: gh_id of the official account
+        :return: list of card dicts; empty when mini_program_type is not 1, 2 or 3
+        """
+        L = []
+        if self.mini_program_type not in (1, 2, 3):
+            return L
+        for index, video_id in enumerate(video_list, 1):
+            result = await self.generate_card(
+                video_id=video_id,
+                kimi_title=kimi_title,
+                index=index,
+                gh_id=gh_id,
+                mini_id=self._pick_mini_id(),
+            )
+            L.append(result)
+        return L
+
+    async def deal(self):
+        """
+        Translate the stored content_status into the API response.
+        Status 0 is "not processed yet" (or failed once process_times exceeds 3),
+        1 is "processing", 2 builds the cards, 3 is a failure; anything else
+        yields an unknown-error message. The result is logged before returning.
+        :return: response dict
+        """
+        response = await self.get_result()
+        status_code = response.get("content_status")
+        process_times = response.get("process_times")
+        if status_code == 0:
+            # NOTE(review): the threshold is 3 but the message below still says
+            # "more than five times" -- confirm which one is intended.
+            if process_times > 3:
+                result = {
+                    "traceId": self.trace_id,
+                    "code": 0,
+                    "error": "匹配失败,处理超过五次,文章敏感",
+                }
+            else:
+                result = {
+                    "traceId": self.trace_id,
+                    "code": 0,
+                    "Message": "该请求还没处理",
+                }
+        elif status_code == 1:
+            result = {
+                "traceId": self.trace_id,
+                "code": 1,
+                "Message": "该请求正在处理中",
+            }
+        elif status_code == 2:
+            # skip empty recall slots (None / 0)
+            unEmptyList = [i for i in response["vid_list"] if i]
+            cards = await self.generate_cards(
+                unEmptyList, response.get("kimi_title"), response["gh_id"]
+            )
+            if cards:
+                result = {"traceId": self.trace_id, "miniprogramList": cards}
+            else:
+                result = {
+                    "traceId": self.trace_id,
+                    "code": 0,
+                    "error": "匹配失败,视频下载失败返回vid为0",
+                }
+        elif status_code == 3:
+            result = {
+                "traceId": self.trace_id,
+                "code": 0,
+                "error": "匹配失败,处理超过五次,文章敏感",
+            }
+        else:
+            result = {"traceId": self.trace_id, "Message": "UnKnow Error"}
+        logging(
+            code="1002",
+            info="返回结果",
+            function="process",
+            data=result,
+            trace_id=self.trace_id,
+        )
+        return result

+ 0 - 216
applications/deal/search_deal.py

@@ -1,216 +0,0 @@
-"""
-@author: luojunhui
-"""
-import time
-import asyncio
-
-from uuid import uuid4
-
-from applications.functions.log import logging
-from applications.static.config import db_article
-from applications.schedule import search_videos
-
-
-class SearchDeal(object):
-    """
-    搜索接口处理逻辑
-    """
-
-    def __init__(self, params, mysql_client):
-        self.content_id = None
-        self.account_name = None
-        self.contents = None
-        self.title = None
-        self.gh_id = None
-        self.params = params
-        self.mysql_client = mysql_client
-        self.trace_id = "search-{}-{}".format(str(uuid4()), str(int(time.time())))
-
-    def check_params(self):
-        """
-        检查请求params
-        :return:
-        """
-        try:
-            self.gh_id = self.params['ghId']
-            self.title = self.params['title'].split("@@")[-1].replace("'", "")
-            self.contents = self.params['content'].replace("'", "")
-            self.account_name = self.params['accountName'].replace("'", "")
-            self.content_id = self.params['articleId']
-            logging(
-                code="1001",
-                info="搜索视频内容接口请求成功, 参数校验成功",
-                port="title_to_search",
-                trace_id=self.trace_id,
-                data=self.params
-            )
-            return None
-        except Exception as e:
-            result = {
-                "status": "fail",
-                "code": 1,
-                "message": str(e),
-                "info": "params check error"
-            }
-            logging(
-                code="4001",
-                info="搜索视频内容接口请求成功, 参数校验失败",
-                port="title_to_search",
-                trace_id=self.trace_id,
-                data=self.params
-            )
-            return result
-
-    async def record(self):
-        """
-        把数据插入
-        :return:
-        """
-        insert_sql = f"""
-                        INSERT INTO {db_article}
-                            (trace_id, gh_id, article_title, article_text, account_name, content_id)
-                        VALUES 
-                            (%s, %s, %s, %s, %s, %s);
-                            """
-        await self.mysql_client.async_insert(
-            sql=insert_sql,
-            params=(self.trace_id, self.gh_id, self.title, self.contents, self.account_name, self.content_id)
-        )
-        logging(
-            code="1002",
-            info="成功记录请求数据到mysql中",
-            trace_id=self.trace_id
-        )
-
-    async def process_video_id(self):
-        """
-        如果video_id在标题中,则做特殊处理
-        :return:
-        """
-        video_id = self.title.split("video_id=")[-1]
-        update_sql = f"""
-            UPDATE  
-                {db_article}
-            SET 
-                recall_video_id1 = %s
-            WHERE  
-                trace_id = %s;"""
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(video_id, self.trace_id)
-        )
-        res = {
-            "status": "success",
-            "code": 0,
-            "traceId": self.trace_id
-        }
-        logging(
-            code="1003",
-            info="视频生成文本服务请求,video_id = {}".format(video_id),
-            trace_id=self.trace_id
-        )
-        return res
-
-    async def insert_history_contents_videos(self, vid1, vid2, vid3, kimi_title):
-        """
-        插入历史视频id
-        :return:
-        """
-        update_sql = f"""
-        UPDATE {db_article}
-        SET 
-            kimi_title=%s,
-            recall_video_id1=%s, 
-            recall_video_id2=%s, 
-            recall_video_id3=%s
-        WHERE  trace_id = %s
-        """
-        print(update_sql)
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(
-                kimi_title,
-                vid1,
-                {"NULL" if vid2 is None else vid2},
-                {"NULL" if vid3 is None else vid3},
-                self.trace_id
-            )
-        )
-
-    async def get_history_contents(self):
-        """
-        check whether the content id exists
-        :return:
-
-        """
-        select_sql = f"""
-            SELECT recall_video_id1, recall_video_id2, recall_video_id3, kimi_title
-            FROM {db_article}
-            WHERE content_id = '{self.content_id}' and trace_id != '{self.trace_id}'
-            ORDER BY id DESC;
-        """
-        result = await self.mysql_client.async_select(select_sql)
-        if result:
-            for item in result:
-                video_1, video_2, video_3, kimi_title = item
-                if video_1 and kimi_title:
-                    return [video_1, video_2, video_3, kimi_title]
-                else:
-                    continue
-            return None
-        else:
-            return None
-
-    async def deal(self):
-        """
-        deal
-        :return:
-        """
-        params_error = self.check_params()
-        if params_error:
-            return params_error
-        else:
-            # 记录
-            await self.record()
-            if "video_id=" in self.title:
-                return await self.process_video_id()
-            else:
-                video_ids = await self.get_history_contents()
-                if video_ids:
-                    logging(
-                        code="1004",
-                        info="获取历史到文章视频",
-                        data=video_ids,
-                        trace_id=self.trace_id
-                    )
-                    await self.insert_history_contents_videos(
-                        video_ids[0],
-                        video_ids[1],
-                        video_ids[2],
-                        video_ids[3]
-                    )
-                    return {"status": "success", "code": 0, "traceId": self.trace_id}
-                else:
-                    # search from the Internet
-                    try:
-                        asyncio.ensure_future(
-                            search_videos(
-                                params={"title": self.title, "content": self.contents, "trace_id": self.trace_id},
-                                trace_id=self.trace_id,
-                                gh_id=self.gh_id,
-                                mysql_client=self.mysql_client
-                            )
-                        )
-                        res = {
-                            "status": "success",
-                            "code": 0,
-                            "traceId": self.trace_id
-                        }
-                        return res
-                    except Exception as e:
-                        res = {
-                            "status": "fail",
-                            "code": 1,
-                            "message": str(e)
-                        }
-                        return res

+ 20 - 0
applications/functions/common.py

@@ -231,3 +231,23 @@ async def request_etl(url, headers, json_data, retries=6):
                     await asyncio.sleep(2)  # 等待一段时间后重试
                 else:
                     raise
+
+
+async def async_post(url, headers, payload, retries=3):
+    """
+    POST payload to url and return the decoded JSON body, retrying on timeout.
+    :param url: request url
+    :param headers: request headers
+    :param payload: request body, handed to aiohttp as `data`
+    :param retries: total number of attempts (default 3)
+    :return: decoded JSON body of the first response received
+    :raises asyncio.TimeoutError: when the last attempt also times out
+    """
+    async with aiohttp.ClientSession() as session:
+        # the loop bound and the "last attempt" check share one value
+        for attempt in range(retries):
+            try:
+                async with session.post(url, headers=headers, data=payload, timeout=10) as response:
+                    return await response.json()
+            except asyncio.TimeoutError:
+                if attempt < retries - 1:
+                    await asyncio.sleep(2)  # wait a moment before retrying
+                else:
+                    raise

+ 90 - 0
applications/functions/pqFunctions.py

@@ -0,0 +1,90 @@
+"""
+@author: luojunhui
+"""
+import json
+
+from applications.functions.common import async_post
+
+
+async def publishToPQ(video_obj):
+    """
+    Publish one video to PQ through the crawler video/send endpoint.
+    :param video_obj: dict providing 'videoPath', 'uid' and 'title'
+    :return: decoded JSON response returned by async_post
+    """
+    video_path = video_obj['videoPath']
+    login_uid = video_obj['uid']
+    video_title = video_obj['title']
+    send_url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
+    # NOTE(review): session cookie and tokens are embedded in source.
+    request_headers = {
+        "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
+        "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
+        "referer": "http://appspeed.piaoquantv.com",
+        "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
+        "accept-language": "zh-CN,zh-Hans;q=0.9",
+        "Content-Type": "application/x-www-form-urlencoded",
+    }
+    form_fields = {
+        "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
+        "fileExtensions": "MP4",
+        "loginUid": login_uid,
+        "networkType": "Wi-Fi",
+        "platform": "iOS",
+        "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
+        "sessionId": "362290597725ce1fa870d7be4f46dcc2",
+        "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
+        "title": video_title,
+        "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
+        "uid": login_uid,
+        "versionCode": "486",
+        "versionName": "3.4.12",
+        "videoFromScene": "1",
+        "videoPath": video_path,
+        "viewStatus": "1",
+        "appType": 888880,
+        "repeatStatus": 1
+    }
+    return await async_post(send_url, request_headers, form_fields)
+
+
+async def getPQVideoDetail(video_id):
+    """
+    Fetch the PQ detail information of a single video.
+    :param video_id: id of the video to look up
+    :return: decoded JSON response returned by async_post
+    """
+    detail_url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
+    json_headers = {
+        "Content-Type": "application/json",
+    }
+    # the endpoint takes a list of ids; send a one-element list
+    body = json.dumps({"videoIdList": [video_id]})
+    return await async_post(detail_url, json_headers, body)
+
+
+async def getNewVideoIds(video_obj_list, max_count=3):
+    """
+    Publish videos to PQ one at a time and collect the ids PQ returns.
+    Stops once max_count ids have been collected. A video whose publish call
+    fails, or whose response has no data/id, is skipped.
+    :param video_obj_list: iterable of video dicts accepted by publishToPQ
+    :param max_count: maximum number of ids to collect (default 3)
+    :return: list of at most max_count video ids
+    """
+    vid_list = []
+    for video_obj in video_obj_list:
+        # For historical data video_obj may be incomplete; the info has to come from PQ
+        print(json.dumps(video_obj, ensure_ascii=False, indent=4))
+        if len(vid_list) >= max_count:
+            return vid_list
+        try:
+            pq_response = await publishToPQ(video_obj)
+            video_id = pq_response['data']['id']
+        except Exception:
+            # Best effort: skip this video. `Exception` (not a bare except)
+            # lets task cancellation and KeyboardInterrupt propagate.
+            continue
+        vid_list.append(video_id)
+    return vid_list

+ 21 - 9
applications/routes.py

@@ -3,7 +3,7 @@
 """
 from quart import Blueprint, jsonify, request
 
-from applications.deal import SearchDeal2, ProcessDeal, RecallDeal
+from applications.deal import Response, Record, Minigram, GetOffVideos
 
 my_blueprint = Blueprint('LongArticles', __name__)
 
@@ -28,7 +28,7 @@ def Routes(mysql_client):
         :return:
         """
         params = await request.get_json()
-        SD = SearchDeal2(params=params, mysql_client=mysql_client)
+        SD = Record(params=params, mysql_client=mysql_client)
         result = await SD.deal()
         return jsonify(result)
 
@@ -41,18 +41,30 @@ def Routes(mysql_client):
         data = await request.get_json()
         trace_id = data['traceId']
         minigram_type = data['miniprogramUseType']
-        RD = RecallDeal(trace_id=trace_id, mini_program_type=minigram_type, mysql_client=mysql_client)
+        RD = Response(trace_id=trace_id, mini_program_type=minigram_type, mysql_client=mysql_client)
         response = await RD.deal()
         return jsonify(response)
 
-    @my_blueprint.route("/task")
-    async def schedule_task():
+    @my_blueprint.route("/choose_minigram", methods=['POST'])
+    async def match_minigram():
         """
-        执行代码
+        获取小程序信息
         :return:
         """
-        PD = ProcessDeal(mysql_client=mysql_client)
-        await PD.deal()
-        return jsonify({"success": "true"})
+        data = await request.get_json()
+        M = Minigram(params=data)
+        response = await M.deal()
+        return jsonify(response)
+
+    @my_blueprint.route("/get_off_videos", methods=['POST'])
+    async def get_off_videos():
+        """
+        Record videos that were taken off the shelf automatically
+        :return: JSON response built from GetOffVideos.deal()
+        """
+        request_body = await request.get_json()
+        handler = GetOffVideos(params=request_body, mysql_client=mysql_client)
+        return jsonify(await handler.deal())
 
     return my_blueprint

+ 1 - 1
applications/schedule/__init__.py

@@ -2,5 +2,5 @@
 @author: luojunhui
 """
 from .process_schedule import recall_videos
-from .search_schedule import search_videos, re_search_videos
+from .search_schedule import search_videos
 # from .process_schedule import return_info_v2

+ 1 - 1
applications/schedule/process_schedule.py

@@ -6,7 +6,7 @@ import json
 
 from applications.functions.common import request_for_info, create_gzh_path, create_gzh_path_v2
 from applications.functions.log import logging
-from applications.static.config import db_article
+from static.config import db_article
 
 
 async def return_info_v2(video_id, trace_id, mysql_client):

+ 20 - 70
applications/schedule/search_schedule.py

@@ -7,7 +7,7 @@ import time
 
 from applications.match_algorithm.rank import title_similarity_rank
 from applications.search import *
-from applications.static.config import gh_id_dict, db_article
+from static.config import gh_id_dict, db_article
 from applications.functions.log import logging
 from applications.functions.video_item import VideoProducer
 from applications.functions.kimi import KimiServer
@@ -234,11 +234,11 @@ class SearchMethod(object):
             return L
 
 
-async def video_sender(video_obj, user, trace_id, platform, index):
+async def video_sender(video_obj, user, trace_id, platform, content_id):
     """
     异步处理微信 video_obj
     公众号和站内账号一一对应
-    :param index:
+    :param content_id:
     :param platform:
     :param user:
     :param trace_id:
@@ -272,21 +272,23 @@ async def video_sender(video_obj, user, trace_id, platform, index):
         )
     else:
         mq_obj = {}
-    mq_obj['index'] = index
     mq_obj['trace_id'] = trace_id
+    mq_obj['content_id'] = content_id
     header = {
         "Content-Type": "application/json",
     }
-    await request_etl(
+    response = await request_etl(
         url="http://192.168.203.137:4612/etl",
         headers=header,
         json_data=mq_obj
     )
-    # await request_etl(
+    return response
+    # response = await request_etl(
     #     url="http://localhost:4612/etl",
     #     headers=header,
     #     json_data=mq_obj
     # )
+    # return response
 
 
 async def search_videos(params, trace_id, gh_id, mysql_client):
@@ -326,29 +328,27 @@ async def search_videos(params, trace_id, gh_id, mysql_client):
     )
     # 按照标题相似度排序
     ranked_list = title_similarity_rank(content_title=params['title'].split("@@")[-1], recall_list=recall_list)
-    # print(params['title'].split("@@")[-1])
-    for i in ranked_list:
-        print(i['title'], i['score'])
     index = 0
     for recall_obj in ranked_list:
         if recall_obj:
             platform = recall_obj['platform']
             recall_video = recall_obj['result']
             if recall_video:
-                index += 1
-                await video_sender(
+                response = await video_sender(
                     video_obj=recall_video,
                     user=gh_id_dict.get(gh_id),
                     trace_id=trace_id,
                     platform=platform,
-                    index=index
-                )
-                logging(
-                    code="1007",
-                    info="成功请求etl",
-                    data=recall_video,
-                    trace_id=trace_id
+                    content_id=params['content_id']
                 )
+                if response['status'] == "success":
+                    index += 1
+                    logging(
+                        code="1007",
+                        info="成功请求etl",
+                        data=recall_video,
+                        trace_id=trace_id
+                    )
                 if index >= 3:
                     print("already downloaded 3 videos")
                     logging(
@@ -356,56 +356,6 @@ async def search_videos(params, trace_id, gh_id, mysql_client):
                         info="成功下载三条视频",
                         trace_id=trace_id
                     )
-                    break
-
-
-async def re_search_videos(params, trace_id, gh_id):
-    """
-    重新搜索接口
-    :param params:
-    :param trace_id:
-    :param gh_id:
-    :return:
-    """
-    try:
-        obj = {
-            "ori_title": params['title'],
-            "content_title": params['kimi_summary'],
-            "content_keys": json.loads(params['kimi_keys']),
-            "trace_id": params['trace_id']
-        }
-    except:
-        obj = {
-            "ori_title": params['title'],
-            "content_title": params['kimi_summary'],
-            "content_keys": params['kimi_keys'],
-            "trace_id": params['trace_id']
-        }
-    SearchAB = SearchABTest(info=obj, gh_id=gh_id)
-    # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
-    recall_list = await SearchAB.ab_5()
-    print("一共搜索到{}条视频".format(len(recall_list)))
-    index = 0
-    for recall_obj in recall_list:
-        if recall_obj:
-            platform = recall_obj['platform']
-            recall_video = recall_obj['result']
-            if recall_video:
-                index += 1
-                await video_sender(
-                    video_obj=recall_video,
-                    user=gh_id_dict.get(gh_id),
-                    trace_id=trace_id,
-                    platform=platform,
-                    index=index
-                )
-                logging(
-                    code="7004",
-                    info="成功请求etl",
-                    trace_id=trace_id
-                )
-                if index >= 3:
-                    print("already downloaded 3 videos")
-                    break
+                    return index
+    return index
 
-    print("一个匹配到{}条".format(index))

+ 0 - 74
applications/static/official_accounts

@@ -1,74 +0,0 @@
-gh_2b8c6aa035ae	魔法美学馆
-gh_9e559b3b94ca	票圈大事件
-gh_084a485e859a	生活情感叁读
-gh_1ee2e1b39ccf	票圈最新消息
-gh_4c058673c07e	探马再探再报
-gh_de9f9ebc976b	赵师傅厨房秘笈
-gh_058e41145a0c	小琪故事馆
-gh_7b4a5f86d68c	八卦不断线
-gh_538f78f9d3aa	张阿姨爱美食
-gh_fe6ef3a65a48	心灵智慧馆
-gh_484de412b0ef	充电宝宝
-gh_4568b5a7e2fe	王小八娱乐
-gh_adca24a8f429	兔子爱蹬鹰
-gh_e24da99dc899	缘来养心厅
-gh_e0eb490115f5	心灵情感驿站
-gh_d2cc901deca7	票圈极速版
-gh_45beb952dc74	票圈乐活
-gh_b8baac4296cb	票圈原创视频精选
-gh_26a307578776	票圈美文速递
-gh_183d80deffb8	生活良读
-gh_9cf3b7ff486b	票圈热门
-gh_b32125c73861	票圈奇闻
-gh_5ff48e9fb9ef	祝福养心厅
-gh_9161517e5676	宝娃趣味游戏
-gh_9f8dc5b0c74e	音药金曲厅
-gh_3ac6d7208961	异次元玩家
-gh_6d9f36e3a7be	音药养心馆
-gh_ac43e43b253b	小阳看天下
-gh_d5f935d0d1f2	半仙社评
-gh_171cec079b2a	观察家王小姐
-gh_be8c29139989	心灵书局
-gh_c91b42649690	心理调色盘
-gh_93e00e187787	小惠爱厨房
-gh_744cb16f6e16	美味在人间
-gh_9877c8541764	退休老年圈
-gh_0c89e11f8bf3	幸福启示
-gh_6d205db62f04	指尖奇文
-gh_7bca1c99aea0	慧行社
-gh_c69776baf2cd	老友欢聚地
-gh_234ef02cdee5	姜子丫
-gh_56a6765df869	婉央女子
-gh_e2576b7181c6	六八评价
-gh_40a0ad154478	所见畅谈
-gh_34318194fd0e	老新说事
-gh_901b0d722749	壹姐八卦
-gh_3c7d38636846	圈内侃八卦
-gh_01f8afd03366	奇闻有约
-gh_a307072c04b9	生活智慧正能量
-gh_424c8eeabced	爱姨生活妙招
-gh_671f460c856c	日日有妙招
-gh_b9b99173ff8a	实在妙招
-gh_e9d819f9e147	热血军中事
-gh_da76772d8d15	娱乐在前
-gh_bd57b6978e06	八点说故事
-gh_6b7c2a257263	幸福晚年知音
-gh_bfe5b705324a	奇趣百味生活
-gh_29074b51f2b7	老来生活家
-gh_0921c03402cd	俏生活秘籍
-gh_7e5818b2dd83	便捷生活好方法
-gh_89ef4798d3ea	生活百态观
-gh_bff0bcb0694a	喜乐生活派
-gh_a2901d34f75b	畅聊奇闻
-gh_b15de7c99912	人生百事观
-gh_73be0287bb94	军莫愁
-gh_56ca3dae948c	老友闲谈
-gh_a182cfc94dad	冀中轶事
-gh_a6351b447819	冀中精彩生活
-gh_3df10391639c	冀中生活谈
-gh_e75dbdc73d80	票圈正能量
-gh_5e543853d8f0	票圈精彩
-gh_f4594783f5b8	俏丽音乐相册
-gh_3845af6945d0	新品女装特价
-gh_b3ffc1ca3a04	票圈内容精选
-gh_efaf7da157f5	票圈热议

+ 0 - 3
dev/task_test.py

@@ -1,3 +0,0 @@
-"""
-@author: luojunhui
-"""

+ 84 - 0
matchVideoFromHistoryArticleASC.py

@@ -0,0 +1,84 @@
+"""
+@author: luojunhui
+"""
+import datetime
+import aiomysql
+import asyncio
+
+from tasks import MatchTask3
+
+
+class TaskMySQLClient(object):
+    """
+    Thin async MySQL helper built around an aiomysql connection pool.
+    """
+
+    def __init__(self):
+        self.mysql_pool = None  # assigned by init_pool()
+
+    async def init_pool(self):
+        """
+        Create the aiomysql pool and store it on self.mysql_pool.
+        :return: None
+        """
+        self.mysql_pool = await aiomysql.create_pool(
+            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+            port=3306,
+            user='crawler',
+            password='crawler123456@',  # NOTE(review): hard-coded credential - consider config/env
+            db='piaoquan-crawler',
+            charset='utf8mb4',
+            connect_timeout=120,
+        )
+        print("mysql init successfully")
+
+    async def close_pool(self):
+        """
+        Close the pool, then await its wait_closed().
+        :return: None
+        """
+        self.mysql_pool.close()
+        await self.mysql_pool.wait_closed()
+
+    async def async_select(self, sql):
+        """
+        Execute a query without bound parameters and fetch every row.
+        :param sql: complete SQL text
+        :return: the result of cursor.fetchall()
+        """
+        async with self.mysql_pool.acquire() as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(sql)
+                result = await cursor.fetchall()
+                return result
+
+    async def async_insert(self, sql, params):
+        """
+        Execute a parameterised insert/update statement and commit it.
+        :param params: values handed to cursor.execute together with sql
+        :param sql: SQL text
+        :return: None
+        """
+        async with self.mysql_pool.acquire() as coon:
+            async with coon.cursor() as cursor:
+                await cursor.execute(sql, params)
+                await coon.commit()
+
+async def main():
+    """Run one MatchTask3 pass and always release the MySQL pool afterwards."""
+    TMC = TaskMySQLClient()
+    await TMC.init_pool()
+    try:
+        await MatchTask3(TMC).deal()
+    finally:
+        # A fresh pool is created on every pass of the outer loop; close it here.
+        await TMC.close_pool()
+
+
+if __name__ == '__main__':
+    while True:
+        asyncio.run(main())
+        print("{}    请求执行完成, 等待3分钟".format(datetime.datetime.now()))
+        # A bare asyncio.sleep() only builds a coroutine; run it so we really wait.
+        asyncio.run(asyncio.sleep(3 * 60))

+ 84 - 0
matchVideoFromHistoryArticleDESC.py

@@ -0,0 +1,84 @@
+"""
+@author: luojunhui
+"""
+import datetime
+import aiomysql
+import asyncio
+
+from tasks import MatchTask4
+
+
+class TaskMySQLClient(object):
+    """
+    Thin async MySQL helper built around an aiomysql connection pool.
+    """
+
+    def __init__(self):
+        self.mysql_pool = None  # assigned by init_pool()
+
+    async def init_pool(self):
+        """
+        Create the aiomysql pool and store it on self.mysql_pool.
+        :return: None
+        """
+        self.mysql_pool = await aiomysql.create_pool(
+            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+            port=3306,
+            user='crawler',
+            password='crawler123456@',  # NOTE(review): hard-coded credential - consider config/env
+            db='piaoquan-crawler',
+            charset='utf8mb4',
+            connect_timeout=120,
+        )
+        print("mysql init successfully")
+
+    async def close_pool(self):
+        """
+        Close the pool, then await its wait_closed().
+        :return: None
+        """
+        self.mysql_pool.close()
+        await self.mysql_pool.wait_closed()
+
+    async def async_select(self, sql):
+        """
+        Execute a query without bound parameters and fetch every row.
+        :param sql: complete SQL text
+        :return: the result of cursor.fetchall()
+        """
+        async with self.mysql_pool.acquire() as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(sql)
+                result = await cursor.fetchall()
+                return result
+
+    async def async_insert(self, sql, params):
+        """
+        Execute a parameterised insert/update statement and commit it.
+        :param params: values handed to cursor.execute together with sql
+        :param sql: SQL text
+        :return: None
+        """
+        async with self.mysql_pool.acquire() as coon:
+            async with coon.cursor() as cursor:
+                await cursor.execute(sql, params)
+                await coon.commit()
+
+async def main():
+    """Run one MatchTask4 pass and always release the MySQL pool afterwards."""
+    TMC = TaskMySQLClient()
+    await TMC.init_pool()
+    try:
+        await MatchTask4(TMC).deal()
+    finally:
+        # A fresh pool is created on every pass of the outer loop; close it here.
+        await TMC.close_pool()
+
+
+if __name__ == '__main__':
+    while True:
+        asyncio.run(main())
+        print("{}    请求执行完成, 等待5s".format(datetime.datetime.now()))
+        # A bare asyncio.sleep() only builds a coroutine; run it so we really wait.
+        asyncio.run(asyncio.sleep(5))

+ 6 - 22
task.py → match_video_task.py

@@ -6,9 +6,7 @@ import datetime
 import asyncio
 
 import aiomysql
-
-from applications.deal import ProcessDeal
-from applications.deal.get_done_content_deal import ProcessDeal2
+from tasks import MatchTask1
 
 
 class TaskMySQLClient(object):
@@ -58,6 +56,7 @@ class TaskMySQLClient(object):
     async def async_insert(self, sql, params):
         """
         insert and update method
+        :param params:
         :param sql:
         :return:
         """
@@ -74,29 +73,14 @@ async def main():
     """
     TMC = TaskMySQLClient()
     await TMC.init_pool()
-    PD = ProcessDeal(TMC)
-    await PD.deal()
-
-
-async def main2():
-    """
-    main2
-    :return:
-    """
-    TMC = TaskMySQLClient()
-    await TMC.init_pool()
-    PD = ProcessDeal2(TMC)
+    PD = MatchTask1(TMC)
     await PD.deal()
 
 
 if __name__ == '__main__':
-    # asyncio.run(main())
     while True:
         asyncio.run(main())
         now_str = datetime.datetime.now().__str__()
-        print("{}    请求执行完成, 等待60s".format(now_str))
-        time.sleep(60)
-        asyncio.run(main2())
-        now_str = datetime.datetime.now().__str__()
-        print("查找历史数据{}    请求执行完成, 等待60s".format(now_str))
-        time.sleep(60)
+        print("{}    请求执行完成, 等待10s".format(now_str))
+        time.sleep(10)
+

+ 0 - 0
applications/static/config.py → static/config.py


+ 0 - 0
applications/static/logo.png → static/logo.png


+ 3 - 0
static/official_accounts

@@ -0,0 +1,3 @@
+天天美好祝福生活
+wx0b7d95eb293b783b
+https://rescdn.yishihui.com/0temp/ttmhzfsh.png

+ 8 - 0
tasks/__init__.py

@@ -0,0 +1,8 @@
+"""
+@author: luojunhui
+定时任务
+"""
+from .task1 import MatchTask1
+from .task2 import MatchTask2
+from .task3 import MatchTask3
+from .task4 import MatchTask4

+ 150 - 0
tasks/chadui.py

@@ -0,0 +1,150 @@
+"""
+@author: luojunhui
+"""
+import asyncio
+
+from static.config import db_article, db_video
+from applications.functions.log import logging
+from static.config import mysql_coroutines
+
+
+class MatchTask5(object):
+    """
+    Scheduled job: reuse stored videos for pending articles of one account.
+    """
+
+    def __init__(self, mysql_client):
+        """
+        :param mysql_client: client exposing async_select / async_insert
+        """
+        self.mysql_client = mysql_client
+
+    async def get_task(self):
+        """
+        Fetch pending article rows (content_status = 0, process_times <= 5).
+        :return: list of task dicts, newest request first
+        """
+        select_sql = f"""
+            SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
+            FROM {db_article} 
+            WHERE content_status = 0 and process_times <= 5 and account_name = '万事如意一家子'
+            ORDER BY request_time_stamp
+            DESC
+            LIMIT {mysql_coroutines};
+        """
+        task_list = await self.mysql_client.async_select(sql=select_sql)
+        task_obj_list = [
+            {
+                "trace_id": item[0],
+                "content_id": item[1],
+                "gh_id": item[2],
+                "title": item[3],
+                "text": item[4],
+                "content_status": item[5],
+                "process_times": item[6]
+            } for item in task_list
+        ]
+        print("本次任务获取到 {} 条视频".format(len(task_obj_list)))
+        return task_obj_list
+
+    async def get_history_videos(self, content_id):
+        """
+        Look up the live (video_status = 1) videos already stored for an article.
+        :param content_id: article content id, interpolated into the SQL text
+        :return: list of video ids (newest first) when at least 3 exist, else None
+        """
+        select_sql = f"""
+            SELECT video_id
+            FROM {db_video}
+            where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
+        """
+        content_videos = await self.mysql_client.async_select(select_sql)
+        # Each row is a one-column sequence; unwrap it so callers get plain ids.
+        videos = [row[0] for row in content_videos]
+        print(len(videos))
+        if len(videos) >= 3:
+            return videos
+        else:
+            return None
+
+    async def use_exists_contents_videos(self, video_id_list, params):
+        """
+        Copy existing video ids onto the article row and set content_status to 2.
+        :param video_id_list: at least three video ids
+        :param params: task dict with trace_id, content_id and process_times
+        :return: None
+        """
+        trace_id = params['trace_id']
+        content_id = params['content_id']
+        select_sql = f"""
+            SELECT kimi_title
+            FROM {db_article}
+            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
+        """
+        info = await self.mysql_client.async_select(sql=select_sql)
+        # Unwrap the single column; fall back to None (SQL NULL) when no title
+        # exists instead of raising IndexError on an empty result.
+        kimi_title = info[0][0] if info else None
+        update_sql = f"""
+            UPDATE {db_article}
+            SET 
+                kimi_title=%s,
+                recall_video_id1=%s, 
+                recall_video_id2=%s, 
+                recall_video_id3=%s,
+                content_status=%s,
+                process_times = %s
+            WHERE  trace_id = %s
+        """
+        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                kimi_title,
+                vid1,
+                "NULL" if vid2 is None else vid2,
+                "NULL" if vid3 is None else vid3,
+                2,
+                int(params['process_times']) + 1,
+                trace_id
+            )
+        )
+        logging(
+            code="9002",
+            info="已从历史文章更新,文章id: {}".format(content_id),
+            trace_id=trace_id
+        )
+
+    async def process_task(self, params):
+        """
+        Handle one task: reuse stored videos when the article already has three.
+        :param params: task dict produced by get_task
+        :return: None
+        """
+        content_id = params['content_id']
+        print(content_id)
+        # Check whether the article already has 3 live videos; starting a new
+        # crawl when it does not is a planned follow-up.
+        video_id_list = await self.get_history_videos(content_id=content_id)
+        print(video_id_list)
+        # Articles with fewer than 3 videos are left untouched for now.
+        if video_id_list:
+            # Results already exist: reuse those video ids.
+            print("存在历史文章")
+            await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
+
+    async def deal(self):
+        """
+        Load the pending tasks and process them concurrently.
+        :return: None
+        """
+        task_list = await self.get_task()
+        print(len(task_list))
+        if task_list:
+            tasks = [self.process_task(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )

+ 111 - 140
applications/deal/process_deal.py → tasks/task1.py

@@ -3,13 +3,13 @@
 """
 import asyncio
 
-from applications.static.config import db_article
+from static.config import db_article, db_video
 from applications.schedule import search_videos
 from applications.functions.log import logging
-from applications.static.config import spider_coroutines
+from static.config import spider_coroutines
 
 
-class ProcessDeal(object):
+class MatchTask1(object):
     """
     定时执行任务
     """
@@ -28,7 +28,7 @@ class ProcessDeal(object):
         select_sql1 = f"""
             SELECT DISTINCT (content_id)       
             FROM {db_article} 
-            WHERE content_status = 0 and process_times <= 5
+            WHERE content_status = 0 and process_times <= 3
             ORDER BY request_time_stamp
             ASC
             LIMIT {spider_coroutines};
@@ -42,11 +42,10 @@ class ProcessDeal(object):
             select_sql = f"""
                 SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
                 FROM {db_article} 
-                WHERE content_id in {content_ids_tuple} and process_times <= 5 
+                WHERE content_id in {content_ids_tuple} and process_times <= 3 
                 ORDER BY request_time_stamp
                 ASC;
             """
-            print(select_sql)
             task_list = await self.mysql_client.async_select(sql=select_sql)
             task_obj_list = [
                 {
@@ -68,26 +67,21 @@ class ProcessDeal(object):
         else:
             return []
 
-    async def get_history_contents(self, content_id):
+    async def get_history_videos(self, content_id):
         """
-        check whether the content id exists
-        :return: trace_id or None
+        check whether the contents videos exists
+        :param content_id:
+        :return:
         """
         select_sql = f"""
-               SELECT trace_id, content_status
-               FROM {db_article}
-               WHERE content_id = '{content_id}'
-               ORDER BY id DESC;
-           """
-        result = await self.mysql_client.async_select(select_sql)
-        if result:
-            for item in result:
-                trace_id, content_status = item
-                if content_status == 2:
-                    return trace_id
-                else:
-                    continue
-            return None
+            SELECT video_id
+            FROM {db_video}
+            where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
+        """
+        content_videos = await self.mysql_client.async_select(select_sql)
+        videos = [vid for vid in content_videos]
+        if len(videos) >= 3:
+            return videos
         else:
             return None
 
@@ -113,70 +107,48 @@ class ProcessDeal(object):
         else:
             return True
 
-    async def insert_history_contents_videos(self, history_trace_id, params):
+    async def use_exists_contents_videos(self, video_id_list, params):
         """
-        插入历史视频id
+        使用已经存在的视频id
         :return:
         """
+        trace_id = params['trace_id']
+        content_id = params['content_id']
         select_sql = f"""
-            SELECT kimi_title, recall_video_id1, recall_video_id2, recall_video_id3
+            SELECT kimi_title
             FROM {db_article}
-            WHERE trace_id = '{history_trace_id}';
+            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
         """
         info = await self.mysql_client.async_select(sql=select_sql)
-        kimi_title, vid1, vid2, vid3 = info[0]
+        kimi_title = info[0]
         update_sql = f"""
-        UPDATE {db_article}
-        SET 
-            kimi_title=%s,
-            recall_video_id1=%s, 
-            recall_video_id2=%s, 
-            recall_video_id3=%s,
-            content_status=%s,
-            process_times = %s
-        WHERE  trace_id = %s
+            UPDATE {db_article}
+            SET 
+                kimi_title=%s,
+                recall_video_id1=%s, 
+                recall_video_id2=%s, 
+                recall_video_id3=%s,
+                content_status=%s,
+                process_times = %s
+            WHERE  trace_id = %s
         """
+        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
         await self.mysql_client.async_insert(
             sql=update_sql,
             params=(
                 kimi_title,
-                vid1,
+                video_id_list[0],
                 "NULL" if vid2 is None else vid2,
                 "NULL" if vid3 is None else vid3,
                 2,
                 int(params['process_times']) + 1,
-                params['trace_id']
+                trace_id
             )
         )
         logging(
             code="9002",
-            info="已从历史文章更新,历史id: {}".format(history_trace_id),
-            trace_id=params['trace_id']
-        )
-
-    async def process_video_id(self, title, trace_id, process_times):
-        """
-        如果video_id在标题中,则做特殊处理
-        :return:
-        """
-        video_id = title.split("video_id=")[-1]
-        update_sql = f"""
-            UPDATE  
-                {db_article}
-            SET 
-                recall_video_id1 = %s,
-                content_status = %s,
-                process_times = %s
-            WHERE  
-                trace_id = %s;"""
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(
-                video_id,
-                2,
-                {int(process_times) + 1},
-                trace_id
-            )
+            info="已从历史文章更新,文章id: {}".format(content_id),
+            trace_id=trace_id
         )
 
     async def start_process(self, params):
@@ -200,89 +172,88 @@ class ProcessDeal(object):
             )
         )
         try:
-            # 判断标题中是否包含video_id
-            if "video_id=" in params['title']:
+            video_count = await search_videos(
+                params={
+                    "title": params['title'],
+                    "content": params['text'],
+                    "trace_id": params['trace_id'],
+                    "content_id": params['content_id']
+                },
+                trace_id=params['trace_id'],
+                gh_id=params['gh_id'],
+                mysql_client=self.mysql_client
+            )
+            select_sql = f"""
+                SELECT video_id
+                FROM {db_video}
+                WHERE content_id = '{params['content_id']}'
+            """
+            result = await self.mysql_client.async_select(sql=select_sql)
+            vid1, vid2, vid3 = result[0], result[1], result[2]
+            if vid1 or vid2 or vid3:
+                update_sql2 = f"""
+                    UPDATE {db_article}
+                    SET
+                        recall_video_id1 = %s,
+                        recall_video_id2 = %s,
+                        recall_video_id3 = %s,
+                        content_status = %s,
+                        process_times = %s
+                        WHERE trace_id = %s;
+                """
+                await self.mysql_client.async_insert(
+                    sql=update_sql2,
+                    params=(
+                        vid1 if vid1 else "NULL",
+                        vid2 if vid2 else "NULL",
+                        vid3 if vid3 else "NULL",
+                        2,
+                        {int(params['process_times']) + 1},
+                        params['trace_id']
+                    )
+                )
                 logging(
-                    code="9006",
-                    info="视频生成文本测试",
+                    code="9008",
+                    info="视频搜索成功, 状态修改为2",
                     trace_id=params['trace_id']
                 )
-                await self.process_video_id(
-                    title=params['title'],
-                    trace_id=params['trace_id'],
-                    process_times=params['process_times']
-                )
             else:
-                await search_videos(
-                    params={"title": params['title'], "content": params['text'], "trace_id": params['trace_id']},
-                    trace_id=params['trace_id'],
-                    gh_id=params['gh_id'],
-                    mysql_client=self.mysql_client
-                )
-                # 执行完成之后,判断是否存在视频id
-                select_sql = f"""
-                    SELECT recall_video_id1, recall_video_id2, recall_video_id3
-                    FROM {db_article}
-                    WHERE trace_id = '{params["trace_id"]}';
-                """
-                result = await self.mysql_client.async_select(sql=select_sql)
-                vid1, vid2, vid3 = result[0]
-                if vid1 or vid2 or vid3:
-                    update_sql2 = f"""
+                if int(params['process_times']) < 3:
+                    update_sql3 = f"""
                         UPDATE {db_article}
                         SET 
                            content_status = %s,
                            process_times = %s
-                           WHERE trace_id = %s;
-                    """
+                        WHERE trace_id = %s;
+                                    """
                     await self.mysql_client.async_insert(
-                        sql=update_sql2,
-                        params=(
-                            2, {int(params['process_times']) + 1}, params['trace_id']
-                        )
+                        sql=update_sql3,
+                        params=(0, int(params['process_times']) + 1, params['trace_id'])
                     )
                     logging(
-                        code="9008",
-                        info="视频搜索成功, 状态修改为2",
+                        code="9018",
+                        info="视频搜索失败,回退状态为0",
                         trace_id=params['trace_id']
                     )
                 else:
-                    if int(params['process_times']) < 5:
-                        update_sql3 = f"""
-                            UPDATE {db_article}
-                            SET 
-                               content_status = %s,
-                               process_times = %s
-                            WHERE trace_id = %s;
-                                        """
-                        await self.mysql_client.async_insert(
-                            sql=update_sql3,
-                            params=(0, int(params['process_times']) + 1, params['trace_id'])
-                        )
-                        logging(
-                            code="9018",
-                            info="视频搜索失败,回退状态为0",
-                            trace_id=params['trace_id']
-                        )
-                    else:
-                        update_sql3 = f"""
-                            UPDATE {db_article}
-                            SET 
-                               content_status = %s,
-                               process_times = %s
-                            WHERE trace_id = %s;
-                                        """
-                        await self.mysql_client.async_insert(
-                            sql=update_sql3,
-                            params=(3, int(params['process_times']) + 1, params['trace_id'])
-                        )
-                        logging(
-                            code="9019",
-                            info="视频多次搜索失败,状态修改为3",
-                            trace_id=params['trace_id']
-                        )
+                    update_sql3 = f"""
+                        UPDATE {db_article}
+                        SET 
+                           content_status = %s,
+                           process_times = %s
+                        WHERE trace_id = %s;
+                                    """
+                    await self.mysql_client.async_insert(
+                        sql=update_sql3,
+                        params=(3, int(params['process_times']) + 1, params['trace_id'])
+                    )
+                    logging(
+                        code="9019",
+                        info="视频多次搜索失败,状态修改为3",
+                        trace_id=params['trace_id']
+                    )
         except Exception as e:
-            if int(params['process_times']) < 5:
+            if int(params['process_times']) < 3:
                 logging(
                     code="9018",
                     info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e),
@@ -290,7 +261,7 @@ class ProcessDeal(object):
                 )
                 update_sql4 = f"""
                     UPDATE {db_article}
-                    SET 
+                    SET
                        content_status = %s,
                        process_times = %s
                     WHERE trace_id = %s;
@@ -307,7 +278,7 @@ class ProcessDeal(object):
                 )
                 update_sql4 = f"""
                                     UPDATE {db_article}
-                                    SET 
+                                    SET
                                        content_status = %s,
                                        process_times = %s
                                     WHERE trace_id = %s;
@@ -325,16 +296,16 @@ class ProcessDeal(object):
         """
         content_id = params['content_id']
         trace_id = params['trace_id']
-        # 判断该文章是否已经生成了
-        history_trace_id = await self.get_history_contents(content_id)
-        if history_trace_id:
+        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
+        video_id_list = await self.get_history_videos(content_id=content_id)
+        if video_id_list:
             # 说明已经存在了结果, 将该条记录下的video_id拿出来
             logging(
                 code="9001",
                 info="存在历史文章",
                 trace_id=trace_id
             )
-            await self.insert_history_contents_videos(history_trace_id, params)
+            # await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
         else:
             flag = await self.judge_content_processing(content_id)
             if flag:

+ 170 - 0
tasks/task2.py

@@ -0,0 +1,170 @@
+"""
+@author: luojunhui
+"""
+import asyncio
+
+from static.config import db_article, db_video
+from applications.functions.log import logging
+from static.config import mysql_coroutines
+
+
+class MatchTask2(object):
+    """
+    Scheduled task that picks up pending articles and reuses the videos
+    already stored for the same content_id.
+    """
+
+    def __init__(self, mysql_client):
+        """
+        :param mysql_client: database client; the methods of this class await
+            its async_select and async_insert coroutines
+        """
+        self.mysql_client = mysql_client
+
+    async def get_task(self):
+        """
+        获取任务
+        :return:
+        """
+        select_sql = f"""
+            SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
+            FROM {db_article} 
+            WHERE content_status = 0 and process_times <= 5
+            ORDER BY request_time_stamp
+            ASC
+            LIMIT {mysql_coroutines};
+        """
+        task_list = await self.mysql_client.async_select(sql=select_sql)
+        task_obj_list = [
+            {
+                "trace_id": item[0],
+                "content_id": item[1],
+                "gh_id": item[2],
+                "title": item[3],
+                "text": item[4],
+                "content_status": item[5],
+                "process_times": item[6]
+            } for item in task_list
+        ]
+        logging(
+            code="9001",
+            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
+            data=task_obj_list
+        )
+        return task_obj_list
+
+    async def get_history_videos(self, content_id):
+        """
+        check whether the contents videos exists
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+            SELECT video_id
+            FROM {db_video}
+            where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
+        """
+        content_videos = await self.mysql_client.async_select(select_sql)
+        videos = [vid for vid in content_videos]
+        if len(videos) >= 3:
+            return videos
+        else:
+            return None
+
+    async def use_exists_contents_videos(self, video_id_list, params):
+        """
+        使用已经存在的视频id
+        :return:
+        """
+        trace_id = params['trace_id']
+        content_id = params['content_id']
+        select_sql = f"""
+            SELECT kimi_title
+            FROM {db_article}
+            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
+        """
+        info = await self.mysql_client.async_select(sql=select_sql)
+        kimi_title = info[0]
+        update_sql = f"""
+            UPDATE {db_article}
+            SET 
+                kimi_title=%s,
+                recall_video_id1=%s, 
+                recall_video_id2=%s, 
+                recall_video_id3=%s,
+                content_status=%s,
+                process_times = %s
+            WHERE  trace_id = %s
+        """
+        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                kimi_title,
+                video_id_list[0],
+                "NULL" if vid2 is None else vid2,
+                "NULL" if vid3 is None else vid3,
+                2,
+                int(params['process_times']) + 1,
+                trace_id
+            )
+        )
+        logging(
+            code="9002",
+            info="已从历史文章更新,文章id: {}".format(content_id),
+            trace_id=trace_id
+        )
+
+    async def process_video_id(self, title, trace_id, process_times):
+        """
+        如果video_id在标题中,则做特殊处理
+        :return:
+        """
+        video_id = title.split("video_id=")[-1]
+        update_sql = f"""
+            UPDATE  
+                {db_article}
+            SET 
+                recall_video_id1 = %s,
+                content_status = %s,
+                process_times = %s
+            WHERE  
+                trace_id = %s;"""
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(video_id, 2, int(process_times) + 1, trace_id)
+        )
+
+    async def process_task(self, params):
+        """
+        异步执行
+        :param params:
+        :return:
+        """
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
+        video_id_list = await self.get_history_videos(content_id=content_id)
+        if video_id_list:
+            # 说明已经存在了结果, 将该条记录下的video_id拿出来
+            logging(
+                code="9001",
+                info="存在历史文章",
+                trace_id=trace_id
+            )
+            await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
+        else:
+            pass
+
+    async def deal(self):
+        """
+        处理
+        :return:
+        """
+        task_list = await self.get_task()
+        if task_list:
+            tasks = [self.process_task(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )

+ 176 - 0
tasks/task3.py

@@ -0,0 +1,176 @@
+"""
+@author: luojunhui
+"""
+import asyncio
+
+from static.config import db_article, db_video, mysql_coroutines
+from applications.functions.log import logging
+from applications.functions.pqFunctions import *
+
+
+class MatchTask3(object):
+    """
+    Handles articles that have already been matched with mini programs.
+    """
+
+    def __init__(self, mysql_client):
+        """
+        :param mysql_client: database client; the methods of this class await
+            its async_select and async_insert coroutines
+        """
+        self.mysql_client = mysql_client
+
+    async def getTaskList(self):
+        """
+        获取任务
+        :return:
+        """
+        select_sql1 = f"""
+            SELECT 
+                ART.trace_id, 
+                ART.content_id, 
+                ART.gh_id, 
+                ART.article_title, 
+                ART.article_text, 
+                ART.content_status, 
+                ART.process_times
+            FROM {db_article} ART
+            JOIN (
+                select content_id, count(1) as cnt 
+                from {db_video}
+                where oss_status = 1
+                group by content_id
+            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
+            WHERE ART.content_status = 0 and ART.process_times <= 3
+            ORDER BY request_time_stamp
+            LIMIT {mysql_coroutines};
+        """
+        tasks = await self.mysql_client.async_select(sql=select_sql1)
+        task_obj_list = [
+            {
+                "trace_id": item[0],
+                "content_id": item[1],
+                "gh_id": item[2],
+                "title": item[3],
+                "text": item[4],
+                "content_status": item[5],
+                "process_times": item[6]
+            } for item in tasks
+        ]
+        logging(
+            code="9001",
+            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
+            data=task_obj_list
+        )
+        return task_obj_list
+
+    async def getHistoryVideoOssPath(self, content_id):
+        """
+        check whether the contents videos exists
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+            SELECT video_title, uid, video_path, cover_path
+            FROM {db_video}
+            where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
+        """
+        content_videos = await self.mysql_client.async_select(select_sql)
+        video_list = [
+            {
+                "title": line[0],
+                "uid": line[1],
+                "videoPath": line[2],
+                "coverPath": line[3]
+            }
+            for line in content_videos
+        ]
+        if len(video_list) >= 3:
+            return video_list
+        else:
+            return None
+
+    async def useExistOssPath(self, video_info_list, params):
+        """
+        使用已经存在的视频id
+        :return:
+        """
+        trace_id = params['trace_id']
+        content_id = params['content_id']
+        select_sql = f"""
+            SELECT kimi_title
+            FROM {db_article}
+            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
+        """
+        info = await self.mysql_client.async_select(sql=select_sql)
+        kimi_title = info[0]
+        video_id_list = await getNewVideoIds(video_info_list)
+        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
+        update_sql = f"""
+            UPDATE {db_article}
+            SET 
+                kimi_title=%s,
+                recall_video_id1=%s, 
+                recall_video_id2=%s, 
+                recall_video_id3=%s,
+                content_status=%s,
+                process_times = %s
+            WHERE  trace_id = %s
+        """
+
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                kimi_title,
+                vid1,
+                vid2,
+                vid3,
+                2,
+                int(params['process_times']) + 1,
+                trace_id
+            )
+        )
+        logging(
+            code="9002",
+            info="已从历史文章更新,文章id: {}".format(content_id),
+            trace_id=trace_id
+        )
+
+    async def processTask(self, params):
+        """
+        异步执行
+        :param params:
+        :return:
+        """
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
+        oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
+        if oss_path_list:
+            # 说明已经存在了结果, 将该条记录下的video_oss拿出来
+            logging(
+                code="9001",
+                info="存在历史文章",
+                trace_id=trace_id
+            )
+            try:
+                await self.useExistOssPath(video_info_list=oss_path_list, params=params)
+            except Exception as e:
+                print(e)
+        else:
+            pass
+
+    async def deal(self):
+        """
+        处理
+        :return:
+        """
+        task_list = await self.getTaskList()
+        if task_list:
+            tasks = [self.processTask(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )
+

+ 177 - 0
tasks/task4.py

@@ -0,0 +1,177 @@
+"""
+@author: luojunhui
+"""
+import asyncio
+
+from static.config import db_article, db_video, mysql_coroutines
+from applications.functions.log import logging
+from applications.functions.pqFunctions import *
+
+
+class MatchTask4(object):
+    """
+    Handles articles that have already been matched with mini programs.
+    """
+
+    def __init__(self, mysql_client):
+        """
+        :param mysql_client: database client; the methods of this class await
+            its async_select and async_insert coroutines
+        """
+        self.mysql_client = mysql_client
+
+    async def getTaskList(self):
+        """
+        获取任务
+        :return:
+        """
+        select_sql1 = f"""
+            SELECT 
+                ART.trace_id, 
+                ART.content_id, 
+                ART.gh_id, 
+                ART.article_title, 
+                ART.article_text, 
+                ART.content_status, 
+                ART.process_times
+            FROM {db_article} ART
+            JOIN (
+                select content_id, count(1) as cnt 
+                from {db_video}
+                where oss_status = 1
+                group by content_id
+            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
+            WHERE ART.content_status = 0 and ART.process_times <= 3
+            ORDER BY request_time_stamp
+            DESC
+            LIMIT {mysql_coroutines};
+        """
+        tasks = await self.mysql_client.async_select(sql=select_sql1)
+        task_obj_list = [
+            {
+                "trace_id": item[0],
+                "content_id": item[1],
+                "gh_id": item[2],
+                "title": item[3],
+                "text": item[4],
+                "content_status": item[5],
+                "process_times": item[6]
+            } for item in tasks
+        ]
+        logging(
+            code="9001",
+            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
+            data=task_obj_list
+        )
+        return task_obj_list
+
+    async def getHistoryVideoOssPath(self, content_id):
+        """
+        check whether the contents videos exists
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+            SELECT video_title, uid, video_path, cover_path
+            FROM {db_video}
+            where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
+        """
+        content_videos = await self.mysql_client.async_select(select_sql)
+        video_list = [
+            {
+                "title": line[0],
+                "uid": line[1],
+                "videoPath": line[2],
+                "coverPath": line[3]
+            }
+            for line in content_videos
+        ]
+        if len(video_list) >= 3:
+            return video_list
+        else:
+            return None
+
+    async def useExistOssPath(self, video_info_list, params):
+        """
+        使用已经存在的视频id
+        :return:
+        """
+        trace_id = params['trace_id']
+        content_id = params['content_id']
+        select_sql = f"""
+            SELECT kimi_title
+            FROM {db_article}
+            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
+        """
+        info = await self.mysql_client.async_select(sql=select_sql)
+        kimi_title = info[0]
+        video_id_list = await getNewVideoIds(video_info_list)
+        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
+        update_sql = f"""
+            UPDATE {db_article}
+            SET 
+                kimi_title=%s,
+                recall_video_id1=%s, 
+                recall_video_id2=%s, 
+                recall_video_id3=%s,
+                content_status=%s,
+                process_times = %s
+            WHERE  trace_id = %s
+        """
+
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                kimi_title,
+                vid1,
+                vid2,
+                vid3,
+                2,
+                int(params['process_times']) + 1,
+                trace_id
+            )
+        )
+        logging(
+            code="9002",
+            info="已从历史文章更新,文章id: {}".format(content_id),
+            trace_id=trace_id
+        )
+
+    async def processTask(self, params):
+        """
+        异步执行
+        :param params:
+        :return:
+        """
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
+        oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
+        if oss_path_list:
+            # 说明已经存在了结果, 将该条记录下的video_oss拿出来
+            logging(
+                code="9001",
+                info="存在历史文章",
+                trace_id=trace_id
+            )
+            try:
+                await self.useExistOssPath(video_info_list=oss_path_list, params=params)
+            except Exception as e:
+                print(e)
+        else:
+            pass
+
+    async def deal(self):
+        """
+        处理
+        :return:
+        """
+        task_list = await self.getTaskList()
+        if task_list:
+            tasks = [self.processTask(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )
+

+ 80 - 0
test_code/hurry.py

@@ -0,0 +1,80 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+
+import pymysql
+from tqdm import tqdm
+
+# Module-level MySQL connection shared by the whole script; it is opened at
+# import time.
+# NOTE(review): host, user and password are hard-coded here. Credentials that
+# were committed to source control should be rotated and loaded from
+# configuration or environment variables instead.
+spider_connection = pymysql.connect(
+    host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
+    port=3306,
+    user="crawler",
+    passwd="crawler123456@",
+    db="piaoquan-crawler",
+    charset="utf8mb4"
+)
+
+with open("id.txt", encoding="utf-8") as f:
+    data = f.readlines()
+
+L = {}
+for line in data:
+    new_id = line.split(",")[1].strip()
+    old_id = line.split(",")[0]
+    L[new_id] = old_id
+print(len(L))
+
+with open("update_video_trace_id.json", encoding="utf-8") as f:
+    publish_data = json.loads(f.read())
+
+print(len(publish_data))
+
+
+def update_into_long_videos(trace_id, vid1, vid2, vid3):
+    """
+    更新video_id
+    :param trace_id:
+    :param vid1:
+    :param vid2:
+    :param vid3:
+    :return:
+    """
+    update_sql = f"""
+        UPDATE long_articles_video
+        set recall_video_id1 = %s, recall_video_id2 = %s, recall_video_id3 = %s
+        where trace_id = %s;
+    """
+    cursor = spider_connection.cursor()
+    cursor.execute(
+        update_sql,
+        (vid1, vid2, vid3, trace_id)
+    )
+    spider_connection.commit()
+
+
+ff = 0
+for item in tqdm(publish_data):
+    trace_id = item['trace_id']
+    data_info = item['result_data']
+    try:
+        vid_list = []
+        for video_obj in json.loads(data_info):
+            path = video_obj['productionPath']
+            video_id = path.split("id%3D")[1].split("%26su")[0]
+            # if L.get(video_id):
+            vid_list.append(video_id)
+        update_into_long_videos(
+            trace_id=trace_id,
+            vid1=vid_list[0],
+            vid2=vid_list[1],
+            vid3=vid_list[2]
+        )
+
+    except Exception as e:
+        print("No videos   {}".format(e))
+
+print(ff)
+
+