"""Crawler for videos published by a single "shipinhao" (WeChat Channels) account.

The crawler resolves the account id (from MySQL, or through the search
endpoint on a cache miss), pages through the account's videos, filters them
by engagement and duration, records every examined candidate in a Feishu
sheet and forwards accepted videos to the ETL message queue.
"""
import os
import json
import random
import sys
import time
import uuid
import datetime  # NOTE: shadowed by `from datetime import datetime` below; kept for compatibility.
import requests
import cv2

# Append the current working directory to the import path before the
# project-local `common` imports below.
sys.path.append(os.getcwd())
from datetime import datetime
from common.feishu import Feishu
from common import PiaoQuanPipeline, AliyunLogger
from common.db import MysqlHelper
from common.mq import MQ
from common.public import clean_title
from common.limit import AuthorLimit

# Endpoints of the scraping service used by this crawler.
_API_BASE = "http://61.48.133.26:30001"
_SEARCH_URL = _API_BASE + "/Find_Video_Content"
_VIDEO_LIST_URL = _API_BASE + "/FinderGetUpMasterNextPage"
_DOWNLOAD_URL = _API_BASE + "/GetFinderDownloadAddress"

# Seconds to wait for the scraping service before giving up; without a
# timeout `requests` may block forever on a stalled connection.
_REQUEST_TIMEOUT = 60

# Maximum number of result pages fetched per account and run.
_MAX_PAGES = 10

# Filtering thresholds applied to every candidate video.
_MIN_SHARE_CNT = 500
_MIN_SHARE_LIKE_RATIO = 0.25
_MIN_DURATION_SECONDS = 45


def find_target_user(name, user_list):
    """Find the target account in a list of search results.

    Args:
        name: Nickname to look for (exact match).
        user_list: Iterable of result dicts; each is expected to carry a
            ``contact`` dict with a ``nickname`` key.

    Returns:
        The first entry whose ``contact.nickname`` equals *name*, or
        ``False`` when there is none. Entries without a ``contact`` are
        skipped instead of raising ``KeyError``.
    """
    for candidate in user_list:
        contact = candidate.get("contact") or {}
        if contact.get("nickname") == name:
            return candidate
    return False


