import os
import random
import sys
import time
import uuid
import json

import requests

# Make the project root importable BEFORE any `application.*` import
# (FsData included), otherwise the path tweak is useless for them.
sys.path.append(os.getcwd())

from application.common.feishu import FsData
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper


class XLZFRecommend(object):
    """Recommend-feed crawler for the Xuanlanzhufu platform.

    Pages through the recommend list served by the crawler proxy, resolves
    each video's playable URL, runs the item through ``PiaoQuanPipeline`` and
    sends accepted items to the ETL message queue until the configured video
    quota is reached.
    """

    # Crawler proxy endpoints for this platform.
    RECOMMEND_URL = "http://8.217.192.46:8889/crawler/xuan_lan_zhu_fu/recommend"
    DETAIL_URL = "http://8.217.192.46:8889/crawler/xuan_lan_zhu_fu/detail"
    HEADERS = {"Content-Type": "application/json"}
    # Seconds before a request to the proxy is abandoned, so a stalled
    # server cannot hang the crawler forever.
    REQUEST_TIMEOUT = 10

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        """
        :param platform: platform identifier; also the trace-id prefix.
        :param mode: crawl strategy name (e.g. "recommend").
        :param rule_dict: filtering rules; ``rule_dict["videos_cnt"]["min"]``
            is read as the download quota (default 200).
        :param user_list: candidate accounts, dicts with "uid" and
            "nick_name"; one is chosen at random per video.
        :param env: environment suffix for the MQ topic name.
        """
        self.limit_flag = False  # set once the download quota is reached
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        # Pass the platform name, not the crawler object itself.
        self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)

    def _fetch_recommend_page(self, cursor):
        """Request one page of the recommend feed.

        :param cursor: pagination cursor; "" requests the first page.
        :return: ``(next_cursor, videos)`` on success, ``None`` when the
            request fails or the response is malformed (already logged).
        """
        payload = json.dumps({"cursor": cursor})
        try:
            response = requests.post(
                self.RECOMMEND_URL,
                headers=self.HEADERS,
                data=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            result = response.json()
            if result["code"] != 0:
                self.aliyun_log.logging(
                    code="3000", message="抓取单条视频失败,请求失败"
                )
                return None
            return result["data"]["next_cursor"], result["data"]["data"]
        except (requests.RequestException, ValueError, KeyError, TypeError) as e:
            # Network error, non-JSON body or unexpected response shape.
            self.aliyun_log.logging(
                code="3000",
                message="请求推荐列表失败, 报错原因是{}".format(e),
            )
            return None

    def get_recommend_list(self):
        """Crawl the recommend feed page by page.

        Stops when a page request fails, a page comes back empty, or the
        download quota is reached (``self.limit_flag``). Errors on a single
        video are logged and do not stop the crawl.
        """
        print("绚烂祝福")
        title_rule = FsData().get_title_rule()
        next_cursor = ""
        page = 1
        # NOTE(review): termination relies on the server eventually returning
        # an empty page (or the quota being hit); confirm the proxy does so.
        while True:
            page_result = self._fetch_recommend_page(next_cursor)
            if page_result is None:
                return
            next_cursor, data = page_result
            if not data:
                return
            for index, video_obj in enumerate(data, 1):
                try:
                    self.aliyun_log.logging(
                        code="1001", message="扫描到一条视频", data=video_obj
                    )
                    self.process_video_obj(video_obj, title_rule)
                except Exception as e:
                    # Best-effort per video: log and move on to the next one.
                    self.aliyun_log.logging(
                        code="3000",
                        message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
                            page, index, e
                        ),
                    )
                if self.limit_flag:
                    return
                time.sleep(random.randint(1, 5))
            page += 1

    def process_video_obj(self, video_obj, title_rule):
        """Build a VideoItem for one feed entry and send it to ETL if accepted.

        :param video_obj: feed entry with at least "id", "title" and "img".
        :param title_rule: comma-separated title keywords from Feishu;
            currently unused because AI title rewriting is disabled.
        """
        video_url = self.get_video_url(video_obj["id"])
        if not video_url:
            return
        time.sleep(random.randint(3, 8))
        trace_id = self.platform + str(uuid.uuid1())
        our_user = random.choice(self.user_list)
        now = int(time.time())

        item = VideoItem()
        item.add_video_info("video_id", video_obj["id"])
        item.add_video_info("video_title", video_obj["title"])
        item.add_video_info("play_cnt", 0)
        # The feed exposes no publish time, so the crawl time is used.
        item.add_video_info("publish_time_stamp", now)
        item.add_video_info("out_user_id", video_obj["id"])
        item.add_video_info("cover_url", video_obj["img"])
        item.add_video_info("like_cnt", 0)
        item.add_video_info("share_cnt", 0)
        item.add_video_info("comment_cnt", 0)
        item.add_video_info("video_url", video_url)
        item.add_video_info("out_video_id", video_obj["id"])
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, now))
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        mq_obj = item.produce_item()

        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if not pipeline.process_item():
            return

        # NOTE: AI title rewriting (keyword match against title_rule, then
        # GPT4oMini, with results logged to a Feishu sheet) used to live
        # here and is disabled; restore it from VCS history if needed.
        self.download_cnt += 1
        self.mq.send_msg(mq_obj)
        self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
        quota = int(self.rule_dict.get("videos_cnt", {}).get("min", 200))
        if self.download_cnt >= quota:
            self.limit_flag = True

    def get_video_url(self, vid):
        """Resolve the playable URL for content id *vid*.

        :return: the URL, or ``None`` when the detail request fails or the
            response is malformed (the failure is logged).
        """
        payload = json.dumps({"content_id": str(vid)})
        try:
            response = requests.post(
                self.DETAIL_URL,
                headers=self.HEADERS,
                data=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            result = response.json()
            if result["code"] != 0:
                self.aliyun_log.logging(code="3000", message="获取视频链接失败")
                return None
            return result["data"]["data"]["content_link"]
        except (requests.RequestException, ValueError, KeyError, TypeError) as e:
            # Previously swallowed silently; log so failures are visible.
            self.aliyun_log.logging(
                code="3000",
                message="获取视频链接失败, 报错原因是{}".format(e),
            )
            return None

    def run(self):
        """Entry point: crawl the recommend feed."""
        self.get_recommend_list()


if __name__ == "__main__":
    crawler = XLZFRecommend(
        platform="xuanlanzhufu",
        mode="recommend",
        rule_dict={},
        user_list=[{"uid": "123456", "nick_name": "xiaoxiao"}],
    )
    crawler.run()