# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2022/8/1
import json
import os
import random
import time

import oss2
import requests
import urllib3

from main.common import Common

# Bypass any system-level HTTP(S) proxy for all outbound requests.
proxies = {"http": None, "https": None}


class Publish:
    """Upload locally crawled videos (plus cover image and metadata) to Aliyun
    OSS and publish them through the long-video crawler API.

    Processing flow (driven by :meth:`upload_and_publish`, intended to run on
    a schedule, e.g. daily at 01:00):
        1. Walk every sub-directory of ``./videos``; each directory holds one
           video file, one cover image and one info text file.
        2. Upload the video and cover to OSS:
           - video: ``longvideo/crawler_local/video/<env>/<yyyymmdd>/<name>``
           - cover: ``longvideo/crawler_local/image/<env>/<yyyymmdd>/<name>``
        3. Read the info file and call the publish endpoint.
    """

    @classmethod
    def publish_video_dev(cls, log_type, request_data):
        """Publish a video against the test environment.

        request_data keys:
            loginUid                   in-site uid (picked at random)
            appType                    default: 888888
            crawlerSrcId               off-site (source) video ID
            crawlerSrcCode             channel code (e.g. KYK)
            crawlerSrcPublishTimestamp original publish time of the video
            crawlerTaskTimestamp       crawler task creation time (now is fine)
            videoPath                  OSS path of the video file
            coverImgPath               OSS path of the cover image
            title                      title
            totalTime                  video duration
            viewStatus                 validity flag, default 1
            versionCode                version, default 1
        :return: the new video id, or None when the API reports a failure
        """
        return cls._publish_video(
            log_type,
            'https://videotest.yishihui.com/longvideoapi/crawler/video/send',
            request_data)

    @classmethod
    def publish_video_prod(cls, log_type, request_data):
        """Publish a video against the production environment.

        Same ``request_data`` contract as :meth:`publish_video_dev`.
        :return: the new video id, or None when the API reports a failure
        """
        return cls._publish_video(
            log_type,
            'https://longvideoapi.piaoquantv.com/longvideoapi/crawler/video/send',
            request_data)

    @classmethod
    def _publish_video(cls, log_type, request_url, request_data):
        """Shared implementation behind the dev/prod publish entry points.

        BUGFIX vs. the original: the response's ``data`` field is only
        dereferenced after the error code has been checked, so a failure
        response (which may carry no ``data``) no longer raises KeyError,
        and the failure path returns None instead of a bogus id.
        """
        result = cls.request_post(request_url, request_data)
        # request_post returns None on a non-200 HTTP status.
        if result is None or result['code'] != 0:
            msg = result['msg'] if result is not None else 'no response'
            Common.logger(log_type).error('publish failure msg = {}'.format(msg))
            return None
        video_id = result["data"]["id"]
        Common.logger(log_type).info('publish success video_id = : {}'.format(request_data['crawlerSrcId']))
        return video_id

    @classmethod
    def request_post(cls, request_url, request_data):
        """POST form data to an HTTP API endpoint.

        :param request_url: endpoint URL
        :param request_data: form parameters (dict)
        :return: decoded JSON dict, or None when the HTTP status is not 200
        """
        # verify=False: internal endpoints use certificates that do not
        # validate; disable_warnings() silences the resulting
        # InsecureRequestWarning noise.
        urllib3.disable_warnings()
        response = requests.post(url=request_url, data=request_data,
                                 proxies=proxies, verify=False)
        if response.status_code == 200:
            return json.loads(response.text)
        return None

    # ------------------------------------------------------------------
    # OSS configuration.
    # Values come from the environment with hard-coded fallbacks.
    # SECURITY NOTE(review): the fallback AccessKey pair below is committed
    # in source control; it should be rotated and supplied exclusively via
    # environment variables.
    # Endpoint examples (Hangzhou region): http(s)://oss-cn-hangzhou.aliyuncs.com
    # ------------------------------------------------------------------
    access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', 'LTAIP6x1l3DXfSxm')
    access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', 'KbTaM9ars4OX3PMS6Xm7rtxGr1FLon')
    bucket_name = os.getenv('OSS_TEST_BUCKET', 'art-pubbucket')
    # Internal endpoint (only reachable from inside Aliyun):
    # endpoint = os.getenv('OSS_TEST_ENDPOINT', 'oss-cn-hangzhou-internal.aliyuncs.com')
    endpoint = os.getenv('OSS_TEST_ENDPOINT', 'oss-cn-hangzhou.aliyuncs.com')

    # Guard against unreplaced "<your-AccessKeyId>"-style placeholders.
    for param in (access_key_id, access_key_secret, bucket_name, endpoint):
        assert '<' not in param, '请设置参数:' + param

    # All object-level OSS operations go through this Bucket handle.
    bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)

    # OSS object-key templates: {env}/{yyyymmdd}/{file name}
    oss_file_path_video = 'longvideo/crawler_local/video/{}/{}/{}'
    oss_file_path_image = 'longvideo/crawler_local/image/{}/{}/{}'

    @classmethod
    def put_file(cls, log_type, oss_file, local_file):
        """Upload a single local file to OSS under the given object key."""
        cls.bucket.put_object_from_file(oss_file, local_file)
        Common.logger(log_type).info("put oss file = {}, local file = {} success".format(oss_file, local_file))

    @classmethod
    def remove_local_file(cls, log_type, local_file):
        """Delete a local file after it has been uploaded/consumed."""
        os.remove(local_file)
        Common.logger(log_type).info("remove local file = {} success".format(local_file))

    @classmethod
    def remove_local_file_dir(cls, log_type, local_file):
        """Delete an (empty) local video directory once fully processed."""
        os.rmdir(local_file)
        Common.logger(log_type).info("remove local file dir = {} success".format(local_file))

    # Layout of a pending upload: ./videos/<dir>/{video, image, info} files.
    local_file_path = './videos'
    video_file = 'video'
    image_file = 'image'
    info_file = 'info'
    # uid pools per environment/job combination.
    uids_dev_up = [6267140]
    uids_dev_play = [6267141]
    uids_prod_up = [20631273, 20631274, 20631275, 20631276, 20631277]
    uids_prod_play = [20631273, 20631274, 20631275, 20631276, 20631277]

    @classmethod
    def _choose_uid(cls, env, job):
        """Pick a random in-site uid for the env/job combination.

        BUGFIX vs. the original: replaces the ``global uid`` pattern (which
        raised NameError on an unknown combination) with an explicit error.
        :raises ValueError: when (env, job) is not a known combination
        """
        pools = {
            ('dev', 'up'): cls.uids_dev_up,
            ('dev', 'play'): cls.uids_dev_play,
            ('prod', 'up'): cls.uids_prod_up,
            ('prod', 'play'): cls.uids_prod_play,
        }
        try:
            return str(random.choice(pools[(env, job)]))
        except KeyError:
            raise ValueError('unknown env/job combination: {}/{}'.format(env, job))

    @classmethod
    def _read_info_file(cls, log_type, info_path, data):
        """Parse the info text file into the publish payload ``data``.

        Line layout (0-based): 0 = crawlerSrcId, 1 = title, 2 = totalTime,
        8 = crawlerSrcPublishTimestamp; other lines are ignored.  Data
        accuracy is guaranteed at write time; no validation happens here.
        """
        field_by_line = {0: 'crawlerSrcId', 1: 'title',
                         2: 'totalTime', 8: 'crawlerSrcPublishTimestamp'}
        with open(info_path, "r", encoding="UTF-8") as info_fh:
            for i in range(14):
                line = info_fh.readline().replace('\n', '')
                if line and not line.isspace():
                    if i in field_by_line:
                        data[field_by_line[i]] = line
                else:
                    Common.logger(log_type).warning("{} line is None".format(info_path))

    @classmethod
    def upload_and_publish(cls, log_type, env, job):
        """Upload the pending video folder to OSS and publish it.

        :param log_type: logger channel to use
        :param env: "dev" for the test environment, "prod" for production
        :param job: "up" for the rising list, "play" for play count
        :return: the id of the published video (processing stops after the
                 first folder — preserved from the original behavior), or
                 None when nothing was published
        """
        Common.logger(log_type).info("upload_and_publish starting...")
        today = time.strftime("%Y%m%d", time.localtime())
        # Anything other than "prod" is treated as dev, both for OSS paths
        # and for the publish endpoint.
        env_dir = "prod" if env == "prod" else "dev"
        # Candidate video folders under ./videos.
        for entry in os.listdir(cls.local_file_path):
            try:
                fi_d = os.path.join(cls.local_file_path, entry)
                if not os.path.isdir(fi_d):
                    Common.logger(log_type).error('file not a dir = {}'.format(fi_d))
                    continue
                Common.logger(log_type).info('dir = {}'.format(fi_d))

                data = {'appType': '888888',
                        'crawlerSrcCode': 'GONGZHONGHAO',
                        'viewStatus': '1',
                        'versionCode': '1',
                        # Task creation time in epoch milliseconds.
                        'crawlerTaskTimestamp': str(int(round(time.time() * 1000))),
                        'loginUid': cls._choose_uid(env, job)}

                # Pass 1: read the info file into data, then delete it.
                for fi in os.listdir(fi_d):
                    fi_path = fi_d + '/' + fi
                    Common.logger(log_type).info('dir fi_path = {}'.format(fi_path))
                    if cls.info_file in fi:
                        cls._read_info_file(log_type, fi_path, data)
                        cls.remove_local_file(log_type, fi_path)

                # Pass 2 (re-list, info file is gone): upload video + cover.
                for fi in os.listdir(fi_d):
                    fi_path = fi_d + '/' + fi
                    if cls.video_file in fi:
                        oss_video_file = cls.oss_file_path_video.format(env_dir, today, data['crawlerSrcId'])
                        Common.logger(log_type).info("oss_video_file = {}".format(oss_video_file))
                        cls.put_file(log_type, oss_video_file, fi_path)
                        data['videoPath'] = oss_video_file
                        Common.logger(log_type).info("videoPath = {}".format(oss_video_file))
                    elif cls.image_file in fi:
                        # BUGFIX vs. the original: the dev cover path was
                        # formatted with the literal "env" instead of "dev".
                        oss_image_file = cls.oss_file_path_image.format(env_dir, today, data['crawlerSrcId'])
                        Common.logger(log_type).info("oss_image_file = {}".format(oss_image_file))
                        cls.put_file(log_type, oss_image_file, fi_path)
                        data['coverImgPath'] = oss_image_file
                        Common.logger(log_type).info("coverImgPath = {}".format(oss_image_file))
                    # Every local file is removed once handled.
                    cls.remove_local_file(log_type, fi_path)

                # Publish, then drop the now-empty folder.
                if env == "prod":
                    video_id = cls.publish_video_prod(log_type, data)
                else:
                    video_id = cls.publish_video_dev(log_type, data)
                cls.remove_local_file_dir(log_type, fi_d)
                # Preserved original behavior: only the first folder is
                # processed per invocation.
                return video_id
            except Exception as e:
                Common.logger(log_type).exception('upload_and_publish error: {}'.format(e))