Quellcode durchsuchen

新的match逻辑

罗俊辉 vor 8 Monaten
Ursprung
Commit
3b0dfbd357

+ 3 - 1
applications/deal/__init__.py

@@ -4,4 +4,6 @@
 """
 """
 from .record import Record
 from .record import Record
 from .response import Response
 from .response import Response
-from .minigram import Minigram
+from .minigram import Minigram
+from .preResponse import PreResponse
+from .finalResponse import FinalResponse

+ 338 - 0
applications/deal/finalResponse.py

@@ -0,0 +1,338 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+import random
+import hashlib
+
+import uuid
+import urllib.parse
+
+from applications.functions.common import async_post
+from applications.functions.log import logging
+from static.config import db_article
+
+
+async def publishToPQ(video_obj):
+    """
+    Publish one video to PQ through the crawler "video/send" endpoint.
+
+    :param video_obj: mapping providing 'videoPath', 'uid', 'title' and 'coverPath'
+    :return: the decoded JSON body returned by ``async_post``
+    """
+    # The keys are read in the original order so a missing key raises the same KeyError.
+    video_path, login_uid, video_title, cover_path = (
+        video_obj['videoPath'],
+        video_obj['uid'],
+        video_obj['title'],
+        video_obj['coverPath'],
+    )
+    # NOTE(review): token / session values are hard-coded in this module.
+    access_token = "524a8bc871dbb0f4d4717895083172ab37c02d2f"
+    session_id = "362290597725ce1fa870d7be4f46dcc2"
+    request_headers = {
+        "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
+        "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
+        "referer": "http://appspeed.piaoquantv.com",
+        "token": access_token,
+        "accept-language": "zh-CN,zh-Hans;q=0.9",
+        "Content-Type": "application/x-www-form-urlencoded",
+    }
+    form_fields = {
+        "coverImgPath": cover_path,
+        "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
+        "fileExtensions": "MP4",
+        "loginUid": login_uid,
+        "networkType": "Wi-Fi",
+        "platform": "iOS",
+        "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
+        "sessionId": session_id,
+        "subSessionId": session_id,
+        "title": video_title,
+        "token": access_token,
+        "uid": login_uid,
+        "versionCode": "486",
+        "versionName": "3.4.12",
+        "videoFromScene": "1",
+        "videoPath": video_path,
+        "viewStatus": "1",
+    }
+    publish_result = await async_post(
+        "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send",
+        request_headers,
+        form_fields,
+    )
+    print(publish_result)
+    return publish_result
+
+
+async def getPQVideoDetail(video_id):
+    """
+    Fetch the detail record of a single PQ video.
+
+    :param video_id: id of the video to look up
+    :return: the decoded JSON body returned by ``async_post``
+    """
+    request_body = json.dumps({"videoIdList": [video_id]})
+    detail = await async_post(
+        "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo",
+        {"Content-Type": "application/json"},
+        request_body,
+    )
+    print(detail)
+    return detail
+
+
+class FinalResponse(object):
+    """
+    Publish the requested videos to PQ and build one mini-program share card per
+    published video.
+    """
+
+    # mini_type -> mini-program ids (keys of ``mini_map``) that cards of that type may use.
+    MINI_TYPE_PROGRAMS = {
+        1: (25, 29, 31),
+        2: (33,),
+        3: (27,),
+    }
+
+    def __init__(self, params, mysql_client):
+        """
+        :param params: request body; checkParams reads 'traceId', 'kimiTitle', 'ghId',
+            'miniType' and 'videoObjList' from it
+        :param mysql_client: client whose ``async_insert`` is used to store each card
+        """
+        self.kimi_title = None
+        self.video_obj_list = None
+        self.mini_type = None
+        self.gh_id = None
+        self.trace_id = None
+        self.params = params
+        self.mysql_client = mysql_client
+        # Share-card metadata of every known mini program, keyed by its index.
+        self.mini_map = {
+            25: {
+                "avatar": "https://rescdn.yishihui.com/0temp/ttmhzfsh.png",
+                "id": "wx0b7d95eb293b783b",
+                "name": "天天美好祝福生活",
+                "index": 25,
+            },
+            29: {
+                "avatar": "https://rescdn.yishihui.com/0temp/cyfyld.png",
+                "id": "wx65c76bb4c67934db",
+                "name": "财运福运来到",
+                "index": 29,
+            },
+            31: {
+                "avatar": "https://rescdn.yishihui.com/0temp/mhzfshxf2.png",
+                "id": "wx2e4478b1641b3b15",
+                "name": "美好祝福生活幸福",
+                "index": 31,
+            },
+            36: {
+                "avatar": "https://rescdn.yishihui.com/0temp/zfyfyc.jpeg",
+                "id": "wxcddf231abd0dabdc",
+                "name": "祝福有福有财",
+                "index": 36,
+            },
+            27: {
+                "avatar": "https://rescdn.yishihui.com/0temp/xymhfqdd.png",
+                "id": "wx7187c217efef24a7",
+                "name": "幸运美好福气多多",
+                "index": 27,
+            },
+            33: {
+                "avatar": "https://rescdn.yishihui.com/0temp/ssnnfqd.jpeg",
+                "id": "wx5e67713277549b6f",
+                "name": "年年岁岁福气多",
+                "index": 33,
+            },
+        }
+
+    def checkParams(self):
+        """
+        Copy the required request fields onto the instance and validate them.
+
+        :return: None when the parameters are usable, otherwise an error dict with
+            the keys 'traceId', 'error' and 'info'
+        """
+        try:
+            self.trace_id = self.params['traceId']
+            self.kimi_title = self.params['kimiTitle']
+            self.gh_id = self.params['ghId']
+            self.mini_type = self.params['miniType']
+            self.video_obj_list = self.params['videoObjList']
+            # Reject unknown types here: deal() publishes the videos before any card
+            # is generated, so a bad miniType used to be noticed only after that
+            # side effect had already happened.
+            if self.mini_type not in self.MINI_TYPE_PROGRAMS:
+                raise ValueError("unsupported miniType: {}".format(self.mini_type))
+            return None
+        except Exception as e:
+            response = {
+                "traceId": self.trace_id,
+                "error": str(e),
+                "info": "参数校验失败"
+            }
+            return response
+
+    def createGzhPath(self, video_id, shared_uid, gh_id):
+        """
+        Build the mini-program landing path for one video.
+
+        :param video_id: video id placed in the ``id`` query parameter
+        :param shared_uid: uid placed in the ``su`` query parameter
+        :param gh_id: gh_id of the official account (only used when mini_type == 2)
+        :return: tuple (root_share_id, source_id, url-encoded jump path)
+        """
+
+        def generate_source_id():
+            """
+            Return the md5 hex digest of "<millisecond timestamp>-<4 digit random int>".
+            """
+            timestamp = str(int(time.time() * 1000))
+            random_str = str(random.randint(1000, 9999))
+            hash_input = f"{timestamp}-{random_str}"
+            return hashlib.md5(hash_input.encode()).hexdigest()
+
+        root_share_id = str(uuid.uuid4())
+        if self.mini_type == 2:
+            source_id = "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id()
+        elif self.mini_type == 1:
+            source_id = "longArticles_" + generate_source_id()
+        elif self.mini_type == 3:
+            source_id = "WeCom_" + generate_source_id()
+        else:
+            # Kept for direct callers that bypass checkParams.
+            source_id = "Error mini_program_type {}".format(self.mini_type)
+        url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
+        # Automatically adding root_share_id to the whitelist is currently disabled:
+        # auto_white(root_share_id)
+        return (
+            root_share_id,
+            source_id,
+            f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
+        )
+
+    def chooseMiniProgram(self, mini_id):
+        """
+        Look up the share-card metadata of one mini program.
+
+        :param mini_id: key of ``self.mini_map``
+        :return: tuple (avatar, id, name, index)
+        :raises KeyError: when ``mini_id`` is not in ``self.mini_map``
+        """
+        mini_info = self.mini_map[mini_id]
+        return (
+            mini_info["avatar"],
+            mini_info["id"],
+            mini_info["name"],
+            mini_info["index"],
+        )
+
+    async def generateSingleCard(self, video_id, title, index, gh_id, mini_id):
+        """
+        Build one share card, store it in column ``result{index}`` of the row that
+        belongs to the current trace id, and return it.
+
+        :param video_id: id of the published video
+        :param title: card title
+        :param index: 1-based card position; also selects the ``result{index}`` column
+        :param gh_id: gh_id of the official account
+        :param mini_id: key of ``self.mini_map`` to use for this card
+        :return: the card dict
+        """
+        # getPQVideoDetail already prints the response, so it is not printed again here.
+        response = await getPQVideoDetail(video_id)
+        video_detail = response["data"][0]
+        productionCover = video_detail["shareImgPath"]
+        videoUrl = video_detail["videoPath"]
+        user_id = video_detail["user"]["uid"]
+        programAvatar, programId, programName, pqMiniId = self.chooseMiniProgram(mini_id)
+        root_share_id, source_id, productionPath = self.createGzhPath(video_id, user_id, gh_id)
+        logging(
+            code="1002",
+            info="root_share_id --{}, productionPath -- {}".format(
+                root_share_id, productionPath
+            ),
+            function="process",
+            trace_id=self.trace_id,
+        )
+        result = {
+            "productionCover": productionCover,
+            "productionName": title,
+            "programAvatar": programAvatar,
+            "programId": programId,
+            "programName": programName,
+            "source": "web",
+            "rootShareId": root_share_id,
+            "productionPath": productionPath,
+            "videoUrl": videoUrl,
+            "mini_id": mini_id,
+            "paragraphPosition": index * 0.25,
+        }
+        # NOTE(review): assumes the table has a result{index} column for every index
+        # that can occur -- confirm against the schema.
+        update_result_sql = f"""
+                            UPDATE {db_article}
+                            SET
+                                result{index} = %s,
+                                success = %s
+                            WHERE
+                                trace_id = %s;
+                        """
+        await self.mysql_client.async_insert(
+            sql=update_result_sql,
+            params=(json.dumps(result, ensure_ascii=False), 1, self.trace_id),
+        )
+        logging(
+            code="2000",
+            info="统计 root_share_id && video_id",
+            function="process",
+            trace_id=self.trace_id,
+            data={
+                "rootShareId": root_share_id,
+                "videoId": video_id,
+                "sourceId": source_id,
+            },
+        )
+        return result
+
+    async def generateCards(self, video_list, kimi_title, gh_id):
+        """
+        Build one card per video id, in order.
+
+        The mini programs come from ``MINI_TYPE_PROGRAMS[self.mini_type]`` in a random
+        order. They are distinct while there are at most as many videos as programs;
+        beyond that the shuffled order repeats instead of raising ValueError.
+
+        :param video_list: ids of the published videos
+        :param kimi_title: title used for every card
+        :param gh_id: gh_id of the official account
+        :return: list of card dicts (empty for an unknown mini_type)
+        """
+        program_pool = self.MINI_TYPE_PROGRAMS.get(self.mini_type)
+        if not program_pool:
+            return []
+        shuffled_pool = random.sample(program_pool, len(program_pool))
+        cards = []
+        for index, video_id in enumerate(video_list, 1):
+            card = await self.generateSingleCard(
+                video_id=video_id,
+                title=kimi_title,
+                index=index,
+                gh_id=gh_id,
+                mini_id=shuffled_pool[(index - 1) % len(shuffled_pool)],
+            )
+            cards.append(card)
+        return cards
+
+    async def getVideoIdList(self):
+        """
+        Publish every entry of ``self.video_obj_list`` to PQ, one after another.
+
+        :return: list with the id PQ returned for each published video
+        """
+        vid_list = []
+        for video_obj in self.video_obj_list:
+            video_publish_info = await publishToPQ(video_obj=video_obj)
+            vid_list.append(video_publish_info['data']['id'])
+        return vid_list
+
+    async def deal(self):
+        """
+        Validate the request, publish the videos and build the cards.
+
+        :return: the error dict from checkParams, or a dict with the keys
+            'trace_id' and 'miniprogramList'
+        """
+        params_error = self.checkParams()
+        if params_error:
+            return params_error
+        video_list = await self.getVideoIdList()
+        card_list = await self.generateCards(
+            video_list, self.kimi_title, self.gh_id
+        )
+        return {
+            "trace_id": self.trace_id,
+            "miniprogramList": card_list
+        }
+
+
+

