罗俊辉 hai 11 meses
pai
achega
86440dbe13

+ 4 - 0
applications/__init__.py

@@ -0,0 +1,4 @@
+"""
+@author: luojunhui
+"""
+from .async_mysql import AsyncMySQLClient

+ 59 - 0
applications/async_mysql.py

@@ -0,0 +1,59 @@
+"""
+@author: luojunhui
+"""
+import aiomysql
+
+
+class AsyncMySQLClient(object):
+    """
+    Async MySQL
+    """
+
+    def __init__(self, app):
+        self.app = app
+
+    async def init_pool(self):
+        """
+        初始化连接
+        :return:
+        """
+        self.app.mysql_pool = await aiomysql.create_pool(
+            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+            port=3306,
+            user='crawler',
+            password='crawler123456@',
+            db='piaoquan-crawler',
+            charset='utf8mb4'
+        )
+        print("mysql init successfully")
+
+    async def close_pool(self):
+        """
+        关闭 mysql 连接
+        :return:
+        """
+        self.app.mysql_pool.close()
+        await self.app.mysql_pool.wait_closed()
+
+    async def select(self, sql):
+        """
+        select method
+        :param sql:
+        :return:
+        """
+        async with self.app.mysql_pool.acquire() as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(sql)
+                result = await cursor.fetchall()
+                return result
+
+    async def async_insert(self, sql):
+        """
+        insert and update method
+        :param sql:
+        :return:
+        """
+        async with self.app.mysql_pool.acquire() as coon:
+            async with coon.cursor() as cursor:
+                await cursor.execute(sql)
+                await coon.commit()

+ 21 - 0
applications/functions.py

@@ -0,0 +1,21 @@
+"""
+@author: luojunhui
+"""
+from datetime import datetime, timedelta
+
+
+def generate_daily_strings(start_date, end_date):
+    """
+    Generate daily date_str
+    :param start_date:
+    :param end_date:
+    :return:
+    """
+    start = datetime.strptime(start_date, "%Y%m%d")
+    end = datetime.strptime(end_date, "%Y%m%d")
+    current = start
+    date_strings = []
+    while current <= end:
+        date_strings.append(current.strftime("%Y%m%d"))
+        current += timedelta(days=1)
+    return date_strings

+ 63 - 0
applications/mysql.py

@@ -0,0 +1,63 @@
+"""
+@author: luojunhui
+"""
+import pymysql
+
+
+class MySQL(object):
+    """
+    MySQL 方法
+    """
+
+    @classmethod
+    def get_pq_top_return_videos(cls, video_id):
+        """
+        获取票圈视频
+        :return:
+        """
+        connection = pymysql.connect(
+            host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com",  # 数据库IP地址,内网地址
+            port=3306,  # 端口号
+            user="wx2016_longvideo",  # mysql用户名
+            passwd="wx2016_longvideoP@assword1234",  # mysql用户登录密码
+            db="incentive",  # 数据库名
+            charset="utf8mb4"  # 如果数据库里面的文本是utf8编码的,charset指定是utf8
+        )
+        sql = f"""select * from video_content where video_id = {video_id};"""
+        cursor = connection.cursor()
+        cursor.execute(sql)
+        data = cursor.fetchall()
+        return data[0]
+
+    @classmethod
+    def migrate_data_to_mysql(cls, obj):
+        """
+        把 data_works 数据迁移到数据库
+        :param obj:
+        :return:
+        """
+        video_id = obj['videoid']
+        title = obj['title']
+        return_ = obj['当日回流']
+        view_ = obj['曝光量']
+        video_url = obj['视频原地址']
+        dt = obj['dt']
+        rov = int(return_) / int(view_) if int(view_) > 0 else 0
+        insert_sql = f"""
+            INSERT INTO top_return_daily
+                (video_id, title, view_, return_, video_url, dt, rov)
+            VALUES 
+                ({video_id}, '{title}', {view_}, {return_}, '{video_url}', '{dt}', {rov});
+        """
+        # print(insert_sql)
+        connection = pymysql.connect(
+            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+            port=3306,
+            user='crawler',
+            password='crawler123456@',
+            db='piaoquan-crawler',
+            charset='utf8mb4'
+        )
+        cursor = connection.cursor()
+        cursor.execute(insert_sql)
+        connection.commit()

