import json
import random
import time

import requests

from common import Material, Oss, Common, Feishu
from common.sql_help import sqlCollect
from data_channel.data_help import dataHelp
from data_channel.shipinhao import SPH

# Endpoints of the crawling gateway used by this module.
_PAGE_URL = "http://61.48.133.26:30001/FinderGetUpMasterNextPage"
_DOWNLOAD_URL = "http://61.48.133.26:30001/GetFinderDownloadAddress"
# Feishu bot webhook that receives both the "finished" and the "failed" notices.
_FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/029fa989-9847-4574-8e1b-5c396e665f16"
# Name handed to Common.logger for every log line emitted here.
_LOG_NAME = "sph_crawling"
# Channel label stored alongside every row written through sqlCollect.
_CHANNEL = "视频号"
# Seconds to wait on a gateway request before giving up, so a stalled
# connection cannot block the crawl forever.
_REQUEST_TIMEOUT = 60


class SphHistory:
    """Crawl every video published by the configured WeChat Channels users."""

    @classmethod
    def _log(cls, message):
        """Write *message* at info level to the crawler log."""
        Common.logger(_LOG_NAME).info(message)

    @classmethod
    def sph_data_info(cls):
        """Crawl all pending users and send a Feishu notice for each of them.

        For each user returned by ``get_sph_user`` the account id is resolved
        through ``SPH.get_account_id``; users without one are skipped. After a
        user's pages are exhausted the user is marked done via
        ``sqlCollect.update_sph_channel_user_status`` and a completion notice
        is sent. Any exception raised while handling one user is logged and
        reported to Feishu, then the loop moves on to the next user.

        Returns:
            None.
        """
        user_list = cls.get_sph_user()
        if user_list is None:
            return

        for user in user_list:
            cls._log(f"{user}开始获取数据")
            account_id = SPH.get_account_id(user)
            if not account_id:
                # Report the user that failed; the account id itself is falsy here.
                print(f"{user}:没有获取到视频account_id,无法抓取数据")
                continue

            try:
                cls._crawl_user(user, account_id)
                sqlCollect.update_sph_channel_user_status(user)
                cls._log(f"{user}用户抓取完成")
                count = sqlCollect.sph_data_info_count(user, _CHANNEL)
                text = (
                    f"**{user}抓取完成:共抓了{count[0]}条数据**\n"
                )
                Feishu.finish_bot(text, _FEISHU_WEBHOOK, "【 视频号历史数据抓取通知 】")
            except Exception as e:
                cls._log(f"{user}异常,异常信息{e}")
                # Send the message text rather than the exception object itself.
                Feishu.finish_bot(str(e), _FEISHU_WEBHOOK, "【 视频号抓取异常通知 】")
                continue

    @classmethod
    def _crawl_user(cls, user, account_id):
        """Page through one account's video list and store every new video.

        Paging stops when the response body is empty, when it carries an empty
        ``DownloadAddress``, when it contains no ``objectId`` or is not HTTP
        200, or when ``UpMasterHomePage`` is empty. An exception raised while
        storing the videos of a page is logged and the next page is requested.
        """
        last_buffer = ""
        page = 1
        while True:
            payload = json.dumps({
                "username": account_id,
                "last_buffer": last_buffer
            })
            response = requests.request(
                "POST",
                _PAGE_URL,
                headers={'Content-Type': 'application/json'},
                data=payload,
                timeout=_REQUEST_TIMEOUT,
            )
            # Random pause between page requests.
            time.sleep(random.randint(1, 5))
            cls._log(f"{user}获取第{page}页视频")
            page += 1

            if not response.text:
                break
            res_json = response.json()
            try:
                if len(res_json["DownloadAddress"]) == 0:
                    break
            except (KeyError, TypeError):
                # Field absent or not sized: not a stop condition.
                pass
            if "objectId" not in response.text or response.status_code != 200:
                break
            videos = res_json["UpMasterHomePage"]
            if not videos:
                break

            # Cursor echoed back to the gateway to request the following page.
            last_buffer = res_json.get('last_buffer')
            try:
                for obj in videos:
                    cls._save_video(user, obj)
            except Exception as e:
                cls._log(f"{user}异常,异常信息{e}")

    @classmethod
    def _save_video(cls, user, obj):
        """Resolve, upload and persist a single entry of a video list page.

        Entries already known to ``sqlCollect.sph_data_info_v_id`` are skipped,
        as are entries without a download address and entries whose OSS upload
        fails.
        """
        cls._log(f"{user}扫描到一条数据")
        object_id = obj['objectId']
        if sqlCollect.sph_data_info_v_id(object_id, _CHANNEL):
            return

        payload = json.dumps({
            "objectId": object_id,
            "objectNonceId": obj['objectNonceId']
        })
        response = requests.request(
            "POST",
            _DOWNLOAD_URL,
            headers={'Content-Type': 'text/plain'},
            data=payload,
            timeout=_REQUEST_TIMEOUT,
        )
        time.sleep(random.randint(0, 1))
        video_obj = response.json()

        video_url = video_obj.get('DownloadAddress')
        # Validate the address before anything uses it; it may be missing.
        if not video_url:
            return
        duration = dataHelp.video_duration(video_url)
        cover = video_obj.get('thumb_url')
        v_id = f"sph/{object_id}"

        try:
            cls._log(f"{user}视频ID:{object_id},视频链接:{video_url}开始发送oss")
            oss_video_key = Oss.channel_upload_oss(video_url, v_id)  # upload the video to OSS
            oss_video_key = oss_video_key.get("oss_object_key")
            cls._log(f"{user}视频发送oss成功,视频oss地址{oss_video_key}")
            cls._log(f"{user}视频ID:{object_id},封面链接:{cover}开始发送oss")
            oss_cover_key = Oss.channel_upload_oss(cover, f"sph/{object_id}.jpg")  # upload the cover to OSS
            oss_cover_key = oss_cover_key.get("oss_object_key")
            cls._log(f"{user}封面发送oss成功,封面oss地址{oss_cover_key}")
            create_time = obj['createtime']  # publish time
        except Exception as e:
            # Best effort: skip this video, but leave a trace of why.
            cls._log(f"{user}视频ID:{object_id}发送oss失败,异常信息{e}")
            return

        share_cnt = int(obj['forward_count'])  # shares
        like_cnt = int(obj['like_count'])  # likes
        # Keep the first line of the title, cut at the first hashtag.
        video_title = (video_obj.get('title') or "").split("\n")[0].split("#")[0]
        user_name = obj['username']  # account identifier
        nick_name = obj['nickname']  # display name
        comment_count = obj['comment_count']  # number of comments
        fav_count = obj['fav_count']  # thumbs-up count
        sqlCollect.sph_data_info(_CHANNEL, object_id, video_url, cover, video_title, str(share_cnt), str(like_cnt),
                                 oss_video_key, oss_cover_key, nick_name, user_name, comment_count, fav_count,
                                 create_time, duration)
        cls._log(f"{nick_name}插入数据成功")

    @classmethod
    def get_sph_user(cls):
        """Return the flattened list of users to crawl, or None if there are none.

        When ``sqlCollect.sph_channel_user_list`` returns None, the users from
        ``Material.get_sph_user`` are inserted into the database and the list
        is queried again.
        """
        data = sqlCollect.sph_channel_user_list()
        if data is None:
            user_list = Material.get_sph_user()
            if not user_list:
                return None
            for user in user_list:
                sqlCollect.insert_sph_channel_user(_CHANNEL, user)
            # Re-read after seeding; flattening the stale None would raise.
            data = sqlCollect.sph_channel_user_list()
            if data is None:
                return None
        return [item for sublist in data for item in sublist]


if __name__ == '__main__':
    SphHistory.sph_data_info()

    # count = sqlCollect.sph_data_info_count("郑蓝旗", "视频号")
    # print(count)