+ 68 - 0
applications/deal/preResponse.py

@@ -0,0 +1,68 @@
+"""
+@author: luojunhui
+返回预分配信息
+"""
+import json
+
+from static.config import db_article
+
+
+class PreResponse(object):
+    """
+    Return the pre-matched video information stored for a trace id.
+    """
+
+    # Characters accepted in a trace id. The id is interpolated into SQL text in
+    # findVideoInfo, so everything else is rejected up front.
+    # NOTE(review): alphabet derived from ids shaped like "search-<uuid>-<timestamp>";
+    # widen it if real trace ids use other characters.
+    TRACE_ID_CHARS = frozenset(
+        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_"
+    )
+
+    def __init__(self, params, mysql_client):
+        """
+        :param params: request body; must contain 'traceId'
+        :param mysql_client: client whose ``async_select`` is used for the lookup
+        """
+        self.trace_id = None
+        self.params = params
+        self.mysql_client = mysql_client
+
+    def checkParams(self):
+        """
+        Read and validate 'traceId' from the request parameters.
+
+        :return: None when the id is usable, otherwise an error dict with the keys
+            'error' and 'info'
+        """
+        try:
+            trace_id = self.params['traceId']
+            if (
+                not isinstance(trace_id, str)
+                or not trace_id
+                or not self.TRACE_ID_CHARS.issuperset(trace_id)
+            ):
+                raise ValueError("invalid traceId")
+            self.trace_id = trace_id
+            return None
+        except Exception as e:
+            response = {
+                "error": "params check failed",
+                "info": str(e)
+            }
+            return response
+
+    async def findVideoInfo(self):
+        """
+        Load video_info1..3 of the row matching ``self.trace_id``.
+
+        :return: list of the JSON-decoded column values; empty when no row matches,
+            and NULL columns are skipped
+        """
+        sql = f"""
+        select video_info1, video_info2, video_info3 from {db_article}
+        where trace_id = '{self.trace_id}';
+        """
+        rows = await self.mysql_client.async_select(sql)
+        if not rows:
+            return []
+        return [json.loads(column) for column in rows[0] if column is not None]
+
+    async def deal(self):
+        """
+        Validate the request and return the stored video information.
+
+        :return: the error dict from checkParams, or a dict with the keys
+            'traceId' and 'videoObjList'
+        """
+        params_error = self.checkParams()
+        if params_error:
+            return params_error
+        video_info_list = await self.findVideoInfo()
+        return {
+            "traceId": self.trace_id,
+            "videoObjList": video_info_list
+        }

