123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- import json
- import random
- import time
- import requests
- from common import Common
- from common.sql_help import sqlCollect
- from data_channel.data_help import dataHelp
class SPH:
    """Collect candidate videos for an account on the "shipinhao" (视频号) channel.

    All methods are classmethods; the class only groups the crawler-gateway
    calls and the filtering rules applied to the returned videos.
    """

    # Crawler-gateway endpoints.
    FIND_ACCOUNT_URL = "http://61.48.133.26:30001/Find_Video_Content"
    HOME_PAGE_URL = "http://61.48.133.26:30001/FinderGetUpMasterNextPage"
    DOWNLOAD_URL = "http://61.48.133.26:30001/GetFinderDownloadAddress"

    # Seconds to wait on any single HTTP call before giving up; without a
    # timeout a stalled gateway would block the task indefinitely.
    REQUEST_TIMEOUT = 60
    # Maximum number of home-page requests made per call to get_sph_url.
    MAX_PAGES = 5
    # Videos with fewer shares than this are skipped.
    MIN_SHARE_COUNT = 500
    # Videos whose share/like ratio (rounded to 2 decimals) is below this are skipped.
    MIN_SHARE_LIKE_RATIO = 0.25
    # Minimum accepted value of dataHelp.video_duration()
    # (presumably seconds -- TODO confirm against dataHelp).
    MIN_DURATION = 45

    @classmethod
    def find_target_user(cls, name, user_list):
        """Find the target account in the list of accounts returned by search.

        Args:
            name: Nickname to match exactly.
            user_list: Iterable of dicts; an entry matches when
                ``entry['contact']['nickname'] == name``.

        Returns:
            The first matching entry, or ``False`` when nothing matches.
            Entries without a ``contact``/``nickname`` are ignored instead of
            raising ``KeyError``.
        """
        for candidate in user_list:
            contact = candidate.get('contact') or {}
            if 'nickname' in contact and contact['nickname'] == name:
                return candidate
        return False

    @classmethod
    def get_account_id(cls, account_name):
        """Resolve an account nickname to its ``username`` identifier.

        The id previously stored via ``sqlCollect`` is returned when present;
        otherwise the search endpoint is queried and a successful match is
        stored with ``sqlCollect.insert_history_id`` before being returned.

        Args:
            account_name: Account nickname to look up.

        Returns:
            The account id, or ``False`` when the search returns no results
            or no result has exactly this nickname.

        Raises:
            requests.RequestException: On connection failure or timeout.
        """
        channel = 'shipinhao'
        history_id = sqlCollect.get_history_id(channel, account_name)
        if history_id:
            return history_id

        response = cls._post_json(
            cls.FIND_ACCOUNT_URL,
            {"content": account_name, "type": "19"},
        )
        # A missing 'info_list' key is treated like an empty result list.
        info_list = response.json().get('info_list')
        if not info_list:
            return False

        target_user = cls.find_target_user(name=account_name, user_list=info_list)
        if not target_user:
            return False

        target = target_user['contact']['username']
        # Store the resolved id (original note: written to the MySQL database).
        sqlCollect.insert_history_id(account_name, target, channel)
        return target

    @classmethod
    def get_sph_url(cls, task_mark, url, number, mark):
        """Collect up to ``number`` videos of an account that pass the filters.

        A video is kept when ``sqlCollect.is_used`` returns a truthy value
        for it, it has at least ``MIN_SHARE_COUNT`` shares, its share/like
        ratio is at least ``MIN_SHARE_LIKE_RATIO`` and its duration is at
        least ``MIN_DURATION``.

        Args:
            task_mark: Task identifier, passed to ``sqlCollect.is_used`` and
                written to the logs.
            url: Account nickname (despite the name, not an HTTP URL); it is
                resolved through ``get_account_id``.
            number: Number of videos wanted; anything ``int()`` accepts.
            mark: Passed through to ``sqlCollect.is_used``.

        Returns:
            A list of dicts with the keys ``video_id``, ``cover``,
            ``video_url``, ``rule`` and ``old_title``. The list is empty when
            the account cannot be resolved, and may hold fewer than
            ``number`` entries when ``MAX_PAGES`` requests did not yield enough.

        Raises:
            requests.RequestException: On connection failure or timeout.
        """
        account_id = cls.get_account_id(url)
        if not account_id:
            return []

        wanted = int(number)
        videos = []
        last_buffer = ""
        for _ in range(cls.MAX_PAGES):
            # The page endpoint is a constant here: the earlier code reused
            # one ``url`` variable for the download endpoint too, so after
            # the first processed video every later page request went to
            # the wrong address.
            response = cls._post_json(
                cls.HOME_PAGE_URL,
                {"username": account_id, "last_buffer": last_buffer},
            )
            time.sleep(random.randint(1, 5))
            if response.status_code != 200 or "objectId" not in response.text:
                continue
            res_json = response.json()
            page = res_json.get("UpMasterHomePage")
            if not page:
                continue
            last_buffer = res_json.get('last_buffer')

            for obj in page:
                objectId = obj['objectId']
                if not sqlCollect.is_used(task_mark, objectId, mark, "视频号"):
                    continue

                share_cnt = int(obj['forward_count'])  # shares
                like_cnt = int(obj['like_count'])  # likes
                # Filter on the counters first: they come from the list
                # entry, so rejected videos cost no extra request.
                video_percent = cls._share_rule(share_cnt, like_cnt)
                if video_percent is None:
                    Common.logger("sph").info(
                        f"任务:{task_mark},用户主页id:{url},视频id{objectId} ,分享:{share_cnt},点赞:{like_cnt}")
                    continue

                detail = cls._post_json(
                    cls.DOWNLOAD_URL,
                    {"objectId": objectId, "objectNonceId": obj['objectNonceId']},
                    content_type='text/plain',
                )
                time.sleep(random.randint(0, 1))
                video_obj = detail.json()
                video_url = video_obj.get('DownloadAddress')
                # Keep only the first line, cut at the first '#'; a missing
                # title becomes an empty string instead of raising.
                old_title = (video_obj.get('title') or "").split("\n")[0].split("#")[0]

                duration = dataHelp.video_duration(video_url)
                if int(duration) < cls.MIN_DURATION:
                    Common.logger("sph").info(
                        f"任务:{task_mark},用户主页id:{url},视频id{objectId} ,分享:{share_cnt},点赞:{like_cnt} ,时长:{duration} ")
                    continue

                videos.append({
                    "video_id": objectId,
                    "cover": video_obj.get('thumb_url'),
                    "video_url": video_url,
                    "rule": video_percent,
                    "old_title": old_title,
                })
                if len(videos) == wanted:
                    Common.logger("log").info(f"获取视频号视频总数:{len(videos)}\n")
                    return videos
        return videos

    @classmethod
    def _post_json(cls, endpoint, body, content_type='application/json'):
        """POST ``body`` serialized as JSON text to ``endpoint``; return the response."""
        return requests.request(
            "POST",
            endpoint,
            headers={'Content-Type': content_type},
            data=json.dumps(body),
            timeout=cls.REQUEST_TIMEOUT,
        )

    @classmethod
    def _share_rule(cls, share_cnt, like_cnt):
        """Return the share/like ratio as a ``'%.2f'`` string, or ``None`` to skip.

        ``None`` is returned when there are too few shares, when the rounded
        ratio is below ``MIN_SHARE_LIKE_RATIO``, or when ``like_cnt`` is not
        positive (which would otherwise divide by zero).
        """
        if share_cnt < cls.MIN_SHARE_COUNT or like_cnt <= 0:
            return None
        video_percent = '%.2f' % (share_cnt / like_cnt)
        # The threshold is applied to the rounded value, which is also the
        # value reported in the result's "rule" field.
        if float(video_percent) < cls.MIN_SHARE_LIKE_RATIO:
            return None
        return video_percent
if __name__ == '__main__':
    # Manual run against one sample account nickname; the return value is discarded.
    SPH.get_sph_url(task_mark='1', url="霖霖觅影", number='10', mark='2')
|