import os
import random
import sys
import time
import uuid
import json
from datetime import datetime
from typing import Dict, Any

import cv2
import requests

from application.common.feishu import FsData
from application.common.feishu.feishu_utils import FeishuUtils
from application.common.gpt import GPT4oMini
from application.common.mysql.sql import Sql
from application.common.redis.xng_redis import xng_in_video_data
from application.config.config import (
    xiaoniangao_view_api,
    xiaoniangao_history_api,
    xiaoniangao_log_upload_api,
)

sys.path.append(os.getcwd())
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper

# Upper bound (seconds) for the reporting POSTs below. Without a timeout,
# `requests` waits indefinitely on a stalled server, which would hang the
# whole crawl loop. A timeout surfaces as a RequestException, which the
# reporting helpers already catch and print.
REPORT_TIMEOUT_SECONDS = 30


def video_view(content_id, account_id):
    """Report a single viewed video to ``xiaoniangao_view_api``.

    Best-effort: every outcome (success, business error, HTTP error,
    transport error, invalid JSON) is printed; nothing is raised for
    request failures and nothing is returned.

    :param content_id: video id; sent as a string under ``content_id``.
    :param account_id: author id; sent as a string under ``account_id``.
    """
    headers = {
        "Content-Type": "application/json"
    }
    payload = {
        "content_id": str(content_id),
        "account_id": str(account_id)
    }
    try:
        response = requests.post(
            xiaoniangao_view_api,
            headers=headers,
            json=payload,  # requests serialises the dict to JSON
            timeout=REPORT_TIMEOUT_SECONDS,
        )
        if response.status_code == 200:
            result = response.json()
            code = result.get("code")
            msg = result.get("msg")
            if code == 0:
                print("请求成功")
            else:
                print(f"{xiaoniangao_view_api}请求失败,错误码: {code}, 消息: {msg}")
        else:
            print(f"{xiaoniangao_view_api}HTTP 请求失败,状态码: {response.status_code}")
    # The JSON handler must come first: in current `requests` releases the
    # error raised by `response.json()` is also a RequestException, so the
    # generic handler would otherwise shadow this one.
    except json.JSONDecodeError:
        print(f"{xiaoniangao_view_api}响应不是有效的 JSON 格式")
    except requests.exceptions.RequestException as e:
        print(f"{xiaoniangao_view_api}请求异常: {e}")


def video_history(video_view_lists):
    """Report a batch of scanned video ids to ``xiaoniangao_history_api``.

    Best-effort: outcomes are printed, request failures are not raised.

    :param video_view_lists: list of video ids, sent under ``content_ids``.
    """
    headers = {
        "Content-Type": "application/json"
    }
    payload = {
        "content_ids": video_view_lists
    }
    try:
        response = requests.post(
            xiaoniangao_history_api,
            headers=headers,
            json=payload,  # requests serialises the dict to JSON
            timeout=REPORT_TIMEOUT_SECONDS,
        )
        if response.status_code == 200:
            result = response.json()
            code = result.get("code")
            msg = result.get("msg")
            if code == 0:
                print("请求成功")
            else:
                print(f"{xiaoniangao_history_api}请求失败{payload},错误码: {code}, 消息: {msg}")
        else:
            print(f"{xiaoniangao_history_api}HTTP 请求失败{payload},状态码: {response.status_code}")
    # JSON handler first; see the note in video_view().
    except json.JSONDecodeError:
        print(f"{xiaoniangao_history_api}响应不是有效的 JSON 格式")
    except requests.exceptions.RequestException as e:
        print(f"{xiaoniangao_history_api}请求异常: {e}")


def log_upload(video_objs):
    """Upload a batch of log entries to ``xiaoniangao_log_upload_api``.

    Best-effort: outcomes are printed, request failures are not raised.

    :param video_objs: list of log entries (as built by
        ``XNGTJLRecommend.build_video_log``), sent under the key ``e``.
    """
    headers = {
        "Content-Type": "application/json"
    }
    payload = {
        "e": video_objs
    }
    try:
        response = requests.post(
            xiaoniangao_log_upload_api,
            headers=headers,
            json=payload,  # requests serialises the dict to JSON
            timeout=REPORT_TIMEOUT_SECONDS,
        )
        if response.status_code == 200:
            result = response.json()
            code = result.get("code")
            msg = result.get("msg")
            if code == 0:
                print(f"{xiaoniangao_log_upload_api}请求成功")
            else:
                print(f"{xiaoniangao_log_upload_api}请求失败,错误码: {code}, 消息: {msg}")
        else:
            print(f"{xiaoniangao_log_upload_api} 请求失败,状态码: {response.status_code}")
    # JSON handler first; see the note in video_view().
    except json.JSONDecodeError:
        print(f"{xiaoniangao_log_upload_api}响应不是有效的 JSON 格式")
    except requests.exceptions.RequestException as e:
        print(f"{xiaoniangao_log_upload_api}请求异常: {e}")