+ 20 - 0
applications/functions/common.py

@@ -231,3 +231,23 @@ async def request_etl(url, headers, json_data, retries=6):
                     await asyncio.sleep(2)  # 等待一段时间后重试
                     await asyncio.sleep(2)  # 等待一段时间后重试
                 else:
                 else:
                     raise
                     raise
+
+
+async def async_post(url, headers, payload, retries=3):
+    """
+    POST ``payload`` to ``url`` and return the decoded JSON response.
+
+    A request that times out is retried after a 2 second pause. Any other
+    exception is raised immediately.
+
+    :param url: target url
+    :param headers: request headers
+    :param payload: request body, passed to aiohttp as ``data``
+    :param retries: maximum number of attempts (values below 1 are treated as 1)
+    :return: the JSON-decoded response body
+    :raises asyncio.TimeoutError: when the last attempt also times out
+    """
+    attempts = max(int(retries), 1)
+    async with aiohttp.ClientSession() as session:
+        for attempt in range(attempts):
+            try:
+                async with session.post(url, headers=headers, data=payload, timeout=10) as response:
+                    return await response.json()
+            except asyncio.TimeoutError:
+                if attempt == attempts - 1:
+                    raise
+                await asyncio.sleep(2)  # wait a moment before retrying

+ 22 - 1
applications/routes.py

@@ -3,7 +3,7 @@
 """
 """
 from quart import Blueprint, jsonify, request
 from quart import Blueprint, jsonify, request
 
 
-from applications.deal import Response, Record, Minigram
+from applications.deal import Response, Record, Minigram, PreResponse, FinalResponse
 
 
 my_blueprint = Blueprint('LongArticles', __name__)
 my_blueprint = Blueprint('LongArticles', __name__)
 
 
@@ -56,4 +56,25 @@ def Routes(mysql_client):
         response = await M.deal()
         response = await M.deal()
         return jsonify(response)
         return jsonify(response)
 
 
+    @my_blueprint.route("/pre_match", methods=['POST'])
+    async def pre_match_minigram():
+        """
+        Handle POST /pre_match: pass the JSON body to PreResponse and return the
+        result of its deal() as a JSON response.
+        """
+        request_body = await request.get_json()
+        handler = PreResponse(params=request_body, mysql_client=mysql_client)
+        return jsonify(await handler.deal())
+
+    @my_blueprint.route("/final_match", methods=['POST'])
+    async def final_match_minigram():
+        """
+        Handle POST /final_match: pass the JSON body to FinalResponse and return the
+        result of its deal() as a JSON response.
+        """
+        request_body = await request.get_json()
+        handler = FinalResponse(params=request_body, mysql_client=mysql_client)
+        return jsonify(await handler.deal())
+
     return my_blueprint
     return my_blueprint

+ 8 - 10
applications/schedule/search_schedule.py

@@ -238,8 +238,7 @@ async def video_sender(video_obj, user, trace_id, platform, content_id):
     """
     """
     异步处理微信 video_obj
     异步处理微信 video_obj
     公众号和站内账号一一对应
     公众号和站内账号一一对应
-    :param content_id: 文章id
-    :param index:
+    :param content_id:
     :param platform:
     :param platform:
     :param user:
     :param user:
     :param trace_id:
     :param trace_id:
@@ -278,18 +277,18 @@ async def video_sender(video_obj, user, trace_id, platform, content_id):
     header = {
     header = {
         "Content-Type": "application/json",
         "Content-Type": "application/json",
     }
     }
-    response = await request_etl(
-        url="http://192.168.203.137:4612/etl",
-        headers=header,
-        json_data=mq_obj
-    )
-    return response
     # response = await request_etl(
     # response = await request_etl(
-    #     url="http://localhost:4612/etl",
+    #     url="http://192.168.203.137:4612/etl",
     #     headers=header,
     #     headers=header,
     #     json_data=mq_obj
     #     json_data=mq_obj
     # )
     # )
     # return response
     # return response
+    response = await request_etl(
+        url="http://localhost:4612/etlV2",
+        headers=header,
+        json_data=mq_obj
+    )
+    return response
 
 
 
 
 async def search_videos(params, trace_id, gh_id, mysql_client):
 async def search_videos(params, trace_id, gh_id, mysql_client):