class ShiPinHaoAuthor(object):
    """Crawler for the videos of one shipinhao account."""

    def __init__(self, platform, mode, rule_dict, user_dict, env):
        """Store the crawl configuration and create the MQ client and limiter.

        Args:
            platform: Platform identifier, used for logging, SQL and Feishu.
            mode: Crawl mode, used for logging and by the limiter.
            rule_dict: Crawl rules; ``videos_cnt.min`` caps the downloads.
            user_dict: Account description; ``link`` holds the account name
                and ``uid`` the id attached to every forwarded video.
            env: Environment name; also the suffix of the MQ topic.
        """
        self.account_name = user_dict["link"]
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_dict = user_dict
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)

    def get_history_id(self):
        """Read the cached account id from the ``accounts`` table.

        Returns:
            The stored ``name_id``, or ``False`` when no usable row exists.
        """
        # NOTE(review): the statement is built by string interpolation, so a
        # quote in the account name breaks (or injects into) the query.
        # Switch to a parameterized query if MysqlHelper supports one.
        select_user_sql = (
            f'select name_id from accounts where name = "{self.account_name}" '
            f'and platform = "{self.platform}" and useful = 1 limit 1'
        )
        rows = MysqlHelper.get_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=select_user_sql,
            env=self.env,
            machine="",
        )
        if rows:
            return rows[0][0]
        return False

    def get_account_id(self):
        """Return the account id, looking it up remotely on a cache miss.

        A previously stored id is returned directly. Otherwise the search
        endpoint is queried for the account name and, when a result with a
        matching nickname is found, its username is stored in MySQL and
        returned.

        Returns:
            The account id, or ``False`` when the account cannot be found.
        """
        history_id = self.get_history_id()
        if history_id:
            return history_id

        payload = json.dumps({
            "content": self.account_name,
            "type": "19"
        })
        response = requests.request(
            "POST",
            _SEARCH_URL,
            headers={'Content-Type': 'application/json'},
            data=payload,
            timeout=_REQUEST_TIMEOUT,
        )
        # A response without `info_list` is treated like an empty result.
        info_list = response.json().get('info_list')
        if not info_list:
            return False

        target_user = find_target_user(name=self.account_name, user_list=info_list)
        if not target_user:
            return False

        account_id = target_user['contact']["username"]
        # Persist the id so later runs can skip the remote search.
        # NOTE(review): string-built SQL with a value taken from a remote
        # response; prefer a parameterized query if MysqlHelper supports one.
        update_sql = (
            f'INSERT INTO accounts (name, name_id, platform, useful) values ("{self.account_name}", \n'
            f'"{account_id}", "{self.platform}", 1 )'
        )
        MysqlHelper.update_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=update_sql,
            env=self.env,
            machine="",
        )
        return account_id

    def _download_limit_reached(self, default_min):
        """Tell whether ``download_cnt`` reached ``rule_dict['videos_cnt']['min']``."""
        limit = int(self.rule_dict.get("videos_cnt", {}).get("min", default_min))
        return self.download_cnt >= limit

    def _log_no_more_videos(self):
        """Log that the account has no further videos to page through."""
        AliyunLogger.logging(
            code="2000",
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            message="没有更多视频了",
        )

    def get_account_videos(self):
        """Page through the account's videos and process each one.

        Stops when the download limit is reached, when the service returns
        no more videos, after ``_MAX_PAGES`` pages, or as soon as
        ``process_video_obj`` returns a falsy value.
        """
        # NOTE(review): the fallback limit is 300 here but 30 inside the
        # loop; both values are kept as found - confirm which is intended.
        if self._download_limit_reached(300):
            return

        account_id = self.get_account_id()
        if not account_id:
            AliyunLogger.logging(
                code="3000",
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="{}\t获取 id 失败".format(self.account_name),
            )
            return

        last_buffer = ""
        for _ in range(_MAX_PAGES):
            if self._download_limit_reached(30):
                return

            payload = json.dumps({
                "username": account_id,
                "last_buffer": last_buffer
            })
            response = requests.request(
                "POST",
                _VIDEO_LIST_URL,
                headers={'Content-Type': 'application/json'},
                data=payload,
                timeout=_REQUEST_TIMEOUT,
            )
            time.sleep(random.randint(1, 5))
            if response.status_code != 200 or "objectId" not in response.text:
                self._log_no_more_videos()
                return

            res_json = response.json()
            # Covers a missing key, null and an empty list in one check.
            video_list = res_json.get("UpMasterHomePage")
            if not video_list:
                self._log_no_more_videos()
                return

            last_buffer = res_json.get('last_buffer')
            # Counts only the videos of this page that were processed without
            # raising; process_video_obj uses it to decide when to stop.
            count = 0
            for obj in video_list:
                try:
                    AliyunLogger.logging(
                        code="1001",
                        platform=self.platform,
                        mode=self.mode,
                        message="扫描到一条视频",
                        env=self.env,
                        data=obj,
                    )
                    repeat_flag = self.process_video_obj(obj, count)
                    count += 1
                    if not repeat_flag:
                        return
                except Exception as e:
                    # Best effort: one broken video must not abort the page.
                    AliyunLogger.logging(
                        code="3000",
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        message=f"抓取单条视频异常:{e}\n",
                    )

    def _record_in_feishu(self, row):
        """Insert a new row into the Feishu sheet and write *row* into ``A2:Z2``."""
        Feishu.insert_columns(self.platform, 'shipinhao', "Vq7NeH", "ROWS", 1, 2)
        time.sleep(0.5)
        Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", [row])

    def process_video_obj(self, obj, count):
        """Filter one video and forward it to ETL when it qualifies.

        Args:
            obj: Video entry of the list response (ids, counters, nickname
                and ``createtime``).
            count: Number of videos of the current page already processed.

        Returns:
            ``False`` when a duplicate is met after more than three
            processed videos (the caller then stops crawling the account),
            ``True`` otherwise.
        """
        object_id = obj['objectId']
        object_nonce_id = obj['objectNonceId']
        share_cnt = int(obj['forward_count'])
        like_cnt = int(obj['like_count'])
        fav_count = int(obj['fav_count'])
        comment_cnt = int(obj['comment_count'])

        # Cheap engagement filters run first, so videos that are discarded
        # anyway cost neither a download-address request nor a video probe.
        if share_cnt < _MIN_SHARE_CNT:
            return True
        if like_cnt == 0:
            # The share/like ratio is undefined; skip the video explicitly
            # instead of failing with ZeroDivisionError.
            return True
        # Rounded to two decimals before comparing, as the check always was.
        share_like_ratio = float('%.2f' % (share_cnt / like_cnt))
        if share_like_ratio < _MIN_SHARE_LIKE_RATIO:
            return True

        trace_id = self.platform + "new" + str(uuid.uuid1())
        payload = json.dumps({
            "objectId": object_id,
            "objectNonceId": object_nonce_id
        })
        response = requests.request(
            "POST",
            _DOWNLOAD_URL,
            headers={'Content-Type': 'text/plain'},
            data=payload,
            timeout=_REQUEST_TIMEOUT,
        )
        time.sleep(random.randint(0, 1))
        video_obj = response.json()

        publish_time_str = obj['createtime']
        # Convert the publish time to a Unix timestamp (the string is parsed
        # as a naive datetime, i.e. interpreted in the local timezone).
        publish_time_stamp = int(
            datetime.strptime(publish_time_str, '%Y-%m-%d %H:%M:%S').timestamp()
        )
        video_url = video_obj.get('DownloadAddress')
        duration = int(self.video_duration(video_url))
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # First line of the title, cut at the first hashtag.
        raw_title = video_obj.get('title').split("\n")[0].split("#")[0]

        video_dict = {
            "video_id": object_id,
            "video_title": clean_title(raw_title),
            "out_video_id": object_id,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "play_cnt": 0,
            "fav_count": fav_count,
            "comment_cnt": comment_cnt,
            "like_cnt": like_cnt,
            "share_cnt": share_cnt,
            "user_id": self.user_dict["uid"],
            "cover_url": video_obj.get('thumb_url'),
            "video_url": video_url,
            "avatar_url": video_obj.get('thumb_url'),
            "width": video_obj.get('width'),
            "height": video_obj.get('height'),
            "duration": duration,
            "platform": self.platform,
            "strategy": self.mode,
            "crawler_rule": self.rule_dict,
            "session": f"shipinhao-author-{int(time.time())}",
        }

        # Shares per favourite, reported as 0 when there are no favourites.
        divisor_cnt = int(share_cnt / fav_count) if fav_count else 0
        # Columns shared by every Feishu row; each branch appends the
        # accepted flag, the rejection reason and the download address.
        base_row = [
            obj['nickname'],
            publish_time_str,
            formatted_time,
            fav_count,
            comment_cnt,
            like_cnt,
            share_cnt,
            divisor_cnt,
            raw_title,
            duration,
        ]

        # NOTE(review): the reported reason says "shorter than 30 seconds"
        # while the threshold is 45; the text is kept as found - confirm.
        if duration < _MIN_DURATION_SECONDS:
            self._record_in_feishu(base_row + ['否', '时长小于30秒', video_url])
            return True

        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id,
        )
        if not pipeline.repeat_video():
            # A falsy result is recorded as a duplicate video.
            self._record_in_feishu(base_row + ['否', '重复视频', video_url])
            # Past the first few videos of a page a duplicate ends the crawl.
            return count <= 3

        self._record_in_feishu(base_row + ['是', '', video_url])
        video_dict["publish_time"] = video_dict["publish_time_str"]
        limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])
        if limit_flag:
            self.mq.send_msg(video_dict)
            self.download_cnt += 1
            AliyunLogger.logging(
                code="1002",
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                data=video_dict,
                trace_id=trace_id,
                message="成功发送 MQ 至 ETL",
            )
            # NOTE(review): the source indentation was lost; the pause is
            # assumed to follow a successful send only, and the final
            # `return True` to apply to every non-duplicate video - confirm.
            time.sleep(5)
        return True

    def video_duration(self, filename):
        """Return the duration in seconds of the video at *filename*.

        Args:
            filename: Path or URL handed to ``cv2.VideoCapture``.

        Returns:
            ``frame count / fps``, or ``0`` when the video cannot be opened
            or reports a non-positive frame rate.
        """
        cap = cv2.VideoCapture(filename)
        try:
            if not cap.isOpened():
                return 0
            rate = cap.get(cv2.CAP_PROP_FPS)
            frame_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            if rate <= 0:
                # Some sources report 0 fps; avoid dividing by zero.
                return 0
            return frame_num / rate
        finally:
            # Always free the capture handle, whatever the outcome.
            cap.release()


if __name__ == "__main__":
    SP = ShiPinHaoAuthor(
        platform="shipinhao",
        mode="author",
        user_dict={"uid": "123456", "link": "老赵讲育儿", "user_id": "1234565"},
        rule_dict={},
        env="prod",
    )
    SP.get_account_videos()