+ 34 - 0
applications/odps_server.py

@@ -0,0 +1,34 @@
+"""
+@author: luojunhui
+"""
+from odps import ODPS
+
+
+class PyODPS(object):
+    """
+    PyODPS class, get data from odps server
+    """
+
+    def __init__(self):
+        self.endpoint = "http://service.cn.maxcompute.aliyun.com/api"
+        self.access_id = "LTAIWYUujJAm7CbH"
+        self.access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+        self.project = "loghubods"
+
+        self.od = ODPS(
+            access_id=self.access_id,
+            secret_access_key=self.access_key,
+            endpoint=self.endpoint,
+            project=self.project,
+        )
+
+    def select(self, sql):
+        """
+        :param sql: 查询语句
+        :return: odps_obj{}
+        """
+        result = []
+        with self.od.execute_sql(sql).open_reader() as reader:
+            for record in reader:
+                result.append(record)
+        return result

+ 6 - 0
config.toml

@@ -0,0 +1,6 @@
+reload = true
+bind = "0.0.0.0:8888"
+workers = 4
+keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
+graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间
+loglevel = "debug"  # 日志级别

+ 4 - 0
deal/__init__.py

@@ -0,0 +1,4 @@
+"""
+@author: luojunhui
+"""
+from .request_deal import RequestDeal

+ 98 - 0
deal/request_deal.py

@@ -0,0 +1,98 @@
+"""
+@author: luojunhui
+"""
+
+
+class RequestDeal(object):
+    """
+    Deal Request from outside
+    """
+
+    def __init__(self, params, mysql_client):
+        self.start_dt = params['start_date']
+        self.end_dt = params['end_date']
+        self.cate = params['cate']
+        self.topN = params['topN'] if params.get("topN") else 1
+        self.mysql_client = mysql_client
+
+    def response_obj(self, data_list):
+        """
+        处理 return 信息
+        :param data_list:
+        :return:
+        """
+        result_list = [
+            {
+                "video_id": obj[0],
+                "title": obj[1],
+                "video_url": obj[2]
+            } for obj in data_list
+        ]
+        response = {
+            "status": "success",
+            "cate": self.cate,
+            "start_date": self.start_dt,
+            "end_dt": self.end_dt,
+            "topN": self.topN,
+            "data": result_list
+        }
+        return response
+
+    async def deal(self):
+        """
+        deal function
+        :return:
+        """
+        if self.cate == "video_return":
+            return await self.deal_return()
+        elif self.cate == "video_view":
+            return await self.deal_view()
+        elif self.cate == "video_rov":
+            return await self.deal_rov()
+        else:
+            return {"error": "params is not correct"}
+
+    async def deal_return(self):
+        """
+        return
+        :return:
+        """
+        sql = f"""
+        select video_id, title, video_url 
+        from top_return_daily 
+        where dt >= '{self.start_dt}' and dt < '{self.end_dt}' 
+        order by return_ DESC
+        limit {self.topN}
+        """
+        result_list = await self.mysql_client.select(sql)
+        return self.response_obj(result_list)
+
+    async def deal_view(self):
+        """
+        view
+        :return:
+        """
+        sql = f"""
+        select video_id, title, video_url 
+        from top_return_daily 
+        where dt >= '{self.start_dt}' and dt < '{self.end_dt}' 
+        order by view_  DESC
+        limit {self.topN}
+        """
+        result_list = await self.mysql_client.select(sql)
+        return self.response_obj(result_list)
+
+    async def deal_rov(self):
+        """
+        rov
+        :return:
+        """
+        sql = f"""
+        select video_id, title, video_url 
+        from top_return_daily 
+        where dt >= '{self.start_dt}' and dt < '{self.end_dt}' 
+        order by rov DESC
+        limit {self.topN}
+        """
+        result_list = await self.mysql_client.select(sql)
+        return self.response_obj(result_list)

