"""Crawler for the "zhu_fu_quan_zi" recommend feed.

Fetches one page of recommended videos from the crawler HTTP service, turns
each entry into a ``VideoItem``, runs it through ``PiaoQuanPipeline`` and,
when accepted, publishes it to the ETL message queue.
"""
import os
import random
import sys
import time
import uuid
import json
from datetime import datetime

import cv2
import requests

# The working directory must be on sys.path *before* any ``application.*``
# import is attempted, otherwise the first project import below can fail when
# this file is executed directly as a script.
sys.path.append(os.getcwd())

from application.common import Feishu
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper


class ZFQZTJLRecommend(object):
    """
    Recommend-feed crawler for "祝福圈子" (zhu_fu_quan_zi).
    """

    # Endpoint that returns one page of recommended videos.
    RECOMMEND_URL = "http://47.236.68.175:8889/crawler/zhu_fu_quan_zi/recommend"
    # Upper bound (seconds) for the recommend request so a stalled server
    # cannot block the crawler forever.
    REQUEST_TIMEOUT = 60

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        """
        :param platform: platform identifier, used in logs, trace ids and items
        :param mode: crawl strategy name, stored on each item as "strategy"
        :param rule_dict: rule configuration; ``videos_cnt.min`` caps the
            number of videos sent per run (defaults to 200 when absent)
        :param user_list: list of dicts with at least "uid" and "nick_name";
            one is picked at random for every video
        :param env: environment suffix of the MQ topic name
        """
        # Set to True once enough videos were sent; stops the crawl loop.
        self.limit_flag = False
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        # Number of videos accepted by the pipeline and sent to the MQ.
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        # NOTE(review): not read anywhere in this file.
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        # Pass the platform name (not the crawler instance), matching the
        # AliyunLogger construction above.
        self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)

    def get_video_duration(self, video_link: str) -> int:
        """
        Return the video duration in whole seconds, or 0 when it is unknown.

        :param video_link: URL or path that ``cv2.VideoCapture`` can open
        :return: ``int(frame_count / fps)``; 0 if the source cannot be opened
            or reports no usable fps / frame count
        """
        cap = cv2.VideoCapture(video_link)
        try:
            if not cap.isOpened():
                return 0
            rate = cap.get(cv2.CAP_PROP_FPS)
            frame_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            # Written as a positive check so NaN values also fall through to 0
            # and a zero fps can never reach the division.
            if not (rate > 0 and frame_num > 0):
                return 0
            return int(frame_num / rate)
        finally:
            # Always free the underlying decoder / network handle.
            cap.release()

    def get_recommend_list(self):
        """
        Fetch the recommend page and process every video in it.

        Request failures and non-zero response codes are logged and end the
        run. A failure on a single video is logged and the loop continues.
        The loop stops early once ``self.limit_flag`` is set.
        """
        print("祝福圈子推荐流开始")
        headers = {
            'Content-Type': 'application/json'
        }
        payload = json.dumps({})
        try:
            response = requests.request(
                "POST",
                self.RECOMMEND_URL,
                headers=headers,
                data=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            body = response.json()
        except (requests.RequestException, ValueError) as e:
            # Network error, timeout, or a body that is not valid JSON.
            self.aliyun_log.logging(
                code="3000",
                message="抓取单条视频失败,请求失败, 报错原因是{}".format(e),
            )
            return

        if not isinstance(body, dict) or body.get('code') != 0:
            self.aliyun_log.logging(
                code="3000",
                message="抓取单条视频失败,请求失败"
            )
            return

        # Tolerate a missing / null payload instead of raising.
        video_list = (body.get('data') or {}).get('data') or []
        for index, video_obj in enumerate(video_list, 1):
            try:
                self.aliyun_log.logging(
                    code="1001", message="扫描到一条视频", data=video_obj
                )
                self.process_video_obj(video_obj)
            except Exception as e:
                # Best effort: one bad entry must not abort the whole page.
                self.aliyun_log.logging(
                    code="3000",
                    message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
                        1, index, e
                    ),
                )
            if self.limit_flag:
                return
            # Random pause between videos.
            time.sleep(random.randint(5, 10))

    def process_video_obj(self, video_obj):
        """
        Convert one feed entry into an item and send it to ETL if accepted.

        :param video_obj: dict for one video; the keys "v_url", "id", "title",
            "play_pv", "t", "url", "share" and "comment_count" are read
        """
        time.sleep(random.randint(3, 8))
        trace_id = self.platform + str(uuid.uuid1())
        our_user = random.choice(self.user_list)
        item = VideoItem()
        url = video_obj["v_url"]
        duration = self.get_video_duration(url)
        item.add_video_info("video_id", video_obj["id"])
        item.add_video_info("video_title", video_obj["title"])
        item.add_video_info("play_cnt", int(video_obj["play_pv"]))
        # "t" is divided by 1000 before being stored as the publish timestamp.
        item.add_video_info("publish_time_stamp", int(int(video_obj["t"]) / 1000))
        # The feed entry's "id" is reused for out_user_id and out_video_id.
        item.add_video_info("out_user_id", video_obj["id"])
        item.add_video_info("cover_url", video_obj["url"])
        item.add_video_info("like_cnt", 0)
        item.add_video_info("share_cnt", int(video_obj["share"]))
        item.add_video_info("comment_cnt", int(video_obj["comment_count"]))
        item.add_video_info("video_url", video_obj["v_url"])
        item.add_video_info("out_video_id", video_obj["id"])
        item.add_video_info("duration", int(duration))
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        mq_obj = item.produce_item()
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(mq_obj)
            self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
            # Stop the crawl once the configured number of videos was sent.
            if self.download_cnt >= int(
                self.rule_dict.get("videos_cnt", {}).get("min", 200)
            ):
                self.limit_flag = True

    def run(self):
        """Entry point: crawl the recommend feed once."""
        self.get_recommend_list()


if __name__ == '__main__':
    J = ZFQZTJLRecommend(
        platform="zhufuquanzituijianliu",
        mode="recommend",
        rule_dict={},
        user_list=[
            {"uid": 75590470, "link": "zfqz推荐流_接口1", "nick_name": "做你的尾巴"},
            {"uid": 75590471, "link": "zfqz推荐流_接口2", "nick_name": "能够相遇"},
            {"uid": 75590472, "link": "zfqz推荐流_接口3", "nick_name": "一别两宽各生欢喜"},
            {"uid": 75590473, "link": "zfqz推荐流_接口4", "nick_name": "惹火"},
            {"uid": 75590475, "link": "zfqz推荐流_接口5", "nick_name": "顾九"},
            {"uid": 75590476, "link": "zfqz推荐流_接口6", "nick_name": "宠一身脾气惯一身毛病"},
        ],
    )
    J.get_recommend_list()