Bläddra i källkod

上线 ETL
搜索启动兜底
弃用票圈内视频

罗俊辉 11 månader sedan
förälder
incheckning
a9924b63bb

+ 220 - 0
applications/functions/async_etl.py

@@ -0,0 +1,220 @@
+"""
+@author: luojunhui
+"""
+
+import os
+import time
+
+import oss2
+import json
+import aiohttp
+import aiofiles
+from hashlib import md5
+from uuid import uuid4
+
+import requests
+from fake_useragent import FakeUserAgent
+
+
+async def upload_to_oss(local_video_path):
+    """
+    把视频上传到 oss
+    :return:
+    """
+    oss_video_key = str(uuid4())
+    access_key_id = "LTAIP6x1l3DXfSxm"
+    access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
+    endpoint = "oss-cn-hangzhou.aliyuncs.com"
+    bucket_name = "art-pubbucket"
+    bucket = oss2.Bucket(
+        oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
+    )
+    bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
+    return oss_video_key
+
+
+class AsyncETL(object):
+    """
+    视频下载功能
+    """
+
+    def __init__(self, video_obj):
+        self.platform = video_obj["platform"]
+        self.video_id = video_obj["video_id"]
+        self.video_url = video_obj["video_url"]
+        self.uid = video_obj["user_id"]
+        self.title = video_obj["video_title"]
+        self.cover_url = video_obj["cover_url"]
+        self.proxy = {
+            "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
+            "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
+        }
+        self.max_retry = 5
+
+    def request_header(self):
+        """
+        请求头
+        :return:
+        """
+        if self.platform == "xg_search":
+            if "v9-xg-web-pc.ixigua.com" in self.video_url:
+                headers = {
+                    "Accept": "*/*",
+                    "Accept-Language": "zh-CN,zh;q=0.9",
+                    "Host": "v9-xg-web-pc.ixigua.com",
+                    "User-Agent": FakeUserAgent().chrome,
+                    "Origin": "https://www.ixigua.com/",
+                    "Referer": "https://www.ixigua.com/"
+                }
+            elif "v3-xg-web-pc.ixigua.com" in self.video_url:
+                headers = {
+                    "Accept": "*/*",
+                    "Accept-Language": "zh-CN,zh;q=0.9",
+                    "Host": "v3-xg-web-pc.ixigua.com",
+                    "User-Agent": FakeUserAgent().chrome,
+                    "Origin": "https://www.ixigua.com/",
+                    "Referer": "https://www.ixigua.com/"
+                }
+            else:
+                headers = {
+                    "Accept": "*/*",
+                    "Accept-Language": "zh-CN,zh;q=0.9",
+                    "Host": "v3-xg-web-pc.ixigua.com",
+                    "User-Agent": FakeUserAgent().chrome,
+                    "Origin": "https://www.ixigua.com/",
+                    "Referer": "https://www.ixigua.com/"
+                }
+        elif self.platform == "baidu_search":
+            headers = {
+                "Accept": "*/*",
+                "Accept-Language": "zh-CN,zh;q=0.9",
+                "User-Agent": FakeUserAgent().chrome,
+            }
+        elif self.platform == "wx_search":
+            headers = {
+                "Accept": "*/*",
+                "Accept-Language": "zh-CN,zh;q=0.9",
+                "User-Agent": FakeUserAgent().chrome,
+                "Origin": "https://mp.weixin.qq.com",
+                "Referer": "https://mp.weixin.qq.com"
+            }
+        else:
+            headers = {}
+        return headers
+
+    def generate_video_path(self):
+        """
+        通过视频信息生成唯一视频地址
+        :return:
+        """
+        index = "{}-{}".format(self.platform, self.video_id)
+        index = md5(index.encode()).hexdigest()
+        file_name = "{}.mp4".format(index)
+        cover_name = "{}.png".format(index)
+        file_path = os.path.join(os.getcwd(), "videos", file_name)
+        cover_path = os.path.join(os.getcwd(), "videos", cover_name)
+        return file_path, cover_path
+
+    async def publish_by__request(self, video_path, cover):
+        """
+        发布
+        :return:
+        """
+        url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
+        headers = {
+            "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
+            "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
+            "referer": "http://appspeed.piaoquantv.com",
+            "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
+            "accept-language": "zh-CN,zh-Hans;q=0.9",
+            "Content-Type": "application/x-www-form-urlencoded",
+        }
+        payload = {
+            "coverImgPath": cover,
+            "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
+            "fileExtensions": "MP4",
+            "loginUid": self.uid,
+            "networkType": "Wi-Fi",
+            "platform": "iOS",
+            "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
+            "sessionId": "362290597725ce1fa870d7be4f46dcc2",
+            "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
+            "title": self.title,
+            "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
+            "uid": self.uid,
+            "versionCode": "486",
+            "versionName": "3.4.12",
+            "videoFromScene": "1",
+            "videoPath": video_path,
+            "viewStatus": "1",
+        }
+        response = requests.post(
+            url=url,
+            headers=headers,
+            data=payload,
+        )
+        return response.json()
+
+    async def download(self, file_path):
+        """
+        :param file_path:
+        :return:
+        """
+        headers = self.request_header()
+        if os.path.exists(file_path):
+            file_size = os.path.getsize(file_path)
+            headers["Range"] = f"bytes={file_size}-"
+        else:
+            file_size = 0
+        async with aiohttp.ClientSession() as session:
+            async with session.get(self.video_url, headers=headers) as response:
+                if response.status in [200, 206]:
+                    mode = "ab+" if file_size > 0 else "wb"
+                    f = await aiofiles.open(file_path, mode)
+                    await f.write(await response.read())
+                    await f.close()
+                else:
+                    print(response.status)
+        return file_path
+
+    async def download_cover(self, file_path):
+        """
+        下载视频封面
+        :param file_path:
+        :return:
+        """
+        headers = self.request_header()
+        response = requests.get(url=self.cover_url, headers=headers)
+        with open(file_path, "wb") as f:
+            f.write(response.content)
+        return file_path
+
+    async def etl_deal(self):
+        """
+        ETL Deal Task
+        :return:
+        """
+        local_video_path, local_cover_path = self.generate_video_path()
+        # download videos
+        file_path = await self.download(local_video_path)
+        # download cover
+        cover_path = await self.download_cover(local_cover_path)
+        # upload to oss
+        oss_video = await upload_to_oss(
+            local_video_path=file_path,
+        )
+        oss_cover = await upload_to_oss(
+            local_video_path=cover_path
+        )
+        # publish to pq
+        result = await self.publish_by__request(
+            video_path=oss_video,
+            cover=oss_cover
+        )
+        print(json.dumps(result, ensure_ascii=False, indent=4))
+        a = time.time()
+        os.remove(file_path)
+        os.remove(cover_path)
+        b = time.time()
+        print(b - a)
+        return result["data"]["id"]

