import os
import sys
import json
import random
import uuid
import time
import traceback
from datetime import datetime
import requests
from requests.adapters import HTTPAdapter  # NOTE(review): imported but unused in this file
from urllib3.util.retry import Retry  # NOTE(review): imported but unused in this file

sys.path.append(os.getcwd())
from application.common.feishu import FsData
from application.common.feishu.feishu_utils import FeishuUtils
from application.common.gpt import GPT4oMini
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger
from application.functions.zqkd_db_redis import DatabaseOperations, RedisOperations
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.log import Local


class ZhongQingKanDianRelatedRecommend:
    """Crawler for ZhongQingKanDian ("中青看点") related-recommendation videos.

    Pulls seed content ids from Redis, fetches the related-recommendation
    list for each seed from an internal crawler API, resolves each item's
    detail, filters/normalizes it into a ``VideoItem``, runs it through
    ``PiaoQuanPipeline`` and publishes accepted items to an MQ topic for ETL.
    Stops once the configured daily video quota is reached.
    """

    # Base URL of the internal crawler gateway.
    API_BASE_URL = "http://8.217.192.46:8889"
    COMMON_HEADERS = {
        "Content-Type": "application/json"
    }
    # Maximum number of attempts per API request.
    MAX_RETRIES = 3
    # Per-request timeout in seconds.
    TIMEOUT = 30

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        """
        Initialize the crawler.

        :param platform: platform name, e.g. ``zhongqingkandian``
        :param mode: run mode, e.g. ``recommend``
        :param rule_dict: rule dict with quota/duration limits, e.g.
            ``[{"videos_cnt":{"min":100,"max":0}},{"duration":{"min":30,"max":1200}}]``
        :param user_list: list of our-side user dicts (``uid``, ``nick_name``, ...)
            one of which is randomly assigned to each crawled video
        :param env: runtime environment, defaults to ``"prod"``
        """
        # limit_flag flips to False once the daily download quota is hit.
        self.limit_flag = True
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        self.db_ops = DatabaseOperations(mode=mode, platform=platform)
        self.redis_ops = RedisOperations(mode=mode, platform=platform)
        data_rule = FsData()
        # Comma-separated keyword string; titles containing any keyword
        # are rewritten by GPT (see process_video_obj).
        self.title_rule = data_rule.get_title_rule()
        self.LocalLog = Local.logger(self.platform, self.mode)
        self.session = requests.session()

    def __del__(self):
        # Best-effort cleanup of the shared HTTP session.
        # NOTE(review): relying on __del__ for resource release is fragile;
        # a context manager / explicit close() would be safer.
        if self.session:
            self.LocalLog.info("session 被正确关闭")
            self.session.close()

    def send_request(self, path, data):
        """POST ``data`` (a JSON string) to ``path`` with retry.

        Retries up to ``MAX_RETRIES`` times on exceptions, malformed
        responses (missing ``code``) and non-zero error codes, sleeping
        5-10 s between attempts.

        :param path: API path appended to ``API_BASE_URL``
        :param data: request body, already JSON-encoded by the caller
        :return: the decoded response dict when ``code == 0``; ``None``
            when the API returns the non-retryable code 29036 or all
            retries are exhausted.
        """
        for attempt in range(self.MAX_RETRIES):
            try:
                response = self.session.post(
                    f"{self.API_BASE_URL}{path}",
                    data=data,
                    timeout=self.TIMEOUT,
                    headers=self.COMMON_HEADERS
                )
                resp_data = response.json()
                # Validate the response envelope; a missing 'code' is
                # treated like a transient failure and retried.
                if 'code' not in resp_data:
                    self.LocalLog.warning(f"{path}响应缺少code字段,尝试重试")
                    raise ValueError("Missing 'code' in response")
                code = resp_data['code']
                # Success (code == 0).
                if code == 0:
                    self.LocalLog.info(f"{path}请求成功:{resp_data}")
                    return resp_data
                # Code 29036 is a known terminal error: do not retry.
                if code == 29036:
                    self.LocalLog.warning(f"{path}返回code:29036,消息:{resp_data}")
                    return None
                # Any other error code: fall through and retry.
                self.LocalLog.warning(f"{path}返回错误码{code},尝试重试,响应内容:{resp_data}")
            except Exception as e:
                tb_info = traceback.format_exc()
                self.LocalLog.error(f"{path}请求异常: {str(e)} \n {tb_info}")
            # NOTE(review): original indentation was lost; this back-off is
            # placed at loop level so both exception and error-code paths
            # sleep before retrying — confirm against upstream history.
            if attempt < self.MAX_RETRIES - 1:
                time.sleep(random.randint(5, 10))
        # All retries exhausted: log and give up.
        self.LocalLog.error(f"{path}达到最大重试次数")
        self.aliyun_log.logging(
            code="3000",
            message=f"请求 {path} 失败,达到最大重试次数",
            data=data
        )
        return None

    def req_related_recommend_list(self, content_id):
        """
        Fetch the related-recommendation list for a given content id.

        :param content_id: seed content id (stringified into the body)
        :return: decoded response dict on success, otherwise ``None``
        """
        try:
            url = '/crawler/zhong_qing_kan_dian/related'
            body = json.dumps({
                "content_id": f"{content_id}",
                "cursor": ""
            })
            self.LocalLog.info(f"开始请求相关推荐{body}")
            return self.send_request(url, body)
        except Exception as e:
            tb_info = traceback.format_exc()
            self.aliyun_log.logging(
                code="1004",
                message=f"请求相关推荐视频列表时发生异常,错误信息: {str(e)}",
                data={"url": url}
            )
            self.LocalLog.info(f"请求相关推荐视频列表 {url} 时发生异常:{e} \n{tb_info}")
            return None

    def req_detail(self, content_link, **kwargs):
        """
        Fetch the detail of one content link and process it if it is a video.

        Non-video content is skipped (logged with code 3003). On success the
        detail payload is merged with ``kwargs`` (the list-item fields) and
        handed to :meth:`process_video_obj`.

        :param content_link: share URL of the content
        :param kwargs: extra fields from the recommendation-list item
        :return: ``None``; all effects happen via side channels (MQ, DB, logs)
        """
        try:
            self.LocalLog.info(f"开始请求视频详情,链接: {content_link}")
            url = '/crawler/zhong_qing_kan_dian/detail'
            body = json.dumps({
                "content_link": content_link
            })
            resp = self.send_request(url, body)
            if not resp:
                return
            # Payload is double-nested: resp["data"]["data"].
            data = resp.get("data", {}).get("data", {})
            if data.get("content_type") != "video":
                self.aliyun_log.logging(
                    code="3003",
                    message=f"跳过非视频内容",
                    data={"content_link": content_link}
                )
                self.LocalLog.info(f"跳过非视频内容,链接: {content_link}")
                return
            self.LocalLog.info(f"{content_link} 是视频")
            # Merge list-item fields into the detail payload before processing.
            data.update(kwargs)
            self.process_video_obj(data)
        except Exception as e:
            tb_info = traceback.format_exc()
            self.aliyun_log.logging(
                code="1005",
                message=f"请求视频详情时发生异常,错误信息: {str(e)}",
                data={"content_link": content_link}
            )
            self.LocalLog.error(f"请求视频详情,链接 {content_link} 时发生异常:{e} \n{tb_info}")

    def control_request_related(self):
        """
        Main crawl loop: drain seed ids from Redis and process their
        related-recommendation lists until the daily quota is reached.

        :return: ``None``; exits when the quota check at the top of the
            loop is satisfied or ``limit_flag`` becomes ``False``.
        """
        while self.limit_flag:
            try:
                # Re-read today's count from the DB each iteration so the
                # quota holds across concurrent/restarted workers.
                self.download_cnt = self.db_ops.get_today_videos()
                if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 100):
                    self.aliyun_log.logging(
                        code="2010",
                        message=f"今日已经达到最大量",
                        data=self.download_cnt
                    )
                    self.LocalLog.info(f"当日视频已达到最大爬取量{self.download_cnt}")
                    return
                self.LocalLog.info(f"开始推荐视频列表的请求和处理流程,今日已爬 {self.download_cnt} 个视频")
                content_id = self.redis_ops.get_recommend_video()
                if not content_id:
                    # NOTE(review): no sleep here — an empty Redis queue makes
                    # this loop spin hot against Redis; consider a back-off.
                    self.LocalLog.info("缓存中【task:zqkd_video_id】没有数据")
                    continue
                # Randomized pacing between API calls.
                time.sleep(random.randint(5, 10))
                related_resp = self.req_related_recommend_list(content_id)
                if not related_resp:
                    continue
                related_list = related_resp.get("data", {}).get("data", [])
                self.LocalLog.info(f"获取的推荐列表长度:{len(related_list)}")
                for related_obj in related_list:
                    # NOTE(review): a per-item limit_flag early-exit was
                    # deliberately commented out upstream; the daily cap is
                    # enforced at the top of the while loop instead.
                    related_content_link = related_obj.get("share_info", {}).get("share_url")
                    self.LocalLog.info(f"related_content_link == {related_content_link}")
                    if related_content_link:
                        time.sleep(random.randint(5, 10))
                        self.req_detail(related_content_link, **related_obj)
            except Exception as e:
                tb_info = traceback.format_exc()
                self.aliyun_log.logging(
                    code="3009",
                    message=f"控制相关推荐视频请求和处理时发生异常,错误信息: {str(e)}",
                    data={}
                )
                self.LocalLog.info(f"控制相关推荐视频请求和处理时发生异常:\n{tb_info}")

    def process_video_obj(self, video_obj):
        """
        Normalize one video payload, run it through the pipeline, and on
        acceptance publish it to MQ and record its id in Redis.

        Skips duplicates (Redis id check) and videos outside the configured
        duration window. Titles containing configured keywords are rewritten
        via GPT4oMini and logged to a Feishu sheet.

        :param video_obj: merged detail + list-item dict for one video;
            assumed to contain ``video_url_list``, ``image_url_list``,
            ``channel_content_id``, counts and account fields — schema is
            defined by the upstream API, not visible here.
        :return: ``None``
        """
        try:
            video_duration = video_obj["video_url_list"][0]['video_duration']
            video_id = video_obj['channel_content_id']
            # Dedupe: skip ids already seen (stored in Redis).
            if self.redis_ops.check_video_id_exists(video_id):
                self.aliyun_log.logging(
                    code="3004",
                    message=f"重复视频ID:{video_id}"
                )
                self.LocalLog.info(f"重复视频ID: {video_id}")
                return
            # Randomly attribute the video to one of our users.
            our_user = random.choice(self.user_list)
            trace_id = self.platform + str(uuid.uuid1())
            item = VideoItem()
            account_id = video_obj["channel_account_id"]
            account_name = video_obj["channel_account_name"]
            account_avatar = video_obj["avatar"]
            # Upsert the source account before filtering, so we keep author
            # info even for videos rejected below.
            self.db_ops.insert_user(account_id, account_name, account_avatar)
            self.LocalLog.info(f"用户{account_id}存入数据库")
            # Duration gate: defaults to [30s, 1200s] when rule_dict omits it.
            if video_duration > self.rule_dict.get("duration", {}).get("max", 1200) or video_duration < self.rule_dict.get("duration", {}).get("min", 30):
                self.aliyun_log.logging(
                    code="3005",
                    message=f"视频时长不满足条件[>=30s&<=1200s]视频ID:{video_obj['channel_content_id']},视频时长:{video_duration}"
                )
                self.LocalLog.info(
                    f"视频时长不满足条件,视频ID: {video_obj['channel_content_id']}, 视频时长: {video_duration}")
                return
            item.add_video_info("video_id", video_obj['channel_content_id'])
            item.add_video_info("video_title", video_obj["title"])
            item.add_video_info("play_cnt", int(video_obj["read_count"]))
            # publish_timestamp arrives in milliseconds; convert to seconds.
            item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"]) / 1000))
            item.add_video_info("out_user_id", video_obj["channel_account_id"])
            item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
            # Source exposes no like/collection counts; recorded as 0.
            item.add_video_info("like_cnt", 0)
            item.add_video_info("collection_cnt", 0)
            item.add_video_info("share_cnt", int(video_obj["share_count"]))
            item.add_video_info("comment_cnt", int(video_obj["comment_count"]))
            item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
            item.add_video_info("out_video_id", int(video_obj["channel_content_id"]))
            item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
            item.add_video_info("platform", self.platform)
            item.add_video_info("strategy", self.mode)
            item.add_video_info("session", f"{self.platform}-{int(time.time())}")
            item.add_video_info("user_id", our_user["uid"])
            item.add_video_info("user_name", our_user["nick_name"])
            mq_obj = item.produce_item()
            pipeline = PiaoQuanPipeline(
                platform=self.platform,
                mode=self.mode,
                rule_dict=self.rule_dict,
                env=self.env,
                item=mq_obj,
                trace_id=trace_id
            )
            # NOTE(review): the original indentation below was lost in
            # transit; the reconstruction nests the Feishu writes under
            # `if new_title:` (they reference `values`, which is only bound
            # there) and the MQ publish under `process_item()` — verify
            # against upstream history.
            if pipeline.process_item():
                title_list = self.title_rule.split(",")
                title = video_obj["title"]
                # Rewrite the title via GPT when it contains any configured keyword.
                contains_keyword = any(keyword in title for keyword in title_list)
                if contains_keyword:
                    new_title = GPT4oMini.get_ai_mini_title(title)
                    if new_title:
                        item.add_video_info("video_title", new_title)
                        current_time = datetime.now()
                        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
                        # Audit the rewrite (old vs. new title) to a Feishu sheet.
                        values = [
                            [
                                video_obj["video_url_list"][0]['video_url'],
                                video_obj["image_url_list"][0]['image_url'],
                                title,
                                new_title,
                                formatted_time,
                            ]
                        ]
                        FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "ROWS", 1, 2)
                        time.sleep(0.5)
                        FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "A2:Z2", values)
                self.download_cnt += 1
                self.mq.send_msg(mq_obj)
                self.aliyun_log.logging(
                    code="2009",
                    message=f"成功发送视频到etl",
                    data={"video_obj": video_obj}
                )
                self.LocalLog.info(f"成功发送etl")
                # Mark the id as seen only after a successful publish.
                self.redis_ops.save_video_id(video_obj['channel_content_id'])
                if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 100):
                    self.limit_flag = False
        except Exception as e:
            tb_info = traceback.format_exc()
            self.aliyun_log.logging(
                code="1005",
                message=f"处理视频对象时发生异常,错误信息: {str(e)}",
                data={"video_obj": video_obj}
            )
            self.LocalLog.error(f"处理视频对象时发生异常: {e}\n{tb_info}")

    def run(self):
        """
        Entry point: run the related-recommendation crawl until the daily
        download quota stops the loop.

        :return: ``None``
        """
        self.LocalLog.info("开始执行中青看点相关推荐抓取...")
        self.control_request_related()


if __name__ == '__main__':
    ZhongQingKanDianRelatedRecommend(
        platform="zhongqingkandianrelated",
        mode="related",
        rule_dict={"videos_cnt": {"min": 200, "max": 0}},
        user_list=[{"uid": 81525095, "link": "中青看点推荐", "nick_name": "善惡"}]
    ).run()