"""Client for the Kuaishou crawler service: lists a blogger's videos and filters them by share/duration rules."""
import json
import random
import time

import requests

from common import Common, Feishu
from common.sql_help import sqlCollect


class KSLS:
    """Collects qualifying historical Kuaishou videos for one blogger account."""

    @classmethod
    def get_ksls_list(cls, task_mark, url_id, number, mark):
        """Page through a blogger's videos and return those that pass the rules.

        Up to 50 pages are requested from the crawler service (sorted by
        "最热"), with a random 1-5 second pause before every request. A video
        is kept when it has not been marked as used (checked through
        ``sqlCollect.is_used`` for the "快手" and "快手历史" channels), its
        share/view ratio (rounded to 4 decimals) is at least 0.0005, it has at
        least 100 shares, and it lasts between 30 and 720 seconds.

        Args:
            task_mark: Task identifier, forwarded to ``sqlCollect.is_used``
                and written to the log.
            url_id: Account id of the blogger, sent as ``account_id``.
            number: Number of videos wanted; collection stops as soon as this
                many have been gathered. Converted with ``int()``.
            mark: Forwarded unchanged to ``sqlCollect.is_used``.

        Returns:
            A list of dicts with the keys ``video_id``, ``cover``,
            ``video_url``, ``rule`` (the share/view ratio as a 4-decimal
            string) and ``old_title``. The list may be shorter than
            ``number`` (or empty) when the account runs out of videos, an
            empty page is returned, or processing of a page fails.
        """
        url = "http://8.217.190.241:8888/crawler/kuai_shou/blogger"
        headers = {
            'Content-Type': 'application/json'
        }
        min_share_rate = 0.0005
        # Accumulated across ALL pages; it must live outside the paging loop,
        # otherwise matches from earlier pages are thrown away.
        videos = []
        next_cursor = ""
        for _ in range(50):
            payload = json.dumps({
                "account_id": url_id,
                "sort_type": "最热",
                "cursor": next_cursor
            })
            # Random pause between page requests.
            time.sleep(random.randint(1, 5))
            # NOTE(review): no timeout is set, so a hung crawler service blocks
            # this call forever - consider passing ``timeout=``.
            response = requests.request("POST", url, headers=headers, data=payload)
            response = response.json()
            page = response.get("data")
            if not page:
                # Nothing more can be read from this account: raise the token
                # alert if the service asked for it, then stop paging.
                cls._notify_token_error(response)
                return videos
            has_more = page["has_more"]
            next_cursor = str(page["next_cursor"])
            try:
                for data in page["data"]:
                    photo_id = data["photo_id"]
                    # The helper's return type is not visible from this file,
                    # so the explicit comparison with False is kept as-is.
                    status = sqlCollect.is_used(task_mark, photo_id, mark, "快手")
                    if status == False:  # noqa: E712
                        continue
                    status = sqlCollect.is_used(task_mark, photo_id, mark, "快手历史")
                    if status == False:  # noqa: E712
                        continue
                    view_count = data["view_count"]
                    share_count = data["share_count"]
                    old_title = data["caption"]  # title
                    shares = int(share_count)
                    views = int(view_count)
                    # A video without views gets rate 0 and is rejected by the
                    # rule below instead of raising ZeroDivisionError and
                    # aborting the whole crawl.
                    share_rate = shares / views if views > 0 else 0.0
                    video_percent = f"{share_rate:.4f}"
                    duration = int(data["duration"]) / 1000  # value / 1000 is compared in seconds
                    if (float(video_percent) < min_share_rate or shares < 100
                            or int(duration) < 30 or duration > 720):
                        Common.logger("ks-ls").info(
                            f"不符合规则:{task_mark},用户主页id:{url_id},视频id{photo_id} ,分享:{share_count},浏览{view_count} ,时长:{int(duration)} ")
                        continue
                    video_url, image_url = cls.get_video(photo_id)
                    if video_url:
                        videos.append({
                            "video_id": photo_id,
                            "cover": image_url,
                            "video_url": video_url,
                            "rule": video_percent,
                            "old_title": old_title,
                        })
                        # Checked after every single append, so equality is
                        # reached exactly once the requested count is met.
                        if len(videos) == int(number):
                            Common.logger("ks-ls").info(f"获取快手历史视频总数:{len(videos)}\n")
                            return videos
                if not has_more:
                    return videos
            except Exception as exc:
                # Best effort: keep what has been collected so far and stop.
                Common.logger("ks-ls").info(f"快手历史数据获取失败:{exc}\n")
                return videos
        # Page budget exhausted: still hand back everything collected.
        return videos

    @classmethod
    def _notify_token_error(cls, response):
        """Send the Feishu "Token" notice when the service answers with code 27006.

        Best effort: any failure while reading the response or sending the
        notice is logged and otherwise ignored.
        """
        try:
            if int(response["code"]) == 27006:
                Feishu.finish_bot(response["msg"],
                                  "https://open.feishu.cn/open-apis/bot/v2/hook/575ca6a1-84b4-4a2f-983b-1d178e7b16eb",
                                  "【 Token 使用提示 】")
        except Exception as exc:
            Common.logger("ks-ls").info(f"ks-ls token alert could not be sent: {exc}\n")

    @classmethod
    def get_video(cls, video_id):
        """Fetch the first video URL and first cover image URL of one video.

        Args:
            video_id: Id of the video; sent to the detail endpoint as the
                string ``content_id``.

        Returns:
            A ``(video_url, image_url)`` tuple taken from the first entries of
            ``video_url_list`` and ``image_url_list`` in the response.

        Raises:
            KeyError, IndexError, TypeError: When the response does not have
                the expected structure. Errors raised by ``requests`` are not
                caught either.
        """
        url = "http://8.217.190.241:8888/crawler/kuai_shou/detail"
        payload = json.dumps({
            "content_id": str(video_id)
        })
        headers = {
            'Content-Type': 'application/json'
        }
        # NOTE(review): no timeout here either - see get_ksls_list.
        response = requests.request("POST", url, headers=headers, data=payload)
        detail = response.json()["data"]["data"]
        video_url = detail["video_url_list"][0]["video_url"]
        image_url = detail["image_url_list"][0]["image_url"]
        return video_url, image_url


if __name__ == '__main__':
    Feishu.finish_bot('测试',
                      "https://open.feishu.cn/open-apis/bot/v2/hook/575ca6a1-84b4-4a2f-983b-1d178e7b16eb",
                      "【 Token 使用提示 】")
    # DYLS.get_video("7314923922602954022")
    KSLS.get_ksls_list("1", "3xzicxg2nandemc", 1, "1")