+ 7 - 3
applications/functions/video_item.py

@@ -5,6 +5,7 @@ import time
 from applications.functions.mq import MQ
 from applications.functions.log import logging
 from applications.functions.common import Functions
+from applications.functions.async_etl import AsyncETL
 
 
 class VideoItem(object):
@@ -212,7 +213,7 @@ async def video_mq_sender(video_obj, user, trace_id, platform):
     :param video_obj:
     :return:
     """
-    ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
+    # ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
     Video = VideoProducer()
     if platform == "xg_search":
         mq_obj = Video.xg_video_producer(
@@ -234,10 +235,13 @@ async def video_mq_sender(video_obj, user, trace_id, platform):
         )
     else:
         mq_obj = {}
-    ETL_MQ.send_msg(params=mq_obj)
+    AE = AsyncETL(video_obj=mq_obj)
+    video_id = await AE.etl_deal()
+    # ETL_MQ.send_msg(params=mq_obj)
     logging(
         code="6002",
-        info="发送消息至 ETL",
+        info="视频下载完成",
         data=mq_obj,
         trace_id=trace_id
     )
+    return video_id

+ 3 - 2
applications/match_alg/recall.py

@@ -3,7 +3,7 @@
 """
 
 from applications.functions.log import logging
-from applications.functions.common import MySQLServer
+# from applications.functions.common import MySQLServer
 
 
 async def recall_videos(trace_id, s_videos):
@@ -20,7 +20,8 @@ async def recall_videos(trace_id, s_videos):
 
     # 在两边召回视频
     # pq_videos
-    recall_video_list = MySQLServer().select_pq_videos()
+    # recall_video_list = MySQLServer().select_pq_videos()
+    recall_video_list = []
     # dirs_1 = os.path.join(os.getcwd(), 'applications', 'static', 'out_videos')
     # file_list = [os.path.join(dirs_1, "{}.json".format(vid)) for vid in s_videos]
     # search_list = []

+ 3 - 2
applications/routes.py

@@ -49,7 +49,7 @@ async def search_videos_from_the_web():
     )
     # try:
     kimi_info = await K.search_kimi_schedule(params=params)