@@ -342,7 +341,6 @@ async def search_videos(params, trace_id, gh_id, mysql_client):
                     platform=platform,
                     platform=platform,
                     content_id=params['content_id']
                     content_id=params['content_id']
                 )
                 )
-                print(response)
                 if response['status'] == "success":
                 if response['status'] == "success":
                     index += 1
                     index += 1
                     logging(
                     logging(

+ 1 - 1
match_server.toml

@@ -1,6 +1,6 @@
 reload = true
 reload = true
 bind = "0.0.0.0:8111"
 bind = "0.0.0.0:8111"
-workers = 10
+workers = 4
 keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
 keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
 graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间
 graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间
 loglevel = "debug"  # 日志级别
 loglevel = "debug"  # 日志级别

+ 4 - 4
static/config.py

@@ -755,12 +755,12 @@ gh_id_dict = {
 
 
 
 
 # prod
 # prod
-db_article = "long_articles_video"
-db_video = "article_match_videos"
+# db_article = "long_articles_video"
+# db_video = "article_match_videos"
 
 
 # dev
 # dev
-# db_article = "long_articles_video_dev"
-# db_video = "article_match_videos_dev"
+db_article = "long_articles_video_dev"
+db_video = "article_match_videos_dev"
 
 
 # spider coroutines
 # spider coroutines
 spider_coroutines = 10
 spider_coroutines = 10

+ 247 - 0
tasks/new_task.py

@@ -0,0 +1,247 @@
+"""
+@author: luojunhui
+"""
+import asyncio
+import json
+
+from static.config import db_article, db_video
+from applications.functions.log import logging
+from static.config import mysql_coroutines
+from applications.functions.common import async_post
+
+
+async def publishToPQ(video_obj):
+    """
+    Publish one video to PQ through the crawler "video/send" endpoint.
+
+    :param video_obj: mapping providing 'videoPath', 'uid', 'title' and 'coverPath'
+    :return: the decoded JSON body returned by ``async_post``
+    """
+    # The keys are read in the original order so a missing key raises the same KeyError.
+    video_path, login_uid, video_title, cover_path = (
+        video_obj['videoPath'],
+        video_obj['uid'],
+        video_obj['title'],
+        video_obj['coverPath'],
+    )
+    # NOTE(review): token / session values are hard-coded in this module.
+    access_token = "524a8bc871dbb0f4d4717895083172ab37c02d2f"
+    session_id = "362290597725ce1fa870d7be4f46dcc2"
+    request_headers = {
+        "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
+        "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
+        "referer": "http://appspeed.piaoquantv.com",
+        "token": access_token,
+        "accept-language": "zh-CN,zh-Hans;q=0.9",
+        "Content-Type": "application/x-www-form-urlencoded",
+    }
+    form_fields = {
+        "coverImgPath": cover_path,
+        "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
+        "fileExtensions": "MP4",
+        "loginUid": login_uid,
+        "networkType": "Wi-Fi",
+        "platform": "iOS",
+        "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
+        "sessionId": session_id,
+        "subSessionId": session_id,
+        "title": video_title,
+        "token": access_token,
+        "uid": login_uid,
+        "versionCode": "486",
+        "versionName": "3.4.12",
+        "videoFromScene": "1",
+        "videoPath": video_path,
+        "viewStatus": "1",
+    }
+    return await async_post(
+        "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send",
+        request_headers,
+        form_fields,
+    )
+
+
+async def getPQVideoDetail(video_id):
+    """
+    Fetch the detail record of a single PQ video.
+
+    :param video_id: id of the video to look up
+    :return: the decoded JSON body returned by ``async_post``
+    """
+    request_body = json.dumps({"videoIdList": [video_id]})
+    return await async_post(
+        "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo",
+        {"Content-Type": "application/json"},
+        request_body,
+    )
+
+
+async def getNewVideoIds(video_obj_list, limit=3):
+    """
+    Publish videos to PQ one by one and collect the ids of the newly created videos.
+
+    Publishing stops as soon as ``limit`` ids were collected. A video whose publish
+    call fails, or whose response has no ['data']['id'], is skipped.
+
+    :param video_obj_list: iterable of video dicts accepted by publishToPQ
+    :param limit: maximum number of ids to collect
+    :return: list with at most ``limit`` video ids
+    """
+    vid_list = []
+    for video_obj in video_obj_list:
+        if len(vid_list) >= limit:
+            break
+        # The data in video_obj may be incomplete for historical records, so a
+        # failing publish is tolerated and the next candidate is tried.
+        try:
+            pq_response = await publishToPQ(video_obj)
+            video_id = pq_response['data']['id']
+        except Exception:
+            # Deliberately not a bare ``except``: on Python 3.8+ that would also
+            # swallow asyncio.CancelledError and make this task uncancellable.
+            continue
+        vid_list.append(video_id)
+    return vid_list
+
+
+class MatchTask3(object):
+    """
+    Process articles whose content was already matched before by re-publishing
+    the stored videos instead of crawling again.
+    """
+
+    def __init__(self, mysql_client):
+        """
+        :param mysql_client: client providing ``async_select`` and ``async_insert``
+        """
+        self.mysql_client = mysql_client
+
+    async def getTaskList(self):
+        """
+        Fetch pending tasks (content_status = 0 and process_times <= 3), oldest
+        request first, limited to ``mysql_coroutines`` rows.
+
+        :return: list of task dicts with the keys trace_id, content_id, gh_id,
+            title, text, content_status and process_times
+        """
+        select_sql1 = f"""
+            SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
+            FROM {db_article}
+            WHERE content_status = 0 and process_times <= 3
+            ORDER BY request_time_stamp
+            ASC
+            LIMIT {mysql_coroutines};
+        """
+        tasks = await self.mysql_client.async_select(sql=select_sql1)
+        field_names = (
+            "trace_id",
+            "content_id",
+            "gh_id",
+            "title",
+            "text",
+            "content_status",
+            "process_times",
+        )
+        task_obj_list = [
+            {name: row[position] for position, name in enumerate(field_names)}
+            for row in tasks
+        ]
+        logging(
+            code="9001",
+            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
+            data=task_obj_list
+        )
+        return task_obj_list
+
+    async def getHistoryVideoOssPath(self, content_id):
+        """
+        Load the stored videos (oss_status = 1) of an article, newest request first.
+
+        :param content_id: article id
+        :return: list of dicts with the keys title, uid, videoPath and coverPath
+            when at least three videos exist, otherwise None
+        """
+        # NOTE(review): content_id is interpolated into the SQL text; here it comes
+        # from getTaskList rows, but switch to bound parameters if async_select
+        # supports them.
+        select_sql = f"""
+            SELECT video_title, uid, video_path, cover_path
+            FROM {db_video}
+            where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
+        """
+        content_videos = await self.mysql_client.async_select(select_sql)
+        video_list = [
+            {
+                "title": line[0],
+                "uid": line[1],
+                "videoPath": line[2],
+                "coverPath": line[3]
+            }
+            for line in content_videos
+        ]
+        return video_list if len(video_list) >= 3 else None
+
+    async def useExistOssPath(self, video_info_list, params):
+        """
+        Re-publish stored videos for a task and write the three new video ids,
+        together with an existing kimi_title of the same article, to the task row.
+
+        Nothing is written when no kimi_title exists for the article or when fewer
+        than three videos could be published; the task row stays untouched.
+
+        :param video_info_list: video dicts accepted by getNewVideoIds
+        :param params: task dict with 'trace_id', 'content_id' and 'process_times'
+        """
+        trace_id = params['trace_id']
+        content_id = params['content_id']
+        select_sql = f"""
+            SELECT kimi_title
+            FROM {db_article}
+            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
+        """
+        info = await self.mysql_client.async_select(sql=select_sql)
+        # NOTE(review): log codes 9003 / 9004 are new -- align them with the
+        # project's code table.
+        if not info:
+            logging(
+                code="9003",
+                info="no kimi_title found for content_id: {}".format(content_id),
+                trace_id=trace_id
+            )
+            return
+        # Rows are positional sequences (see getTaskList), so take the single
+        # selected column instead of handing the whole row to the UPDATE.
+        kimi_title = info[0][0]
+        video_id_list = await getNewVideoIds(video_info_list)
+        if len(video_id_list) < 3:
+            logging(
+                code="9004",
+                info="only {} of 3 videos could be published, content_id: {}".format(
+                    len(video_id_list), content_id
+                ),
+                trace_id=trace_id
+            )
+            return
+        vid1, vid2, vid3 = video_id_list[:3]
+        update_sql = f"""
+            UPDATE {db_article}
+            SET
+                kimi_title=%s,
+                recall_video_id1=%s,
+                recall_video_id2=%s,
+                recall_video_id3=%s,
+                content_status=%s,
+                process_times = %s
+            WHERE  trace_id = %s
+        """
+
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                kimi_title,
+                vid1,
+                vid2,
+                vid3,
+                2,
+                int(params['process_times']) + 1,
+                trace_id
+            )
+        )
+        logging(
+            code="9002",
+            info="已从历史文章更新,文章id: {}".format(content_id),
+            trace_id=trace_id
+        )
+
+    async def processTask(self, params):
+        """
+        Handle one task: reuse stored videos when the article has at least three.
+
+        :param params: task dict produced by getTaskList
+        """
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        # Check whether the article already has at least three usable stored
+        # videos. Starting a new crawl when it has fewer is a future improvement;
+        # for now such tasks are left untouched.
+        oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
+        if not oss_path_list:
+            return
+        # A result already exists: reuse the stored video paths of this article.
+        logging(
+            code="9001",
+            info="存在历史文章",
+            trace_id=trace_id
+        )
+        await self.useExistOssPath(video_info_list=oss_path_list, params=params)
+
+    async def deal(self):
+        """
+        Fetch the pending tasks, keep one task per content_id (the last one
+        returned wins) and process them concurrently.
+        """
+        task_list = await self.getTaskList()
+        task_by_content = {task['content_id']: task for task in task_list}
+        process_list = list(task_by_content.values())
+        if not process_list:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )
+            return
+        await asyncio.gather(*(self.processTask(params) for params in process_list))
+

