罗俊辉 9 mesiacov pred
rodič
commit
2ead8c7b79

+ 139 - 0
applications/functions.py

@@ -0,0 +1,139 @@
+"""
+@author: luojunhui
+"""
+import time
+import json
+import random
+import hashlib
+from datetime import datetime, timedelta
+
+import uuid
+import requests
+import urllib.parse
+
+
def request_for_info(video_id):
    """
    Fetch metadata for a single video from the longvideo open API.
    :param video_id: id of the video to look up
    :return: decoded JSON response body (dict)
    """
    url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
    data = {
        "videoIdList": [video_id]
    }
    # json= serializes the body and sets Content-Type in one step;
    # timeout prevents the scheduled jobs from hanging forever on a dead endpoint
    response = requests.post(url, json=data, timeout=30)
    return response.json()
+
+
def get_info_lists(vid_list):
    """
    Fetch metadata for a batch of videos from the longvideo open API.
    :param vid_list: list of video ids to look up
    :return: decoded JSON response body (dict)
    """
    url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
    data = {
        "videoIdList": vid_list
    }
    # json= serializes the body and sets Content-Type in one step;
    # timeout prevents the scheduled jobs from hanging forever on a dead endpoint
    response = requests.post(url, json=data, timeout=30)
    return response.json()
+
+
def generate_daily_strings(start_date, end_date):
    """
    Build the list of day strings from start_date to end_date, inclusive.
    :param start_date: first day, formatted "YYYYMMDD"
    :param end_date: last day, formatted "YYYYMMDD"
    :return: list of "YYYYMMDD" strings; empty when start_date > end_date
    """
    first = datetime.strptime(start_date, "%Y%m%d")
    last = datetime.strptime(end_date, "%Y%m%d")
    span_days = (last - first).days
    # a negative span yields an empty range, matching the inclusive while-loop
    return [
        (first + timedelta(days=offset)).strftime("%Y%m%d")
        for offset in range(span_days + 1)
    ]
+
+
def whisper(video_id):
    """
    Submit a video id to the speech-to-text service and return its transcript payload.
    :param video_id: id of the video to transcribe
    :return: decoded JSON response from the service
    """
    endpoint = "http://61.48.133.26:5999/video_to_text"
    payload = {"video_id": video_id}
    headers = {"Content-Type": "application/json"}
    resp = requests.post(url=endpoint, json=payload, headers=headers)
    return resp.json()
+
+
def hash_title(title):
    """
    Map a title string to its hex-encoded MD5 digest.
    :param title: title text to hash
    :return: 32-character hexadecimal digest string
    """
    return hashlib.md5(title.encode('utf-8')).hexdigest()
+
+
+def create_gzh_path(video_id, shared_uid):
+    """
+    :param video_id: 视频 id
+    :param shared_uid: 分享 id
+    """
+
+    def generate_source_id():
+        """
+        generate_source_id
+        :return:
+        """
+        timestamp = str(int(time.time() * 1000))
+        random_str = str(random.randint(1000, 9999))
+        hash_input = f"{timestamp}-{random_str}"
+        return hashlib.md5(hash_input.encode()).hexdigest()
+
+    root_share_id = str(uuid.uuid4())
+    source_id = "video_to_articles" + generate_source_id()
+    url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
+    # 自动把 root_share_id 加入到白名单
+    # auto_white(root_share_id)
+    return root_share_id, source_id, f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}"
+
+
def chunks(chunk_list, chunk_size):
    """
    Yield successive slices of at most chunk_size items from chunk_list.
    :param chunk_list: sequence to paginate
    :param chunk_size: maximum number of items per page
    """
    start = 0
    total = len(chunk_list)
    while start < total:
        yield chunk_list[start:start + chunk_size]
        start += chunk_size

+ 63 - 0
applications/mysql.py

@@ -0,0 +1,63 @@
+"""
+@author: luojunhui
+"""
+import pymysql
+
+
class MySQL(object):
    """
    Thin helpers around the two MySQL instances used by this project.

    SECURITY(review): credentials are hard-coded below; they should be moved
    to configuration / environment variables and rotated.
    """

    @classmethod
    def get_pq_top_return_videos(cls, video_id):
        """
        Fetch the video_content row for one video id.
        :param video_id: id of the video to look up
        :return: the first matching row as a tuple
        :raises IndexError: when no row matches (kept from the original
            behavior so existing callers are unchanged)
        """
        connection = pymysql.connect(
            host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com",  # internal RDS address
            port=3306,
            user="wx2016_longvideo",
            passwd="wx2016_longvideoP@assword1234",
            db="incentive",
            charset="utf8mb4"
        )
        try:
            with connection.cursor() as cursor:
                # parameterized query: never interpolate values into SQL text
                cursor.execute(
                    "select * from video_content where video_id = %s;",
                    (video_id,)
                )
                data = cursor.fetchall()
            return data[0]
        finally:
            # the original leaked the connection; always release it
            connection.close()

    @classmethod
    def migrate_data_to_mysql(cls, obj):
        """
        Copy one DataWorks record into the top_return_daily table.
        :param obj: mapping with keys 'videoid', 'title', '当日回流' (daily
            returns), '曝光量' (views), '视频原地址' (source url) and 'dt'
        :return: None
        """
        video_id = obj['videoid']
        title = obj['title']
        return_ = obj['当日回流']
        view_ = obj['曝光量']
        video_url = obj['视频原地址']
        dt = obj['dt']
        # return-over-view ratio; guard against zero views
        rov = int(return_) / int(view_) if int(view_) > 0 else 0
        insert_sql = """
            INSERT INTO top_return_daily
                (video_id, title, view_, return_, video_url, dt, rov)
            VALUES 
                (%s, %s, %s, %s, %s, %s, %s);
        """
        connection = pymysql.connect(
            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
            port=3306,
            user='crawler',
            password='crawler123456@',
            db='piaoquan-crawler',
            charset='utf8mb4'
        )
        try:
            with connection.cursor() as cursor:
                # parameterized insert avoids SQL injection / quoting bugs —
                # a title containing a quote used to break the f-string SQL
                cursor.execute(
                    insert_sql,
                    (video_id, title, view_, return_, video_url, dt, rov)
                )
            connection.commit()
        finally:
            connection.close()