+ 31 - 0
dev.py

@@ -0,0 +1,31 @@
+"""
+@author: luojunhui
+"""
+import time
+from concurrent.futures.thread import ThreadPoolExecutor
+
+from applications.mysql import MySQL
+from applications.odps_server import PyODPS
+from applications.functions import generate_daily_strings
+
+
+def migrate_daily(dt):
+    """
+    迁移当天到数据
+    :param dt:
+    :return:
+    """
+    PO = PyODPS()
+    M = MySQL()
+    select_sql = f"""select * from loghubods.video_return_top_500_new where dt = '{dt}';"""
+    data = PO.select(select_sql)
+    a = time.time()
+    with ThreadPoolExecutor(max_workers=8) as pool:
+        pool.map(M.migrate_data_to_mysql, data)
+    b = time.time()
+    print("{} successfully insert {} rows, totally cost {} seconds".format(dt, len(data), b - a))
+
+
+dt_list = generate_daily_strings("20240522", "20240525")
+with ThreadPoolExecutor(max_workers=8) as Pool:
+    Pool.map(migrate_daily, dt_list)

+ 4 - 0
routes/__init__.py

@@ -0,0 +1,4 @@
+"""
+@author: luojunhui
+"""
+from .vta_routes import VTARoutes

+ 31 - 0
routes/vta_routes.py

@@ -0,0 +1,31 @@
+"""
+@author: luojunhui
+"""
+import time
+import uuid
+import asyncio
+from quart import Blueprint, jsonify, request
+
+from deal import RequestDeal
+bp = Blueprint('VideosToArticle', __name__)
+
+
+def VTARoutes(mysql_client):
+    """
+
+    :param mysql_client:
+    :return:
+    """
+    @bp.route('/videos', methods=["POST"])
+    async def find_videos():
+        """
+        更具接口获取视频信息
+        :return:
+        """
+        params = await request.get_json()
+        RD = RequestDeal(params, mysql_client)
+        return await RD.deal()
+
+    return bp
+
+

+ 99 - 0
t.py