+ 46 - 0
test_code/final_match.py

@@ -0,0 +1,46 @@
+"""
+@author: luojunhui
+"""
+import time
+
+import requests
+
+# Endpoint of the locally running match server.
+url = "http://localhost:8111/final_match"
+
+header = {"Content-Type": "application/json"}
+
+# Sample request: one article title with three candidate videos.
+body = {
+    "kimiTitle": "🔥96岁北大教授夫人选择断食离世,生前遗言震撼人心!😢",
+    "traceId": "search-d2f0ec07-8d91-4440-a438-1d9e9bbb9941-1723033241",
+    "videoObjList": [
+        {
+            "coverPath": "long_articles/image/919ea8b6-399c-4117-90b1-9cbfa04655c4",
+            "title": "漫话寿高多辱(下)陈司寇老师对待生死的达观态度值得老年人借鉴",
+            "uid": "69637509",
+            "videoPath": "long_articles/video/c6ed59d1-54b2-4e7f-9a09-0f2f40b28afa",
+        },
+        {
+            "coverPath": "long_articles/image/c067097b-cc1e-4773-9d5c-069503d96b89",
+            "title": "96岁北大教授夫人绝食4天后离世,临终遗言引人深思",
+            "uid": "69637509",
+            "videoPath": "long_articles/video/e88d8334-b838-4b0e-ac25-258ca6f1d789",
+        },
+        {
+            "coverPath": "long_articles/image/b8b042e5-fac0-4a43-b869-8ec7f51d38b6",
+            "title": "96岁北大教授夫人绝食4天后离世,临终遗言引人深思",
+            "uid": "69637509",
+            "videoPath": "long_articles/video/e2fd64c2-10d9-48f4-aed1-3cf26c7bc050",
+        },
+    ],
+    "ghId": "gh_6d205db62f04",
+    "miniType": 1,
+}
+
+# Time the round trip of a single request and show the elapsed seconds first.
+started_at = time.time()
+response = requests.post(url=url, headers=header, json=body)
+finished_at = time.time()
+
+print(finished_at - started_at)
+print(response.json())

+ 19 - 0
test_code/generate_id.py