+ 34 - 0
applications/odps_server.py

@@ -0,0 +1,34 @@
+"""
+@author: luojunhui
+"""
+from odps import ODPS
+
+
class PyODPS(object):
    """
    Small wrapper over the ODPS (MaxCompute) SDK for read-only queries.
    """

    def __init__(self):
        # SECURITY(review): credentials were hard-coded; they are now read
        # from the environment with the original values as fallback so
        # existing deployments keep working. Rotate these keys and drop the
        # defaults as soon as possible.
        import os
        self.endpoint = os.environ.get(
            "ODPS_ENDPOINT", "http://service.cn.maxcompute.aliyun.com/api")
        self.access_id = os.environ.get("ODPS_ACCESS_ID", "LTAIWYUujJAm7CbH")
        self.access_key = os.environ.get(
            "ODPS_ACCESS_KEY", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P")
        self.project = os.environ.get("ODPS_PROJECT", "loghubods")

        self.od = ODPS(
            access_id=self.access_id,
            secret_access_key=self.access_key,
            endpoint=self.endpoint,
            project=self.project,
        )

    def select(self, sql):
        """
        Run a query and materialize every record.
        :param sql: SQL text to execute
        :return: list of odps Record objects
        """
        rows = []
        with self.od.execute_sql(sql).open_reader() as reader:
            for record in reader:
                rows.append(record)
        return rows

+ 45 - 0
schedule_app.py

@@ -0,0 +1,45 @@
+"""
+@author: luojunhui
+"""
+import datetime
+import time
+
+import schedule
+
+from tasks import *
+
+
+# 自动迁移数据任务
def migrate_videos_task():
    """
    Scheduled job: migrate yesterday's top-video stats into MySQL.
    :return: None
    """
    run_date = datetime.datetime.today() - datetime.timedelta(days=1)
    migrate_daily(dt=run_date.strftime("%Y%m%d"))
+
+
+# 自动下架视频任务
def get_off_videos_task():
    """
    Scheduled job: run both take-down passes, pausing a minute between them.
    :return: None
    """
    worker = AutoGetOffVideos()
    worker.task1()
    # give the platform a moment before the second pass
    time.sleep(60)
    worker.task2()
+
+
if __name__ == '__main__':
    # register the two daily jobs with the scheduler
    daily_jobs = [
        ("06:00", migrate_videos_task),
        ("05:00", get_off_videos_task),
    ]
    for run_at, job in daily_jobs:
        schedule.every().day.at(run_at).do(job)

    # poll the scheduler once a second and fire any due job
    while True:
        schedule.run_pending()
        print("定时任务正在执行")
        time.sleep(1)

+ 5 - 0
tasks/__init__.py

@@ -1,3 +1,8 @@
 """
 @author: luojunhui
 """
+# 自动下架视频任务
+from .auto_getoff_videos import AutoGetOffVideos
+
+# 自动迁移视频任务
+from .migrate_daily_top_500_videos_info import migrate_daily

+ 0 - 3
tasks/auto_getoff_videos.py

@@ -128,6 +128,3 @@ class AutoGetOffVideos(object):
             Pool1.map(cls.change_status, list(video_set))
             Pool1.map(cls.change_status, list(video_set))
 
 
 
 
-if __name__ == '__main__':
-    AutoGetOffVideos().task2()
-

+ 22 - 0
tasks/migrate_daily_top_500_videos_info.py

@@ -1,3 +1,25 @@
 """
 @author: luojunhui
 """
+import time
+from concurrent.futures.thread import ThreadPoolExecutor
+
+from applications.mysql import MySQL
+from applications.odps_server import PyODPS
+
+
def migrate_daily(dt):
    """
    Copy one day's top-500 rows from ODPS into MySQL.
    :param dt: partition day, formatted "YYYYMMDD"
    :return: None
    """
    odps_client = PyODPS()
    mysql_client = MySQL()
    select_sql = f"""select * from loghubods.video_return_top_500_new where dt = '{dt}';"""
    rows = odps_client.select(select_sql)
    started = time.time()
    # fan the inserts out over a small thread pool
    with ThreadPoolExecutor(max_workers=8) as pool:
        pool.map(mysql_client.migrate_data_to_mysql, rows)
    elapsed = time.time() - started
    print("{} successfully insert {} rows, totally cost {} seconds".format(dt, len(rows), elapsed))