class XNGTJLRecommend(object):
    """Crawler for the Xiaoniangao recommend feed.

    Fetches recommend pages, pushes each video through the pipeline and,
    for accepted videos, sends them to the ETL message queue.
    """

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        """
        :param platform: platform identifier; used in trace ids, sessions,
            logging and the pipeline.
        :param mode: crawl mode/strategy name.
        :param rule_dict: rule configuration; ``videos_cnt.min`` is read as
            the number of sent videos after which crawling stops
            (default 200).
        :param user_list: non-empty list of dicts with ``uid`` and
            ``nick_name``; one is picked at random per video.
        :param env: environment suffix for the MQ topic name.
        """
        self.limit_flag = False
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        # Pass the platform name, not the spider instance, mirroring the
        # AliyunLogger construction above.
        self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)

    def get_video_duration(self, video_link: str) -> int:
        """Return the duration of the video at *video_link* in whole seconds.

        Returns 0 when the stream cannot be opened or reports a
        non-positive frame rate.
        """
        cap = cv2.VideoCapture(video_link)
        try:
            if not cap.isOpened():
                return 0
            rate = cap.get(cv2.CAP_PROP_FPS)
            frame_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            if rate <= 0:
                # Avoid ZeroDivisionError on streams that report no FPS.
                return 0
            return int(frame_num / rate)
        finally:
            # Always free the underlying capture handle.
            cap.release()

    def get_recommend_list(self):
        """Fetch the recommend feed and process every returned video.

        Requests the feed up to 6 times. Stops early when the endpoint
        returns a non-zero ``code`` or when ``self.limit_flag`` is set.
        After each fully processed page the scanned ids and log entries are
        reported via ``video_history`` and ``log_upload``.
        """
        print("小年糕推荐流开始")
        headers = {
            'Content-Type': 'application/json'
        }
        data_rule = FsData()
        title_rule = data_rule.get_title_rule()
        url = "http://8.217.192.46:8889/crawler/xiao_nian_gao_plus/recommend"
        payload = json.dumps({})
        for _ in range(6):
            # NOTE(review): this request has no timeout and its errors
            # propagate to the caller — confirm whether that is intended.
            response = requests.request("POST", url, headers=headers, data=payload)
            response = response.json()
            if response['code'] != 0:
                self.aliyun_log.logging(
                    code="3000",
                    message="抓取单条视频失败,请求失败"
                )
                return
            video_view_lists = []
            video_log_uploads = []
            for index, video_obj in enumerate(response['data']['data'], 1):
                try:
                    self.aliyun_log.logging(
                        code="1001", message="扫描到一条视频", data=video_obj
                    )
                    vid = video_obj['id']
                    video_view_lists.append(str(vid))
                    video_log_uploads.append(self.build_video_log(video_obj, index))
                    self.process_video_obj(video_obj, title_rule)
                except Exception as e:
                    # One bad video must not abort the page; log and go on.
                    self.aliyun_log.logging(
                        code="3000",
                        message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
                            1, index, e
                        ),
                    )
                if self.limit_flag:
                    # NOTE(review): returning here skips the history/log
                    # upload for the current page — confirm this is wanted.
                    return
                time.sleep(random.randint(5, 10))
            video_history(video_view_lists)
            log_upload(video_log_uploads)

    def process_video_obj(self, video_obj, title_rule):
        """Process one feed entry and send it to ETL if the pipeline accepts it.

        :param video_obj: one item of the feed's ``data.data`` list.
        :param title_rule: comma-separated keywords; a title containing any
            of them is rewritten through ``GPT4oMini.get_ai_mini_title``.
        """
        time.sleep(random.randint(3, 8))
        trace_id = self.platform + str(uuid.uuid1())
        our_user = random.choice(self.user_list)
        item = VideoItem()
        vid = video_obj['id']
        mid = int(video_obj['user']['mid'])
        print(f"vid={vid},mid={mid}")
        # Best-effort bookkeeping of the author record; a failure here is
        # printed and must not prevent the video from being processed.
        try:
            print(f"id:{mid}")
            user_name = video_obj['user']['nick']
            avatar_url = video_obj['user']['hurl']
            sql = Sql()
            max_id = sql.select_id(mid)
            if max_id:
                sql.update_name_url(mid, avatar_url, user_name)
            else:
                time.sleep(1)
                link = sql.select_id_status(mid)
                if link:
                    sql.insert_name_url(mid, avatar_url, user_name)
                    print(f"开始写入{mid}")
                    xng_in_video_data(json.dumps({"mid": mid}))
        except Exception as e:
            print(f"写入异常{e}")
        url = video_obj["v_url"]
        duration = self.get_video_duration(url)
        item.add_video_info("video_id", video_obj["id"])
        item.add_video_info("video_title", video_obj["title"])
        item.add_video_info("play_cnt", int(video_obj["play_pv"]))
        # "t" is divided by 1000 — presumably milliseconds to seconds.
        item.add_video_info("publish_time_stamp", int(int(video_obj["t"]) / 1000))
        # NOTE(review): out_user_id is filled with the video id, not the
        # author's mid — confirm this is intentional.
        item.add_video_info("out_user_id", video_obj["id"])
        item.add_video_info("cover_url", video_obj["url"])
        item.add_video_info("like_cnt", 0)
        item.add_video_info("share_cnt", int(video_obj["share"]))
        item.add_video_info("comment_cnt", int(video_obj["comment_count"]))
        item.add_video_info("video_url", video_obj["v_url"])
        item.add_video_info("out_video_id", video_obj["id"])
        item.add_video_info("duration", int(duration))
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        mq_obj = item.produce_item()
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if pipeline.process_item():
            title = video_obj["title"]
            # Drop empty keywords: "" is a substring of every title, so an
            # empty rule or a trailing comma would match everything.
            title_list = [kw for kw in (title_rule or "").split(",") if kw]
            contains_keyword = any(keyword in title for keyword in title_list)
            if contains_keyword:
                new_title = GPT4oMini.get_ai_mini_title(title)
                if new_title:
                    # NOTE(review): mq_obj was produced before this update;
                    # whether it sees the new title depends on
                    # produce_item() returning a shared object — verify.
                    item.add_video_info("video_title", new_title)
                    current_time = datetime.now()
                    formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
                    values = [
                        [
                            video_obj["v_url"],
                            video_obj["url"],
                            title,
                            new_title,
                            formatted_time,
                        ]
                    ]
                    # Insert a row in the Feishu sheet, then fill it in.
                    FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "D1nVxQ", "ROWS", 1, 2)
                    time.sleep(0.5)
                    FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "D1nVxQ", "A2:Z2", values)
            self.mq.send_msg(mq_obj)
            video_view(vid, mid)
            self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
            # Count the sent video so the videos_cnt limit can take effect.
            self.download_cnt += 1
            if self.download_cnt >= int(
                self.rule_dict.get("videos_cnt", {}).get("min", 200)
            ):
                self.limit_flag = True

    def select_id(self, uid):
        """Return True if *uid* has a row in ``xng_uid``, else False."""
        # NOTE(review): SQL is built by string interpolation; switch to a
        # parameterised query if MysqlHelper supports one.
        sql = f""" select uid from xng_uid where uid = "{uid}"; """
        db = MysqlHelper()
        repeat_video = db.select(sql=sql)
        if repeat_video:
            return True
        return False

    def select_id_status(self, uid):
        """Return True if *uid* is NOT yet present as ``link`` in ``crawler_user_v3``.

        Note the polarity: an existing row yields False.
        """
        # NOTE(review): SQL is built by string interpolation; switch to a
        # parameterised query if MysqlHelper supports one.
        sql = f""" select uid from crawler_user_v3 where link = "{uid}"; """
        db = MysqlHelper()
        repeat_video = db.select(sql=sql)
        if repeat_video:
            return False
        return True

    def build_video_log(self, video_obj: Dict[str, Any], index: int) -> Dict[str, Any]:
        """Build the "show" log entry for one feed item.

        :param video_obj: feed item; reads ``tpl_id``, ``p_ct``, ``sign``,
            ``serial_id``, ``album_id``, ``id`` and ``user.mid``.
        :param index: 1-based position in the page; stored 0-based as
            ``feed_idx``.
        :return: dict suitable for the list passed to ``log_upload``.
        """
        return {
            "ac": "show",
            "md_ver": "2.0",
            "data": {
                "page": "discoverIndexPage",
                "topic": "recommend",
                "tpl_id": str(video_obj['tpl_id']),
                "profile_ct": str(video_obj['p_ct']),
                "flv": "0",
                "sign": video_obj['sign'],
                "serial_id": video_obj['serial_id'],
                "type": "post",
                "name": "post",
                "src_page": "discoverIndexPage/recommend",
                "aid": str(video_obj['album_id']),
                "cid": str(video_obj['id']),
                "feed_idx": index - 1,
                "cmid": str(video_obj['user']['mid']),
                "user_ct": "1638346045009"
            },
            "t": int(time.time() * 1000),  # current time in milliseconds
            "ab": {}
        }

    def run(self):
        """Entry point: crawl the recommend feed."""
        self.get_recommend_list()


if __name__ == '__main__':
    J = XNGTJLRecommend(
        platform="xiaonianggaotuijianliu",
        mode="recommend",
        rule_dict={},
        user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
    )
    J.get_recommend_list()