@@ -0,0 +1,19 @@
+"""
+@author: luojunhui
+pages/category?jumpPage=pages%2Fuser-videos%3Fid%3D21932340%26su%3D70731104%26fromGzh%3D1%26rootShareId%3D703f489c-b942-48f6-898e-6f77642b48e5%26shareId%3D703f489c-b942-48f6-898e-6f77642b48e5%26rootSourceId%3DlongArticles_2930ef1ac0c0f52dcf45ab823abf3ac7
+"""
+import os
+import json
+
+# Every file in vv/ is a saved publish response named "<original id>.json".
+path_dir = os.listdir("vv")
+
+# Append one "<original id>,<new id>" line per response file. The context manager
+# guarantees id.txt is flushed and closed even when a response file is malformed.
+with open("id.txt", "a+", encoding="utf-8") as fw:
+    for file in path_dir:
+        print(file)
+        ori_id = file.replace(".json", "")
+        new_p = os.path.join("vv", file)
+        with open(new_p, encoding="utf-8") as f:
+            data = json.load(f)
+        new_id = data['data']['id']
+        fw.write("{},{}\n".format(ori_id, new_id))
+

+ 114 - 0
test_code/publish_by_async.py

@@ -0,0 +1,114 @@
+"""
+@author: luojunhui
+"""
+import asyncio
+import json
+import os.path
+
+import aiohttp
+import aiofiles
+
+
+async def write_json_file(old_video_id, new_obj):
+    """
+    Write ``new_obj`` as indented JSON to vv/<old_video_id>.json without blocking
+    the event loop.
+
+    :param old_video_id: id used as the file name
+    :param new_obj: JSON-serialisable object to store
+    """
+    target_path = "vv/{}.json".format(old_video_id)
+    async with aiofiles.open(target_path, mode='w', encoding='utf-8') as handle:
+        # Serialised after the file is opened, exactly as before.
+        serialized = json.dumps(new_obj, ensure_ascii=False, indent=4)
+        await handle.write(serialized)
+
+
+async def fetch_data(url, headers, payload):
+    """
+    POST ``payload`` to ``url`` with a fresh aiohttp session.
+
+    :param url: target url
+    :param headers: request headers
+    :param payload: request body, passed to aiohttp as ``data``
+    :return: the JSON-decoded response body
+    """
+    async with aiohttp.ClientSession() as http_session, http_session.post(
+        url, headers=headers, data=payload
+    ) as reply:
+        return await reply.json()
+
+
+async def publish(obj):
+    """
+    Publish one video to PQ and save the response to vv/<vid>.json.
+
+    The video is skipped when its response file already exists. A failed publish
+    or write is reported on stdout and otherwise ignored, so one bad item does
+    not abort the batch.
+
+    :param obj: dict with the keys 'cover', 'uid', 'title', 'oss_path' and 'vid'
+    :return: None
+    """
+    cover = obj['cover']
+    uid = obj['uid']
+    title = obj['title']
+    video_path = obj['oss_path']
+    old_video_id = obj['vid']
+    path = "vv/{}.json".format(old_video_id)
+    if os.path.exists(path):
+        # Already published in an earlier run.
+        return
+    try:
+        url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
+        headers = {
+            "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
+            "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
+            "referer": "http://appspeed.piaoquantv.com",
+            "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
+            "accept-language": "zh-CN,zh-Hans;q=0.9",
+            "Content-Type": "application/x-www-form-urlencoded",
+        }
+        payload = {
+            "coverImgPath": cover,
+            "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
+            "fileExtensions": "MP4",
+            "loginUid": uid,
+            "networkType": "Wi-Fi",
+            "platform": "iOS",
+            "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
+            "sessionId": "362290597725ce1fa870d7be4f46dcc2",
+            "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
+            "title": title,
+            "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
+            "uid": uid,
+            "versionCode": "486",
+            "versionName": "3.4.12",
+            "videoFromScene": "1",
+            "videoPath": video_path,
+            "viewStatus": "1",
+        }
+        new_obj = await fetch_data(url, headers, payload)
+        await write_json_file(old_video_id, new_obj)
+    except Exception as exc:
+        # Still best effort, but no longer silent, and no longer a bare ``except``
+        # that would also swallow asyncio.CancelledError / KeyboardInterrupt.
+        print("publish failed for {}: {!r}".format(old_video_id, exc))
+
+
+# Batch processing helper
+async def process_in_batches(task_list, batch_size):
+    """
+    Publish the items of ``task_list`` in consecutive batches; the items of one
+    batch run concurrently and the next batch starts when the previous finished.
+
+    :param task_list: list of dicts accepted by publish
+    :param batch_size: number of items per batch
+    :return: None
+    """
+    for start in range(0, len(task_list), batch_size):
+        pending = [publish(item) for item in task_list[start:start + batch_size]]
+        batch_results = await asyncio.gather(*pending)
+        print("批次{}结果:".format(start), batch_results)
+
+
+async def main():
+    """
+    Load id_dict.json (video id -> publish details) and publish every entry in
+    batches of 30.
+
+    The previous version additionally created one coroutine per entry up front
+    and gathered them all after the batched run, which processed every entry a
+    second time without any concurrency limit.
+    """
+    with open("id_dict.json", encoding="utf-8") as f:
+        data = json.load(f)
+    task_list = []
+    for key, detail in data.items():
+        # publish() expects the original video id under 'vid'.
+        detail['vid'] = key
+        task_list.append(detail)
+    await process_in_batches(task_list, batch_size=30)
+
+asyncio.run(main())

+ 4 - 345
tt.py

@@ -6,349 +6,7 @@ import time
 import aiomysql
 import aiomysql
 import asyncio
 import asyncio
 
 