@@ -0,0 +1,99 @@
+"""
+@author: luojunhui
+"""
+"""
+@author: luojunhui
+"""
+import time
+import json
+import uuid
+import random
+import hashlib
+import requests
+import urllib.parse
+
+
+def create_gzh_path(video_id, shared_uid):
+    """
+    :param video_id: 视频 id
+    :param shared_uid: 分享 id
+    """
+
+    def auto_white(root_share_id_):
+        """
+        自动加入白名单, 保证公众号百分百出广告
+        :param root_share_id_:
+        :return:
+        """
+
+        def get_cookie():
+            """
+            获取 cookie
+            :return:
+            """
+            url = "https://admin.piaoquantv.com/manager/login?account=luojunhui&passWd=e10adc3949ba59abbe56e057f20f883e&muid=7"
+            payload = {}
+            headers = {
+                'accept': 'application/json, text/plain, */*',
+                'accept-language': 'en',
+                'priority': 'u=1, i',
+                'sec-ch-ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
+                'sec-ch-ua-mobile': '?0',
+                'sec-ch-ua-platform': '"macOS"',
+                'sec-fetch-dest': 'empty',
+                'sec-fetch-mode': 'cors',
+                'sec-fetch-site': 'same-origin',
+                'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
+            }
+            response = requests.request("GET", url, headers=headers, data=payload)
+            return response.cookies.values()[0]
+
+        url = "https://admin.piaoquantv.com/manager/ad/own/white/rootShare/save"
+        dd = {
+            "rootShareId": root_share_id_,
+            "commit": "算法自动加入白名单--"
+        }
+        payload = json.dumps(dd)
+        cookie = get_cookie()
+        headers = {
+            'accept': 'application/json',
+            'accept-language': 'en',
+            'content-type': 'application/json;',
+            'cookie': "SESSION=" + cookie,
+            'origin': 'https://admin.piaoquantv.com',
+            'priority': 'u=1, i',
+            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
+        }
+        response = requests.request("POST", url, headers=headers, data=payload)
+        return response.json()['content']
+
+    def generate_source_id():
+        """
+        generate_source_id
+        :return:
+        """
+        timestamp = str(int(time.time() * 1000))
+        random_str = str(random.randint(1000, 9999))
+        hash_input = f"{timestamp}-{random_str}"
+        return hashlib.md5(hash_input.encode()).hexdigest()
+
+    root_share_id = str(uuid.uuid4())
+    source_id = "longArticles_" + generate_source_id()
+    url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
+    # 自动把 root_share_id 加入到白名单
+    auto_white(root_share_id)
+    return root_share_id, f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}"
+
+
+obj = {"productionCover": "http://rescdn.yishihui.com/d3dba68c-0ab3-4f0c-858d-7248121833da?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center", "productionName": "【揭秘】中国与该国无冤无仇,为何处处作对?专家:罪有应得🔥", "programAvatar": "https://rescdn.yishihui.com/0temp/lehuo.png", "programId": "wxe8f8f0e23cecad0f", "programName": "票圈乐活", "source": "Web", "rootShareId": "3b249e9e-dcdc-412b-9079-cb0df947128c", "productionPath": "pages/category?jumpPage=pages%2Fuser-videos%3Fid%3D20857581%26su%3D69637493%26fromGzh%3D1%26rootShareId%3D3b249e9e-dcdc-412b-9079-cb0df947128c%26shareId%3D3b249e9e-dcdc-412b-9079-cb0df947128c", "videoUrl": "http://rescdn.yishihui.com/7f0f3e2d-3006-4f40-9004-5ab871dd885f.mp4"}
+
+video_id = "20857581"
+share_uid = "69637493"
+root_id, path = create_gzh_path(video_id, share_uid)
+cover = obj["productionCover"]
+title = obj['productionName']
+
+print("封面:\t", cover)
+print("标题:\t", title)
+print("root_share_id:\t", root_id)
+print("SharePath: \t", path)

+ 24 - 0
test_req.py

@@ -0,0 +1,24 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+import requests
+
+url = "http://localhost:8888/videos"
+
+body = {
+    "cate": "video_return",
+    "start_date": "2024-04-02",
+    "end_date": "2024-04-03",
+    "topN": 10
+}
+a = time.time()
+header = {
+    "Content-Type": "application/json",
+}
+
+response = requests.post(url, json=body, headers=header, timeout=600)
+b = time.time()
+print(b - a)
+print(json.dumps(response.json(), ensure_ascii=False, indent=4))

+ 34 - 0
video_app.py

@@ -0,0 +1,34 @@
+"""
+@author: luojunhui
+"""
+from quart import Quart
+from routes import VTARoutes
+from applications import AsyncMySQLClient
+
+# 初始化 App
+app = Quart(__name__)
+AsyncMySQL = AsyncMySQLClient(app)
+app_routes = VTARoutes(AsyncMySQL)
+app.register_blueprint(app_routes)
+
+
+@app.before_serving
+async def init_db():
+    """
+    初始化
+    :return:
+    """
+    await AsyncMySQL.init_pool()
+
+
+@app.after_serving
+async def close_db():
+    """
+    关闭连接
+    :return:
+    """
+    await AsyncMySQL.close_pool()
+
+
+if __name__ == '__main__':
+    app.run(debug=True)