-    await search_videos(
+    video_id = await search_videos(
         kimi_info=kimi_info,
         trace_id=trace_id,
         gh_id=gh_id
@@ -58,7 +58,8 @@ async def search_videos_from_the_web():
     res = {
         "trace_id": trace_id,
         "code": 0,
-        "kimi_title": kimi_info['k_title']
+        "kimi_title": kimi_info['k_title'],
+        "search_video_id": video_id
     }
     # except Exception as e:
     #     res = {

+ 8 - 10
applications/schedule/main_schedule.py

@@ -6,7 +6,6 @@
 @author: luojunhui
 """
 import time
-import asyncio
 import requests
 
 
@@ -73,21 +72,20 @@ class AskForInfo:
         a = time.time()
         res = await self.search_request()
         b = time.time()
-        print("search_time")
-        print(b - a)
+        print("search_time:\t", b - a)
         if res['code'] == 0:
             trace_id = res["trace_id"]
             kimi_title = res['kimi_title']
-            c = time.time()
-            res_obj = await self.check_out_videos(trace_id=trace_id)
-            d = time.time()
-            print("recall time")
-            print(d - c)
+            video_id = res['search_video_id']
+            res_obj = {
+                "search_videos": "success" if video_id else "fail",
+                "trace_id": trace_id,
+                "video_list": [video_id] if video_id else []
+            }
             time.sleep(2)
             final_obj = await self.ask_for_info(res_obj=res_obj, kt=kimi_title)
             e = time.time()
-            print("Rank and Return Time")
-            print(e - d - 2)
+            print("Rank and Return Time:\t", e - b - 2)
             return final_obj
         elif res['code'] == 1:
             return {

+ 4 - 3
applications/schedule/process_schedule.py

@@ -65,14 +65,15 @@ class ProcessParams(object):
         if best_video_id:
             print("best video id", best_video_id)
             response = Functions().request_for_info(best_video_id)
+            # print(json.dumps(response, ensure_ascii=False, indent=4))
             productionCover = response['data'][0]['shareImgPath']
             # productionName = response["data"][0]['title']
             productionName = kimi_title
             videoUrl = response['data'][0]['videoPath']
             user_id = response['data'][0]['user']['uid']
-            programAvatar = "/static/logo.png"
-            programId = "wx0b7d95eb293b783b"
-            programName = "天天美好祝福好生活"
+            programAvatar = "https://rescdn.yishihui.com/0temp/lehuo.png"
+            programId = "wxe8f8f0e23cecad0f"
+            programName = "票圈乐活"
             source = "Web"
             root_share_id, productionPath = Functions().create_gzh_path(video_id=best_video_id, shared_uid=user_id)
             logging(

+ 35 - 7
applications/schedule/search_schedule.py

@@ -37,6 +37,32 @@ class SearchABTest(object):
         cls.trace_id = info["trace_id"]
         cls.gh_id = gh_id
 
+    @classmethod
+    def dd(cls):
+        """
+        兜底
+        :return:
+        """
+        wx_result_ = wx_search(keys=cls.article_keys[0])
+        if wx_result_:
+            logging(
+                code="7011",
+                info="微信兜底搜索成功",
+                trace_id=cls.trace_id,
+            )
+            return {"platform": "wx_search", "result": wx_result_[0]}
+        else:
+            baidu_result_ = hksp_search(key=cls.article_keys[0])
+            if baidu_result_:
+                logging(
+                    code="7011",
+                    info="百度兜底搜索成功",
+                    trace_id=cls.trace_id,
+                )
+                return {"platform": "baidu_search", "result": baidu_result_[0]}
+            else:
+                return None
+
     @classmethod
     def ab_0(cls):
         """
@@ -69,10 +95,10 @@ class SearchABTest(object):
                 else:
                     logging(
                         code="7001",
-                        info="通过西瓜搜索失败---{}".format(cls.ori_title),
+                        info="通过西瓜搜索失败---{}, 启用兜底方式".format(cls.ori_title),
                         trace_id=cls.trace_id,
                     )
-                return None
+                    return cls.dd()
 
     @classmethod
     def ab_1(cls):
@@ -105,10 +131,10 @@ class SearchABTest(object):
                 else:
                     logging(
                         code="7001",
-                        info="通过西瓜搜索失败---{}".format(cls.article_summary),
+                        info="通过西瓜搜索失败---{},启用兜底方式".format(cls.article_summary),
                         trace_id=cls.trace_id,
                     )
-                return None
+                    return cls.dd()
 
     @classmethod
     def ab_2(cls):
@@ -142,10 +168,10 @@ class SearchABTest(object):
                 else:
                     logging(
                         code="7001",
-                        info="通过西瓜搜索失败---{}".format(",".join(cls.article_keys)),
+                        info="通过西瓜搜索失败---{},启用兜底".format(",".join(cls.article_keys)),
                         trace_id=cls.trace_id,
                     )
-                return None
+                    return cls.dd()
 
     @classmethod
     def ab_3(cls):
@@ -224,11 +250,13 @@ async def search_videos(kimi_info, trace_id, gh_id):
                 trace_id=trace_id,
                 data=recall_video,
             )
-            await video_mq_sender(
+            video_id = await video_mq_sender(
                 video_obj=recall_video,
                 user=gh_id_dict.get(gh_id),
                 trace_id=trace_id,
                 platform=platform,
             )
+            return video_id
     else:
         logging(code="7003", info="视频搜索失败", trace_id=trace_id)
+        return None

+ 1 - 1
applications/search/xigua_search.py

@@ -232,4 +232,4 @@ def xigua_search(keyword):
                 print(e)
         return []
     else:
-        return []
+        return []

Filskillnaden har hållts tillbaka eftersom den är för stor
+ 1 - 1
dev/test.py


+ 2 - 1
requirements.txt

@@ -103,4 +103,5 @@ WTForms==3.1.2
 yarl==1.9.4
 zipp==3.16.2
 
-lxml~=5.2.1
+lxml~=5.2.1
+aiomysql~=0.2.0

Vissa filer visades inte eftersom för många filer har ändrats