# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2022/8/3
"""Demo helpers around the WeChat ``/mp/getappmsgext`` request.

``Demo.get_token`` reads a Charles capture exported as JSON, pulls the request
parameters out of it and stores them in a Feishu sheet; ``Demo.recommend``
reads them back and replays the request to list related videos.
"""
import datetime
import json
import os
import time

import requests
import urllib3

from main.feishu_lib import Feishu


class Demo:
    """Collection of class-level demo routines; the class holds no state."""

    # Positional arguments passed to every Feishu call in this file.
    # NOTE(review): presumably (log type, crawler name, sheet id) -- confirm
    # against main.feishu_lib.
    _SHEET_ARGS = ("recommend", "gzh", "VzrN7E")

    # Cookie name -> sheet cell that receives its value in get_token().
    _COOKIE_CELLS = (
        ("pass_ticket", "B5"),
        ("appmsg_token", "B4"),
        ("wap_sid2", "B6"),
    )

    # Output field name -> key in a "related_tag_video" feed item.
    _FEED_FIELDS = (
        ("video_title", "title"),
        ("video_id", "vid"),
        ("play_cnt", "read_num"),
        ("like_cnt", "like_num"),
        ("duration", "duration"),
        ("send_time", "pubTime"),
        ("user_name", "srcDisplayName"),
        ("user_id", "srcUserName"),
        ("head_url", "head_img_url"),
        ("cover_url", "cover"),
        ("video_url", "url"),
    )

    @classmethod
    def _write_cell(cls, cell, value):
        """Write ``value`` into the single sheet cell ``cell`` (e.g. ``"B5"``)."""
        Feishu.update_values(*cls._SHEET_ARGS, f"{cell}:{cell}", [[value]])

    @classmethod
    def _parse_feed(cls, feed):
        """Return the known fields of one feed item, using 0 for missing keys.

        Width and height are only reported when both are present; otherwise
        both are 0.
        """
        video = {name: feed.get(key, 0) for name, key in cls._FEED_FIELDS}
        has_size = "videoWidth" in feed and "videoHeight" in feed
        video["video_width"] = feed["videoWidth"] if has_size else 0
        video["video_height"] = feed["videoHeight"] if has_size else 0
        return video

    @classmethod
    def get_token(cls):
        """Extract request tokens from a Charles capture into the Feishu sheet.

        Takes the lexicographically last file in ``../chlsfiles/``, renames it
        to a ``.txt`` extension, parses it as JSON and, for every captured
        ``mp.weixin.qq.com/mp/getappmsgext`` request, writes query, body,
        title, vid and selected header/cookie values to cells B1-B9.

        If the directory is empty, prints a notice, sleeps 60 seconds and
        returns without doing anything else.

        Raises:
            OSError: if the directory is missing or the rename/open fails.
            ValueError: if the file content is not valid JSON.
            KeyError: if a matching entry lacks the expected request fields.
        """
        # Directory where Charles capture files are saved (relative to cwd).
        charles_file_dir = "../chlsfiles/"

        file_names = sorted(os.listdir(charles_file_dir))
        if not file_names:
            print("未找到chlsfile文件,等待60s")
            time.sleep(60)
            return

        # Pick the last file by name and give it a .txt extension.
        old_name = file_names[-1]
        txt_path = os.path.join(
            charles_file_dir, os.path.splitext(old_name)[0] + ".txt")
        os.rename(os.path.join(charles_file_dir, old_name), txt_path)

        # utf-8-sig drops a leading BOM; strict=False lets json accept control
        # characters inside strings.
        with open(txt_path, encoding="utf-8-sig", errors="ignore") as f:
            contents = json.load(f, strict=False)

        for content in contents:
            if "mp.weixin.qq.com" not in (content.get("host") or ""):
                continue
            if content.get("path") != "/mp/getappmsgext":
                continue

            # Query string and form body are stored verbatim for replay.
            cls._write_cell("B9", content["query"])
            headers = content["request"]["header"]["headers"]
            body = content["request"]["body"]["text"]
            cls._write_cell("B8", body)

            # title / vid are cut out of the url-encoded form body.
            title = body.split("title=")[-1].split("&ct=")[0]
            vid = body.split("vid=")[-1].split("&is_pay_subscribe")[0]
            cls._write_cell("B1", title)
            cls._write_cell("B2", vid)

            for header in headers:
                # Header names are case-insensitive; captures may use either.
                name = header["name"].lower()
                value = header["value"]
                if name == "referer":
                    cls._write_cell("B7", value)
                    biz = value.split("__biz=")[-1].split("&mid=")[0]
                    cls._write_cell("B3", biz)
                elif name == "cookie":
                    for cookie_name, cell in cls._COOKIE_CELLS:
                        if cookie_name not in value:
                            continue
                        # A cookie header may carry several "k=v; k=v" pairs;
                        # stop at ';' so only this cookie's value is stored.
                        cookie_value = (
                            value.split(f"{cookie_name}=")[-1].split(";", 1)[0])
                        cls._write_cell(cell, cookie_value)

    @classmethod
    def get_url(cls, url):
        """Fetch an article page and return the first video download link.

        Args:
            url: address of the article page.

        Returns:
            The first ``mpvideo.qpic.cn`` URL found in the page, with
            ``\\x26amp;`` decoded to ``&``; ``None`` if ``url`` is empty, the
            request fails or no such URL is present (a message is printed in
            each of those cases).
        """
        if not url:
            # recommend() passes 0 when a feed item carries no url.
            print("get_url异常:empty url")
            return None
        try:
            headers = {
                'Cookie': 'rewardsn=; wxtokenkey=777'
            }
            # verify=False would otherwise emit an InsecureRequestWarning.
            urllib3.disable_warnings()
            response = requests.get(url=url, headers=headers, verify=False)
            for line in response.text.splitlines():
                # Skip lines that mention the host but carry no url field.
                if "mpvideo.qpic.cn" not in line or "url: '" not in line:
                    continue
                return (line.split("url: '")[1]
                        .split("',")[0]
                        .replace(r"\x26amp;", "&"))
            print("get_url异常:no mpvideo.qpic.cn url found")
            return None
        except Exception as e:
            # Deliberately best-effort: callers only need None on failure.
            print(f"get_url异常:{e}")
            return None

    @classmethod
    def recommend(cls):
        """Replay the stored ``getappmsgext`` request and print related videos.

        Reads the tokens saved by ``get_token`` from column B of the Feishu
        sheet, posts the stored query/body to WeChat and, for every item in
        ``related_tag_video``, prints its title and resolved download URL.
        If the response is not JSON or has no related videos, the raw
        response text is printed instead.
        """
        token_sheet = Feishu.get_values_batch(*cls._SHEET_ARGS)
        # Rows 4-9 of column B, matching the cells written by get_token().
        appmsg_token = token_sheet[3][1]
        pass_ticket = token_sheet[4][1]
        wap_sid2 = token_sheet[5][1]
        referer = token_sheet[6][1]
        body = token_sheet[7][1]
        query = token_sheet[8][1]

        url = "https://mp.weixin.qq.com/mp/getappmsgext?"
        headers = {
            "content-type": 'text/plain',
            "accept": "*/*",
            "x-requested-with": "XMLHttpRequest",
            "accept-language": "zh-cn",
            "accept-encoding": "gzip, deflate, br",
            "origin": "https://mp.weixin.qq.com",
            "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15 "
                          "(KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.26(0x18001a29)"
                          " NetType/WIFI Language/zh_CN",
            "referer": referer,
        }
        cookies = {
            "appmsg_token": appmsg_token,
            "devicetype": "iOS14.7.1",
            "lang": "zh_CN",
            "pass_ticket": pass_ticket,
            "rewardsn": "",
            "version": "18001a29",
            "wap_sid2": wap_sid2,
            "wxtokenkey": "777",
            "wxuin": "2010747860"
        }
        # The query string and form body are replayed exactly as captured
        # instead of being rebuilt field by field.
        response = requests.post(url=url, headers=headers, cookies=cookies,
                                 params=query, data=body)

        # Parse once; a non-JSON body is reported the same way as an empty one.
        try:
            payload = response.json()
        except ValueError:
            payload = None
        feeds = payload.get("related_tag_video") if isinstance(payload, dict) else None
        if not feeds:
            print(f"response:{response.text}\n")
            return

        for feed in feeds:
            video = cls._parse_feed(feed)
            # Resolve the direct download link from the article page.
            download_url = cls.get_url(video["video_url"])
            print(f"video_title:{video['video_title']}")
            print(f"download_url:{download_url}")
            print("\n")

    @classmethod
    def get_sheet(cls):
        """Print the whole token sheet, then column B of its first six rows."""
        sheet = Feishu.get_values_batch(*cls._SHEET_ARGS)
        print(sheet)
        for row in range(6):
            print(sheet[row][1])

    @classmethod
    def demo1(cls):
        """Print ``24 - current hour`` and that value in seconds (twice)."""
        # Read the clock once so all three lines agree across an hour change.
        hours_left = 24 - datetime.datetime.now().hour
        t = 3600 * hours_left
        print(hours_left)
        print(t)
        print(t)

    @classmethod
    def demo2(cls):
        """Run three iterations of a substring check, sleeping 2s after each."""
        for i in range(3):
            # NOTE(review): prints 'no' when the substring IS present --
            # confirm the labels are not inverted.
            if '我们' in '我们今天吃什么?':
                print('no')
            else:
                print('yes')
            print('休眠 2s\n')
            time.sleep(2)


if __name__ == "__main__":
    # Demo.get_token()
    # Demo.recommend()
    # Demo.get_sheet()
    Demo.demo2()