import json import random import time import requests from common import Common from common.sql_help import sqlCollect from data_channel.data_help import dataHelp class SPH: @classmethod def find_target_user(cls, name, user_list): """ 在搜索到到账号列表中找目标列表 """ for obj in user_list: if obj['contact']["nickname"] == name: return obj else: continue return False @classmethod def get_account_id(cls, account_name): channel = 'shipinhao' history_id = sqlCollect.get_history_id(channel, account_name) if history_id: return history_id else: url = "http://61.48.133.26:30001/Find_Video_Content" payload = json.dumps({ "content": account_name, "type": "19" }) headers = { 'Content-Type': 'application/json' } response = requests.request("POST", url, headers=headers, data=payload) info_list = response.json()['info_list'] if len(info_list) == 0: return False target_user = cls.find_target_user(name=account_name, user_list=info_list) # 写入 MySql 数据库 if target_user: target = target_user['contact']['username'] sqlCollect.insert_history_id(account_name, target, channel) return target_user['contact']["username"] else: return False @classmethod def get_sph_url(cls, task_mark, url, number, mark): account_id = cls.get_account_id(url) if account_id: url = "http://61.48.133.26:30001/FinderGetUpMasterNextPage" last_buffer = "" list = [] for i in range(5): headers = { 'Content-Type': 'application/json' } payload = json.dumps({ "username": account_id, "last_buffer": last_buffer }) response = requests.request("POST", url, headers=headers, data=payload) time.sleep(random.randint(1, 5)) if "objectId" not in response.text or response.status_code != 200: continue res_json = response.json() if len(res_json["UpMasterHomePage"]) == 0: continue if not res_json["UpMasterHomePage"]: continue else: last_buffer = res_json.get('last_buffer') for obj in res_json["UpMasterHomePage"]: objectId = obj['objectId'] status = sqlCollect.is_used(task_mark, objectId, mark, "视频号") if status: objectNonceId = obj['objectNonceId'] url = "http://61.48.133.26:30001/GetFinderDownloadAddress" payload = json.dumps({ "objectId": objectId, "objectNonceId": objectNonceId }) headers = { 'Content-Type': 'text/plain' } response = requests.request("POST", url, headers=headers, data=payload) time.sleep(random.randint(0, 1)) video_obj = response.json() video_url = video_obj.get('DownloadAddress') share_cnt = int(obj['forward_count']) # 分享 like_cnt = int(obj['like_count']) # 点赞 old_title = video_obj.get('title').split("\n")[0].split("#")[0] if share_cnt < 500: Common.logger("sph").info( f"任务:{task_mark},用户主页id:{url},视频id{objectId} ,分享:{share_cnt},点赞:{like_cnt}") continue video_percent = '%.2f' % (share_cnt / like_cnt) special = float(0.25) if float(video_percent) < special: Common.logger("sph").info( f"任务:{task_mark},用户主页id:{url},视频id{objectId} ,分享:{share_cnt},点赞:{like_cnt}") continue duration = dataHelp.video_duration(video_url) if int(duration) >= 45: cover = video_obj.get('thumb_url') all_data = {"video_id": objectId, "cover": cover, "video_url": video_url, "rule": video_percent, "old_title": old_title} list.append(all_data) if len(list) == int(number): Common.logger("log").info(f"获取视频号视频总数:{len(list)}\n") return list else: Common.logger("sph").info( f"任务:{task_mark},用户主页id:{url},视频id{objectId} ,分享:{share_cnt},点赞:{like_cnt} ,时长:{duration} ") return list return [] if __name__ == '__main__': SPH.get_sph_url('1',"霖霖觅影",'10','2')