import asyncio
import json
import os
import random
import sys
import time
import uuid
from datetime import datetime, timedelta

import aiohttp
import requests

sys.path.append(os.getcwd())

from application.common.feishu import FsData
from application.common.feishu.feishu_utils import FeishuUtils
from application.common.gpt import GPT4oMini
from application.common.log import AliyunLogger
from application.common.messageQueue import MQ
from application.common.mysql import MysqlHelper
from application.common.redis.redis_helper import SyncRedisHelper
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline


class ZhongQingKanDian:
    """
    中青看点推荐流 Topic: zqkd_recommend_prod

    Crawl pipeline:
        /recommend (list of ~11 ids)
          -> /detail per entry; video items are de-duplicated in Redis
             (key ``crawler:zqkd:<video_id>``, 7-day TTL) and, after passing
             PiaoQuanPipeline, published to the ETL message queue.
        (A /related fan-out per recommend id existed but was disabled;
         ``req_related_recommend_list`` is kept for when it is re-enabled.)
    """

    API_BASE_URL = "http://8.217.192.46:8889"
    COMMON_HEADERS = {"Content-Type": "application/json"}
    MAX_RETRIES = 3
    TIMEOUT = 30  # per-request timeout, seconds

    max_recommend_count = 100          # daily cap for recommend-feed crawling
    max_related_recommend_count = 200  # daily cap for related-recommend crawling
    max_author_video = 300             # daily cap of videos per author

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        """
        :param platform: platform identifier used in logs/items
        :param mode: crawl strategy name (e.g. "recommend")
        :param rule_dict: pipeline rules; ``videos_cnt.min`` bounds downloads
        :param user_list: pool of our users; one is picked at random per video
        :param env: environment suffix for the ETL topic ("prod"/"dev")
        """
        self.limit_flag = False  # flipped once the download quota is reached
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        # FIX: was MysqlHelper(mode=self.mode, platform=self) — passed the
        # instance itself instead of the platform string.
        self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)
        data_rule = FsData()
        self.title_rule = data_rule.get_title_rule()

    async def send_request(self, path, data):
        """
        POST *data* (a JSON string) to API_BASE_URL + *path*.

        Retries up to MAX_RETRIES on client/timeout/JSON errors with a 2 s
        back-off; returns the parsed JSON body, or None when all attempts fail.
        """
        full_url = f"{self.API_BASE_URL}{path}"
        async with aiohttp.ClientSession(headers=self.COMMON_HEADERS) as session:
            for retry in range(self.MAX_RETRIES):
                try:
                    async with session.post(
                        full_url, data=data, timeout=self.TIMEOUT
                    ) as response:
                        response.raise_for_status()
                        return await response.json()
                # FIX: asyncio.TimeoutError was previously uncaught, so a
                # single timeout aborted the whole retry loop.
                except (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError):
                    if retry < self.MAX_RETRIES - 1:
                        await asyncio.sleep(2)
        return None

    def is_response_valid(self, resp):
        """
        Return *resp* when it is a successful API payload (code == 0),
        otherwise log the failure and return None.
        """
        # FIX: original crashed with TypeError when resp was None (the value
        # send_request returns on failure) and had a stray trailing comma that
        # built a throwaway tuple.
        if not resp or resp.get("code") != 0:
            self.aliyun_log.logging(
                code="3000",
                message="抓取单条视频失败,请求失败"
            )
            return None
        return resp

    async def req_recommend_list(self):
        """Fetch the recommend feed; return the validated response or None."""
        url = '/crawler/zhong_qing_kan_dian/recommend'
        body = json.dumps({"cursor": ""})
        resp = await self.send_request(url, body)
        return self.is_response_valid(resp)

    async def req_related_recommend_list(self, content_id):
        """Fetch related recommendations for *content_id*; validated or None."""
        url = '/crawler/zhong_qing_kan_dian/related'
        body = json.dumps({
            "content_id": str(content_id),
            "cursor": ""
        })
        resp = await self.send_request(url, body)
        return self.is_response_valid(resp)

    async def req_detail(self, content_link, label, **kwargs):
        """
        Fetch /detail for *content_link*; if the content is a video, merge the
        upstream list fields from **kwargs into the payload and process it.

        :param label: provenance tag, "recommend" or "related"
        """
        url = '/crawler/zhong_qing_kan_dian/detail'
        body = json.dumps({"content_link": content_link})
        resp = await self.send_request(url, body)
        if not self.is_response_valid(resp):
            return
        data = resp.get("data", {}).get("data", {})
        if data.get("content_type") != "video":
            self.aliyun_log.logging(
                code="3003",
                message=f"跳过非视频内容(label={label})",
                data={"content_link": content_link}
            )
            return
        # The list entry (share counts etc.) complements the detail payload.
        data.update(kwargs)
        self.process_video_obj(data, label)
        await asyncio.sleep(10)  # throttle between detail fetches

    async def control_request(self):
        """One crawl round: fetch the recommend list, then detail each entry."""
        recommend_resp = await self.req_recommend_list()
        if not recommend_resp:  # req_recommend_list already validated/logged
            return
        recommend_list = recommend_resp.get("data", {}).get("data", [])
        for video_obj in recommend_list:
            content_link = video_obj.get("share_url")
            content_id = video_obj.get("id")
            if not (content_link and content_id):
                continue
            await self.req_detail(content_link, "recommend", **video_obj)

    def process_video_obj(self, video_obj, label):
        """
        Build a VideoItem from a /detail payload, run it through the pipeline
        and, on success, publish it to the ETL MQ.

        Side effects: upserts the author in MySQL, queues new author uids in
        Redis, optionally rewrites flagged titles via GPT and logs them to
        Feishu, and (for label == "recommend") records the video id in Redis.

        :param video_obj: detail payload merged with the list entry
        :param label: "recommend" or "related"
        """
        video_id = video_obj["channel_content_id"]
        dedup_key = f"crawler:zqkd:{video_id}"
        our_user = random.choice(self.user_list)
        trace_id = self.platform + str(uuid.uuid1())
        item = VideoItem()

        # Best-effort author bookkeeping — a failure here must not drop the video.
        try:
            account_id = video_obj["channel_account_id"]
            account_name = video_obj["channel_account_name"]
            account_avatar = video_obj["avatar"]
            if self.select_id(account_id):
                # Known author: refresh name/avatar.
                self.update_name_url(account_id, account_name, account_avatar)
            else:
                # New author: persist to MySQL and queue the uid in Redis.
                self.insert_name_url(account_id, account_name, account_avatar)
                self.write_redis_user_data(json.dumps({"uid": account_id}))
        except Exception as e:
            self.aliyun_log.logging(code="3001", message=f"写入异常{e}")

        video_url = video_obj["video_url_list"][0]['video_url']
        item.add_video_info("video_id", video_id)
        item.add_video_info("video_title", video_obj["title"])
        item.add_video_info("play_cnt", int(video_obj["read_num"]))
        item.add_video_info("publish_time_stamp",
                            int(int(video_obj["publish_timestamp"]) / 1000))
        item.add_video_info("out_user_id", video_obj["channel_account_id"])
        item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
        item.add_video_info("like_cnt", 0)
        item.add_video_info("collection_cnt", int(video_obj['collect_num']))
        item.add_video_info("share_cnt", int(video_obj["share_num"]))
        item.add_video_info("comment_cnt", int(video_obj["cmt_num"]))
        item.add_video_info("video_url", video_url)
        item.add_video_info("out_video_id", int(video_id))
        item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        mq_obj = item.produce_item()

        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if not pipeline.process_item():
            return

        # Titles containing flagged keywords are rewritten via GPT and the
        # before/after pair is appended to a Feishu sheet for review.
        title = video_obj["title"]
        if any(keyword in title for keyword in self.title_rule.split(",")):
            new_title = GPT4oMini.get_ai_mini_title(title)
            if new_title:
                item.add_video_info("video_title", new_title)
                formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                values = [
                    [
                        video_url,
                        video_obj["image_url_list"][0]['image_url'],
                        title,
                        new_title,
                        formatted_time,
                    ]
                ]
                FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL",
                                           "ROWS", 1, 2)
                time.sleep(0.5)
                FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL",
                                          "A2:Z2", values)

        self.download_cnt += 1
        self.mq.send_msg(mq_obj)
        self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
        if self.download_cnt >= int(
            self.rule_dict.get("videos_cnt", {}).get("min", 200)
        ):
            self.limit_flag = True
        if label == "recommend":
            # FIX: original opened with a no-arg self.save_video_id() call
            # (TypeError — the method requires a key); the id is recorded here
            # only after the item has been successfully sent.
            self.save_video_id(dedup_key)

    def select_id(self, uid):
        """Return True when *uid* already exists in zqkd_uid."""
        # NOTE(review): f-string SQL is injection-prone; switch to MysqlHelper's
        # parameterized API if it supports one.
        sql = f""" select uid from zqkd_uid where uid = "{uid}"; """
        db = MysqlHelper()
        repeat_user = db.select(sql=sql)
        return bool(repeat_user)

    def update_name_url(self, uid, user_name, avatar_url):
        """Refresh name/avatar for an existing zqkd_uid row; True on change."""
        # NOTE(review): f-string SQL is injection-prone; parameterize if possible.
        sql = f""" update zqkd_uid set avatar_url = "{avatar_url}", user_name="{user_name}" where uid = "{uid}"; """
        db = MysqlHelper()
        return bool(db.update(sql=sql))

    def insert_name_url(self, uid, user_name, avatar_url):
        """Insert a new zqkd_uid row stamped with the current time; True on success."""
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # NOTE(review): f-string SQL is injection-prone; parameterize if possible.
        insert_sql = f"""INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) values ('{uid}' ,'{avatar_url}','{user_name}', '{formatted_time}')"""
        db = MysqlHelper()
        return bool(db.update(sql=insert_sql))

    def get_redis_video_data(self):
        """Return the first non-empty element of the task:zqkd_video_id list, or None."""
        task = "task:zqkd_video_id"
        client = SyncRedisHelper().get_client()
        for i in range(client.llen(task)):
            element = client.lrange(task, i, i)
            if element:
                print(f"Element at index {i}: {element[0].decode('utf-8')}")
                return element
        return None

    def write_redis_user_data(self, ret):
        """Append serialized user data *ret* to the task:zqkd_user_id list."""
        # FIX: signature was (self, key, ret) but the only call site passes a
        # single argument; the unused "key" parameter is gone.
        client = SyncRedisHelper().get_client()
        client.rpush("task:zqkd_user_id", ret)

    def save_video_id(self, key):
        """
        Record *key* in Redis with a 7-day TTL.

        :return: True when the key was newly stored, False when already present.
        """
        client = SyncRedisHelper().get_client()
        if client.exists(key):
            return False
        expiration_time = int(timedelta(days=7).total_seconds())
        client.setex(key, expiration_time, "1")
        # FIX: original fell off the end (returned None) after storing, so the
        # "newly saved" outcome was indistinguishable from "already seen".
        return True

    async def run(self):
        """Crawl rounds until the download quota flips limit_flag."""
        # FIX: was `while True`, ignoring the quota flag set in process_video_obj.
        while not self.limit_flag:
            await self.control_request()


if __name__ == '__main__':
    # FIX: the previous entry point called a bare save_video_id("1234")
    # (NameError — it is a method); the intended runner is restored.
    asyncio.run(
        ZhongQingKanDian(
            platform="zhongqingkandian",
            mode="recommend",
            rule_dict={},
            user_list=[
                {"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"},
            ],
        ).run()
    )