-from static.config import db_article, db_video
-from applications.schedule import search_videos
-from applications.functions.log import logging
-from static.config import spider_coroutines
-
-
-class MatchTask1(object):
-    """
-    Scheduled matching task.
-
-    Reads pending rows from the ``db_article`` table through the injected
-    MySQL client and, per content id, either reuses videos already stored in
-    ``db_video`` or calls ``search_videos`` to look for new ones.
-    """
-
-    def __init__(self, mysql_client):
-        """
-        :param mysql_client: client object whose ``async_select`` and
-            ``async_insert`` coroutines are used for every query in this class
-        """
-        self.mysql_client = mysql_client
-
-    async def get_task(self):
-        """
-        Fetch the next batch of tasks.
-
-        :return: list of dicts with the keys ``trace_id``, ``content_id``,
-            ``gh_id``, ``title``, ``text``, ``content_status`` and
-            ``process_times``; an empty list when no content id qualifies
-        """
-        # Step 1: up to ``spider_coroutines`` distinct content ids that are
-        # still pending (content_status = 0, process_times <= 5) for one
-        # hard-coded account name.
-        select_sql1 = f"""
-            SELECT DISTINCT (content_id)       
-            FROM {db_article} 
-            WHERE content_status = 0 and process_times <= 5 and account_name = '万事如意一家子'
-            ORDER BY request_time_stamp
-            ASC
-            LIMIT {spider_coroutines};
-        """
-        content_ids = await self.mysql_client.async_select(select_sql1)
-        cil = []
-        for content_id in content_ids:
-            cil.append(content_id[0])
-        # str() of the list with the brackets swapped for parentheses is used
-        # as the SQL "IN (...)" list.
-        # NOTE(review): the ids are interpolated into the SQL text, not bound
-        # as parameters.
-        content_ids_tuple = str(cil).replace("[", "(").replace("]", ")")
-        # An empty list renders as "()" (2 characters), so this length test is
-        # what detects "no ids found".
-        if len(content_ids_tuple) > 3:
-            # Step 2: load every row that belongs to those ids; this query has
-            # no content_status filter.
-            select_sql = f"""
-                SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
-                FROM {db_article} 
-                WHERE content_id in {content_ids_tuple} and process_times <= 5 
-                ORDER BY request_time_stamp
-                ASC;
-            """
-            print(select_sql)
-            task_list = await self.mysql_client.async_select(sql=select_sql)
-            # Map the positional columns of each row onto named keys.
-            task_obj_list = [
-                {
-                    "trace_id": item[0],
-                    "content_id": item[1],
-                    "gh_id": item[2],
-                    "title": item[3],
-                    "text": item[4],
-                    "content_status": item[5],
-                    "process_times": item[6]
-                } for item in task_list
-            ]
-            logging(
-                code="9001",
-                info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
-                data=task_obj_list
-            )
-            return task_obj_list
-        else:
-            return []
-
-    async def get_history_videos(self, content_id):
-        """
-        Look up the videos already stored for a content id.
-
-        :param content_id: content id to look up
-        :return: the selected rows (newest ``request_time`` first) when at
-            least 3 rows with ``video_status = 1`` exist, otherwise None
-        """
-        select_sql = f"""
-            SELECT video_id
-            FROM {db_video}
-            where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
-        """
-        content_videos = await self.mysql_client.async_select(select_sql)
-        # NOTE(review): each element is a whole result row, not a bare
-        # video_id -- presumably a 1-tuple; confirm against async_select.
-        videos = [vid for vid in content_videos]
-        if len(videos) >= 3:
-            return videos
-        else:
-            return None
-
-    async def judge_content_processing(self, content_id):
-        """
-        Check whether a content id may be processed now.
-
-        :param content_id: content id to check
-        :return: False if any row for this content id has
-            ``content_status == 1``; True otherwise, including when no row
-            exists
-        """
-        select_sql = f"""
-                       SELECT trace_id, content_status
-                       FROM {db_article}
-                       WHERE content_id = '{content_id}'
-                       ORDER BY id DESC;
-                   """
-        result = await self.mysql_client.async_select(select_sql)
-        if result:
-            for item in result:
-                trace_id, content_status = item
-                # start_process sets content_status to 1 while it is working
-                # on a row.
-                if content_status == 1:
-                    return False
-            return True
-        else:
-            return True
-
-    async def use_exists_contents_videos(self, video_id_list, params):
-        """
-        Reuse video ids that already exist for the task's content id.
-
-        Writes the first three entries of ``video_id_list`` and a previously
-        stored ``kimi_title`` onto the article row identified by the task's
-        ``trace_id``, sets ``content_status`` to 2 and increments
-        ``process_times``.
-
-        :param video_id_list: rows as returned by ``get_history_videos``
-        :param params: task dict; ``trace_id``, ``content_id`` and
-            ``process_times`` are read
-        :return: None
-        """
-        trace_id = params['trace_id']
-        content_id = params['content_id']
-        select_sql = f"""
-            SELECT kimi_title
-            FROM {db_article}
-            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
-        """
-        info = await self.mysql_client.async_select(sql=select_sql)
-        # NOTE(review): info[0] is the first result row rather than the
-        # column value, and this line fails when the query returns no rows.
-        kimi_title = info[0]
-        update_sql = f"""
-            UPDATE {db_article}
-            SET 
-                kimi_title=%s,
-                recall_video_id1=%s, 
-                recall_video_id2=%s, 
-                recall_video_id3=%s,
-                content_status=%s,
-                process_times = %s
-            WHERE  trace_id = %s
-        """
-        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
-        # NOTE(review): "NULL" is passed as a bound parameter here, so it is
-        # presumably stored as the text 'NULL' rather than SQL NULL -- confirm
-        # against async_insert.
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(
-                kimi_title,
-                video_id_list[0],
-                "NULL" if vid2 is None else vid2,
-                "NULL" if vid3 is None else vid3,
-                2,
-                int(params['process_times']) + 1,
-                trace_id
-            )
-        )
-        logging(
-            code="9002",
-            info="已从历史文章更新,文章id: {}".format(content_id),
-            trace_id=trace_id
-        )
-
-    async def start_process(self, params):
-        """
-        Search videos for one task and record the outcome on its article row.
-
-        The row is first marked as in progress (``content_status = 1``). When
-        videos are found it is set to 2 together with the recalled video ids;
-        otherwise, or when an exception is raised, it goes back to 0 while
-        ``process_times`` is below 5 and to 3 once that limit is reached.
-        ``process_times`` is incremented in every one of these outcomes.
-
-        :param params: task dict as built by ``get_task``
-        :return: None
-        """
-        # Set the article's content_status to 1 to mark it as being processed.
-        update_sql = f"""
-            UPDATE {db_article}
-            SET 
-                content_status = %s
-            WHERE 
-                trace_id = %s;
-        """
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(
-                1, params['trace_id']
-            )
-        )
-        try:
-            # NOTE(review): the returned video_count is never read; the result
-            # is re-queried from db_video below.
-            video_count = await search_videos(
-                params={
-                    "title": params['title'],
-                    "content": params['text'],
-                    "trace_id": params['trace_id'],
-                    "content_id": params['content_id']
-                },
-                trace_id=params['trace_id'],
-                gh_id=params['gh_id'],
-                mysql_client=self.mysql_client
-            )
-            select_sql = f"""
-                SELECT video_id
-                FROM {db_video}
-                WHERE content_id = '{params['content_id']}'
-            """
-            result = await self.mysql_client.async_select(sql=select_sql)
-            # NOTE(review): indexing three rows fails when fewer than three
-            # are returned; that error is then handled by the except branch
-            # below, so the "no video found" else-branch is only reached when
-            # all three rows are falsy.
-            vid1, vid2, vid3 = result[0], result[1], result[2]
-            if vid1 or vid2 or vid3:
-                update_sql2 = f"""
-                    UPDATE {db_article}
-                    SET
-                        recall_video_id1 = %s,
-                        recall_video_id2 = %s,
-                        recall_video_id3 = %s,
-                        content_status = %s,
-                        process_times = %s
-                        WHERE trace_id = %s;
-                """
-                # NOTE(review): the process_times argument below is wrapped in
-                # braces, which makes it a one-element set instead of an int;
-                # the other branches pass a plain int.
-                await self.mysql_client.async_insert(
-                    sql=update_sql2,
-                    params=(
-                        vid1 if vid1 else "NULL",
-                        vid2 if vid2 else "NULL",
-                        vid3 if vid3 else "NULL",
-                        2,
-                        {int(params['process_times']) + 1},
-                        params['trace_id']
-                    )
-                )
-                logging(
-                    code="9008",
-                    info="视频搜索成功, 状态修改为2",
-                    trace_id=params['trace_id']
-                )
-            else:
-                if int(params['process_times']) < 5:
-                    # Retry budget left: put the row back to status 0.
-                    update_sql3 = f"""
-                        UPDATE {db_article}
-                        SET 
-                           content_status = %s,
-                           process_times = %s
-                        WHERE trace_id = %s;
-                                    """
-                    await self.mysql_client.async_insert(
-                        sql=update_sql3,
-                        params=(0, int(params['process_times']) + 1, params['trace_id'])
-                    )
-                    logging(
-                        code="9018",
-                        info="视频搜索失败,回退状态为0",
-                        trace_id=params['trace_id']
-                    )
-                else:
-                    # Retry budget used up: set the row to status 3.
-                    update_sql3 = f"""
-                        UPDATE {db_article}
-                        SET 
-                           content_status = %s,
-                           process_times = %s
-                        WHERE trace_id = %s;
-                                    """
-                    await self.mysql_client.async_insert(
-                        sql=update_sql3,
-                        params=(3, int(params['process_times']) + 1, params['trace_id'])
-                    )
-                    logging(
-                        code="9019",
-                        info="视频多次搜索失败,状态修改为3",
-                        trace_id=params['trace_id']
-                    )
-        except Exception as e:
-            # Any error in the search or in the updates above ends up here and
-            # is handled with the same 0 / 3 status rule as a failed search.
-            if int(params['process_times']) < 5:
-                logging(
-                    code="9018",
-                    info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e),
-                    trace_id=params['trace_id']
-                )
-                update_sql4 = f"""
-                    UPDATE {db_article}
-                    SET
-                       content_status = %s,
-                       process_times = %s
-                    WHERE trace_id = %s;
-                """
-                await self.mysql_client.async_insert(
-                    sql=update_sql4,
-                    params=(0, int(params['process_times']) + 1, params['trace_id'])
-                )
-            else:
-                logging(
-                    code="9019",
-                    info="{}异常错误:{}, 状态修改为3".format(params['trace_id'], e),
-                    trace_id=params['trace_id']
-                )
-                update_sql4 = f"""
-                                    UPDATE {db_article}
-                                    SET
-                                       content_status = %s,
-                                       process_times = %s
-                                    WHERE trace_id = %s;
-                                """
-                await self.mysql_client.async_insert(
-                    sql=update_sql4,
-                    params=(3, int(params['process_times']) + 1, params['trace_id'])
-                )
-
-    async def process_task(self, params):
-        """
-        Handle one task: reuse existing videos, start a search, or skip.
-
-        :param params: task dict as built by ``get_task``
-        :return: None
-        """
-        content_id = params['content_id']
-        trace_id = params['trace_id']
-        # Check whether this article already has videos that have not been
-        # taken down and whether there are 3 of them; if there are not three,
-        # a new crawl task is started (to be optimized later).
-        video_id_list = await self.get_history_videos(content_id=content_id)
-        if video_id_list:
-            # Results already exist: take the video ids of that record.
-            logging(
-                code="9001",
-                info="存在历史文章",
-                trace_id=trace_id
-            )
-            await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
-        else:
-            # Only start a search when no row of this content id is marked as
-            # being processed.
-            flag = await self.judge_content_processing(content_id)
-            if flag:
-                logging(
-                    code="9004",
-                    info="无正在处理的文章ID, 开始处理",
-                    trace_id=trace_id
-                )
-                await self.start_process(params=params)
-            else:
-                logging(
-                    code="9003",
-                    info="该文章ID正在请求--文章ID {}".format(content_id),
-                    trace_id=trace_id
-                )
-
-    async def deal(self):
-        """
-        Run one round: fetch the tasks and process them concurrently.
-
-        :return: None
-        """
-        task_list = await self.get_task()
-        # Keying by content_id keeps a single task per content id (the last
-        # one seen in task_list).
-        task_dict = {}
-        for task in task_list:
-            key = task['content_id']
-            task_dict[key] = task
-        process_list = []
-        for item in task_dict:
-            process_list.append(task_dict[item])
-        if process_list:
-            # for task in task_list:
-            #     await self.process_task(task)
-            tasks = [self.process_task(params) for params in process_list]
-            await asyncio.gather(*tasks)
-        else:
-            logging(
-                code="9008",
-                info="没有要处理的请求"
-            )
+from tasks.new_task import MatchTask3
 
 
 
 
 class TaskMySQLClient(object):
 class TaskMySQLClient(object):
@@ -415,10 +73,11 @@ async def main():
     """
     """
     TMC = TaskMySQLClient()
     TMC = TaskMySQLClient()
     await TMC.init_pool()
     await TMC.init_pool()
-    PD = MatchTask1(TMC)
+    PD = MatchTask3(TMC)
     await PD.deal()
     await PD.deal()
 
 
+
 if __name__ == '__main__':
 if __name__ == '__main__':
     while True:
     while True:
         asyncio.run(main())
         asyncio.run(main())
-        time.sleep(10)
+        time.sleep(10)