import json
import random
import time

import requests

from common import Common, Feishu, AliyunLogger
from common.sql_help import sqlCollect


class DYLS:
    """Crawler for historical Douyin (抖音) blogger videos via a relay API.

    All entry points talk to the crawler service at 47.236.68.175, filter
    candidate videos by share count, share/like ratio and duration, and
    return them for downstream re-processing ("改造").
    """

    @classmethod
    def get_dy_zr_list(cls, task_mark, url_id, number, mark, channel_id, name):
        """Collect up to ``number`` qualifying videos from one blogger's feed.

        Pages through /crawler/dou_yin/blogger (source="app", sorted by 最热)
        for at most 20 pages and returns a list of dicts with keys
        video_id / cover / video_url / rule / old_title.

        :param task_mark:  task identifier, used for de-dup bookkeeping
        :param url_id:     blogger account id
        :param number:     maximum number of videos to return
        :param mark:       crawler mark, used for de-dup bookkeeping
        :param channel_id: channel id, only used for Aliyun logging
        :param name:       operator name, only used for Aliyun logging
        """
        url = "http://47.236.68.175:8889/crawler/dou_yin/blogger"
        video_list = []
        next_cursor = ''
        for _ in range(20):  # hard cap of 20 pages per blogger
            try:
                payload = json.dumps({
                    "account_id": url_id,
                    "source": "app",
                    "sort": "最热",
                    "cursor": next_cursor
                })
                headers = {
                    'Content-Type': 'application/json'
                }
                response = requests.request("POST", url, headers=headers, data=payload)
                time.sleep(random.randint(1, 5))  # crude rate limiting between pages
                response = response.json()
                code = response['code']
                if code != 0:
                    Common.logger("dy-ls").info(f"抖音历史数据获取失败,接口为/dou_yin/blogge\n")
                    return video_list
                data_list = response['data']
                next_cursor = str(data_list['next_cursor'])
                data = data_list['data']
                for item in data:
                    video_id = item.get('aweme_id')  # 文章id (video id)
                    status = sqlCollect.is_used(task_mark, video_id, mark, "抖音历史")
                    video_uri = item.get('video', {}).get('play_addr', {}).get('uri')
                    ratio = f'{item.get("video", {}).get("height")}p'
                    # 视频链接 (playable link built from uri + resolution)
                    video_url = f'https://www.iesdouyin.com/aweme/v1/play/?video_id={video_uri}&ratio={ratio}&line=0'
                    digg_count = int(item.get('statistics').get('digg_count'))  # 点赞 (likes)
                    share_count = int(item.get('statistics').get('share_count'))  # 转发 (shares)
                    # API reports milliseconds; guard against a missing field
                    # (original crashed on None / 1000).
                    duration = (item.get('duration') or 0) / 1000
                    old_title = item.get('desc', "").strip().replace("\n", "") \
                        .replace("/", "").replace("\\", "").replace("\r", "") \
                        .replace(":", "").replace("*", "").replace("?", "") \
                        .replace("?", "").replace('"', "").replace("<", "") \
                        .replace(">", "").replace("|", "").replace(" ", "") \
                        .replace("&NBSP", "").replace(".", "。").replace(" ", "") \
                        .replace("'", "").replace("#", "").replace("Merge", "")
                    log_data = f"user:{url_id},,video_id:{video_id},,video_url:{video_url},,original_title:{old_title},,share_count:{share_count},,digg_count:{digg_count},,duration:{duration}"
                    AliyunLogger.logging(channel_id, name, url_id, video_id, "扫描到一条视频", "2001", log_data)
                    Common.logger("dy-ls").info(
                        f"扫描:{task_mark},用户主页id:{url_id},视频id{video_id} ,分享:{share_count},点赞{digg_count}")
                    if status:
                        AliyunLogger.logging(channel_id, name, url_id, video_id, "该视频已改造过", "2002", log_data)
                        continue
                    # Guard against digg_count == 0: the original divided
                    # unconditionally, so one like-less video raised
                    # ZeroDivisionError and aborted the whole crawl via the
                    # outer except.
                    video_percent = '%.2f' % (int(share_count) / int(digg_count)) if int(digg_count) else '0.00'
                    special = float(0.25)
                    if int(share_count) < 500:
                        AliyunLogger.logging(channel_id, name, url_id, video_id, "不符合规则:分享小于500", "2003", log_data)
                        Common.logger("dy-ls").info(
                            f"不符合规则:{task_mark},用户主页id:{url_id},视频id{video_id} ,分享:{share_count},点赞{digg_count} ,时长:{int(duration)} ")
                        continue
                    if float(video_percent) < special:
                        AliyunLogger.logging(channel_id, name, url_id, video_id, "不符合规则:分享/点赞小于0.25", "2003", log_data)
                        Common.logger("dy-ls").info(
                            f"不符合规则:{task_mark},用户主页id:{url_id},视频id{video_id} ,分享:{share_count},点赞{digg_count} ,时长:{int(duration)} ")
                        continue
                    if int(duration) < 30 or int(duration) > 720:
                        AliyunLogger.logging(channel_id, name, url_id, video_id, "不符合规则:时长不符合规则大于720秒/小于30秒", "2003", log_data)
                        Common.logger("dy-ls").info(
                            f"不符合规则:{task_mark},用户主页id:{url_id},视频id{video_id} ,分享:{share_count},点赞{digg_count} ,时长:{int(duration)} ")
                        continue
                    cover_url = item.get('video').get('cover').get('url_list')[0]  # 视频封面 (cover image)
                    all_data = {"video_id": video_id, "cover": cover_url, "video_url": video_url,
                                "rule": video_percent, "old_title": old_title}
                    video_list.append(all_data)
                    AliyunLogger.logging(channel_id, name, url_id, video_id, "符合规则等待改造", "2004", log_data)
                    if len(video_list) == int(number):
                        Common.logger("dy-ls").info(f"获取抖音历史视频总数:{len(video_list)}\n")
                        return video_list
                # The original tested `next_cursor == False`, which is never
                # true for a str (dead code). Stop paging when the API says
                # there is nothing more, mirroring get_dyls_list.
                # TODO(review): confirm 'has_more' is present for source="app".
                if not data_list.get('has_more', True):
                    return video_list
            except Exception as exc:
                Common.logger("dy-ls").info(f"抖音历史数据获取失败:{exc}\n")
                return video_list
        return video_list

    @classmethod
    def get_dyls_list(cls, task_mark, url_id, number, mark):
        """Collect up to ``number`` qualifying videos via the 抖查查 source.

        Same endpoint as get_dy_zr_list but with pre-aggregated statistics;
        the playable URL is resolved per video through ``get_video``.

        :param task_mark: task identifier (only appears in log lines here)
        :param url_id:    blogger account id
        :param number:    maximum number of videos to return
        :param mark:      crawler mark, used for de-dup bookkeeping
        """
        next_cursor = ""
        # Accumulate across pages: the original re-created the list inside
        # the page loop, silently discarding earlier pages' results.
        video_list = []
        for _ in range(10):  # hard cap of 10 pages
            try:
                # 抖查查 data source
                url = "http://47.236.68.175:8889/crawler/dou_yin/blogger"
                payload = json.dumps({
                    "account_id": url_id,
                    "source": "抖查查",
                    "cursor": next_cursor
                })
                headers = {
                    'Content-Type': 'application/json'
                }
                time.sleep(random.randint(1, 5))  # crude rate limiting
                response = requests.request("POST", url, headers=headers, data=payload)
                response = response.json()
                data_all_list = response["data"]
                has_more = data_all_list["has_more"]
                next_cursor = str(data_all_list["next_cursor"])
                data_list = data_all_list["data"]
                for data in data_list:
                    share_count = data["share_count"]
                    good_count = data["good_count"]
                    duration = data["duration"]
                    video_id = data["video_id"]
                    old_title = data["video_desc"]
                    # NOTE(review): this de-dup check looks inverted relative
                    # to get_dy_zr_list (there a truthy `status` means
                    # "already used" and the video is skipped), and it passes
                    # 3 args where the sibling passes 4 — confirm
                    # sqlCollect.is_used's contract before changing; kept
                    # as-is on purpose.
                    status = sqlCollect.is_used(video_id, mark, "抖音")
                    if status:
                        status = sqlCollect.is_used(video_id, mark, "抖音历史")
                    if status == False:
                        continue
                    # Guard against good_count == 0 (original raised
                    # ZeroDivisionError and aborted the page).
                    video_percent = '%.2f' % (int(share_count) / int(good_count)) if int(good_count) else '0.00'
                    special = float(0.25)
                    duration = duration / 1000  # API reports milliseconds
                    if int(share_count) < 500 or float(video_percent) < special or int(duration) < 30 or int(duration) > 720:
                        Common.logger("dy-ls").info(
                            f"不符合规则:{task_mark},用户主页id:{url_id},视频id{video_id} ,分享:{share_count},点赞{good_count} ,时长:{int(duration)} ")
                        continue
                    video_url, image_url = cls.get_video(video_id)
                    if video_url:
                        all_data = {"video_id": video_id, "cover": image_url, "video_url": video_url,
                                    "rule": video_percent, "old_title": old_title}
                        video_list.append(all_data)
                        if len(video_list) == int(number):
                            Common.logger("dy-ls").info(f"获取抖音历史视频总数:{len(video_list)}\n")
                            return video_list
                    else:
                        Common.logger("dy-ls").info(f"抖音历史获取url失败")
                        Feishu.finish_bot("dou_yin/detail接口无法获取到视频链接",
                                          "https://open.feishu.cn/open-apis/bot/v2/hook/575ca6a1-84b4-4a2f-983b-1d178e7b16eb",
                                          "【抖音异常提示 】")
                if has_more == False:
                    return video_list
            except Exception as exc:
                Common.logger("dy-ls").info(f"抖音历史数据获取失败:{exc}\n")
                return video_list
        # The original fell off the loop returning None implicitly; always
        # hand back whatever was collected.
        return video_list

    @classmethod
    def get_video(cls, video_id):
        """Resolve the playable video URL and cover image for one video id.

        Calls /crawler/dou_yin/detail with up to 3 attempts; on the service's
        rate-limit code (10000) it backs off 60 s and retries. Returns
        ``(video_url, image_url)`` or ``(None, None)`` when all attempts
        are rate-limited.
        """
        url = "http://47.236.68.175:8889/crawler/dou_yin/detail"
        for _ in range(3):
            payload = json.dumps({
                "content_id": str(video_id)
            })
            headers = {
                'Content-Type': 'application/json'
            }
            response = requests.request("POST", url, headers=headers, data=payload)
            response = response.json()
            code = response["code"]
            if code == 10000:
                # Rate limited: back off and retry. The original slept but
                # then parsed the error payload anyway, so the retry loop
                # never actually retried.
                time.sleep(60)
                continue
            data = response["data"]["data"]
            video_url = data["video_url_list"][0]["video_url"]
            image_url = data["image_url_list"][0]["image_url"]
            return video_url, image_url
        return None, None


if __name__ == '__main__':
    # Manual smoke test. NOTE(review): the original passed only 4 of the 6
    # required arguments (guaranteed TypeError); placeholder channel_id/name
    # supplied so the call is at least well-formed.
    DYLS.get_dy_zr_list(1, 2, 1, 3, "test_channel", "test_name")
    # DYLS.get_dyls_list("1","MS4wLjABAAAA2QEvnEb7cQDAg6vZXq3j8_LlbO_DiturnV7VeybFKY4",1,"1")