|
- # -*- coding: utf-8 -*-
- # @Author: wangkun
- # @Time: 2022/8/3
- import datetime
- import json
- import os
- import time
- # import urllib.parse
- # from urllib import parse
- import requests
- import urllib3
- from main.feishu_lib import Feishu
class Demo:
    """Scratch helpers for replaying a captured ``mp.weixin.qq.com`` request.

    ``get_token`` reads the newest Charles capture file, pulls the request
    parameters of the ``/mp/getappmsgext`` call out of it and stores them in a
    Feishu sheet.  ``recommend`` reads those values back, replays the request
    and prints the title and download link of every related video returned.
    The remaining methods are small manual experiments.
    """

    # Leading positional arguments passed unchanged to every Feishu call.
    # NOTE(review): their exact meaning is defined by main.feishu_lib -- confirm there.
    _SHEET = ("recommend", "gzh", "VzrN7E")
    # Directory in which Charles capture files are saved.
    _CHARLES_FILE_DIR = "../chlsfiles/"
    # Seconds to wait for an HTTP response before giving up.
    _REQUEST_TIMEOUT = 30
    # Cookie name -> sheet cell that stores its value.
    _COOKIE_CELLS = (
        ("pass_ticket", "B5"),
        ("appmsg_token", "B4"),
        ("wap_sid2", "B6"),
    )

    @staticmethod
    def _between(text, start, end):
        """Return the part of ``text`` after the last ``start`` and before the next ``end``.

        When ``start`` is absent the search begins at the start of ``text``;
        when ``end`` is absent everything up to the end of ``text`` is returned.
        """
        return text.split(start)[-1].split(end)[0]

    @staticmethod
    def _cookie_value(cookie_header, name):
        """Return the value of cookie ``name`` in a Cookie header, or ``None``.

        The header may hold a single ``name=value`` pair or several pairs
        separated by ``;``.  Only the first ``=`` of a pair separates name from
        value, so values ending in base64 padding (``==``) are kept intact.
        """
        for pair in cookie_header.split(";"):
            key, sep, value = pair.strip().partition("=")
            if sep and key == name:
                return value
        return None

    @staticmethod
    def _extract_video_url(page_text):
        """Return the first ``mpvideo.qpic.cn`` video URL in ``page_text``, or ``None``.

        A line qualifies when it mentions the video host and contains a
        ``url: '...',`` assignment; the escaped ``\\x26amp;`` sequences inside
        the URL are turned back into ``&``.
        """
        for line in page_text.splitlines():
            if "mpvideo.qpic.cn" not in line or "url: '" not in line:
                continue
            return line.split("url: '")[1].split("',")[0].replace(r"\x26amp;", "&")
        return None

    @staticmethod
    def _parse_feed(feed):
        """Collect the fields of one related-video entry into a flat dict.

        Every missing field falls back to ``0``.  Width and height are only
        reported when both are present; otherwise both are ``0``.
        """
        has_size = "videoWidth" in feed and "videoHeight" in feed
        return {
            "video_title": feed.get("title", 0),
            "video_id": feed.get("vid", 0),
            "play_cnt": feed.get("read_num", 0),
            "like_cnt": feed.get("like_num", 0),
            "duration": feed.get("duration", 0),
            "video_width": feed["videoWidth"] if has_size else 0,
            "video_height": feed["videoHeight"] if has_size else 0,
            "send_time": feed.get("pubTime", 0),
            "user_name": feed.get("srcDisplayName", 0),
            "user_id": feed.get("srcUserName", 0),
            "head_url": feed.get("head_img_url", 0),
            "cover_url": feed.get("cover", 0),
            "video_url": feed.get("url", 0),
        }

    @classmethod
    def _write_cell(cls, cell, value):
        """Store ``value`` in the single sheet cell ``cell`` (for example ``"B5"``)."""
        Feishu.update_values(*cls._SHEET, f"{cell}:{cell}", [[value]])

    @classmethod
    def _save_request(cls, content):
        """Write the parameters of one captured ``getappmsgext`` request to the sheet."""
        cls._write_cell("B9", content["query"])

        request = content["request"]
        body = request["body"]["text"]
        cls._write_cell("B8", body)
        cls._write_cell("B1", cls._between(body, "title=", "&ct="))
        cls._write_cell("B2", cls._between(body, "vid=", "&is_pay_subscribe"))

        for header in request["header"]["headers"]:
            # Header names are case-insensitive; captures may record
            # "cookie" or "Cookie" depending on the HTTP version.
            name = header["name"].lower()
            value = header["value"]
            if name == "referer":
                cls._write_cell("B7", value)
                cls._write_cell("B3", cls._between(value, "__biz=", "&mid="))
            elif name == "cookie":
                for cookie_name, cell in cls._COOKIE_CELLS:
                    cookie = cls._cookie_value(value, cookie_name)
                    if cookie is not None:
                        cls._write_cell(cell, cookie)

    @classmethod
    def get_token(cls):
        """Extract request tokens from the newest Charles capture and save them to Feishu.

        The capture directory is listed in sorted order and the last file is
        used.  It is renamed to a ``.txt`` extension, parsed as JSON, and every
        entry for host ``mp.weixin.qq.com`` with path ``/mp/getappmsgext`` is
        written to the sheet.  When the directory is empty the method prints a
        notice, sleeps 60 seconds and returns so the caller can retry.
        """
        capture_dir = cls._CHARLES_FILE_DIR
        file_names = sorted(os.listdir(capture_dir))
        if not file_names:
            print("未找到chlsfile文件,等待60s")
            time.sleep(60)
            return

        latest = file_names[-1]
        source_path = os.path.join(capture_dir, latest)
        txt_path = os.path.join(capture_dir, os.path.splitext(latest)[0] + ".txt")
        if source_path != txt_path:
            os.rename(source_path, txt_path)

        with open(txt_path, encoding="utf-8-sig", errors="ignore") as f:
            contents = json.load(f, strict=False)

        for content in contents:
            if "mp.weixin.qq.com" not in (content.get("host") or ""):
                continue
            if content.get("path") != "/mp/getappmsgext":
                continue
            cls._save_request(content)

    @classmethod
    def get_url(cls, url):
        """Fetch the article page at ``url`` and return its video download link.

        Returns the first ``mpvideo.qpic.cn`` URL found in the page, or
        ``None`` (after printing the reason) when ``url`` is empty, the request
        fails, or the page contains no such link.
        """
        if not url:
            print("get_url异常:empty url")
            return None

        headers = {
            'Cookie': 'rewardsn=; wxtokenkey=777'
        }
        # NOTE(review): certificate verification is disabled (and the matching
        # warning silenced) exactly as before; confirm this is still required.
        urllib3.disable_warnings()
        try:
            response = requests.get(url=url, headers=headers, verify=False,
                                    timeout=cls._REQUEST_TIMEOUT)
        except requests.RequestException as e:
            print(f"get_url异常:{e}")
            return None

        video_url = cls._extract_video_url(response.text)
        if video_url is None:
            print("get_url异常:no mpvideo.qpic.cn url found")
        return video_url

    @classmethod
    def recommend(cls):
        """Replay the captured ``getappmsgext`` request and print the related videos.

        Request parameters are read from column B of the Feishu sheet (second
        value of rows 4-9: appmsg_token, pass_ticket, wap_sid2, referer, body,
        query).  The captured body and query string are sent as-is.  For every
        entry under ``related_tag_video`` the title and resolved download link
        are printed; if the response has no such entries the raw response text
        is printed instead.
        """
        token_sheet = Feishu.get_values_batch(*cls._SHEET)
        # Rows 1-3 (title, vid, __biz) are not needed here: the request body
        # and referer are replayed verbatim from rows 8 and 7.
        appmsg_token = token_sheet[3][1]
        pass_ticket = token_sheet[4][1]
        wap_sid2 = token_sheet[5][1]
        referer = token_sheet[6][1]
        body = token_sheet[7][1]
        query = token_sheet[8][1]

        url = "https://mp.weixin.qq.com/mp/getappmsgext?"
        headers = {
            "content-type": 'text/plain',
            "accept": "*/*",
            "x-requested-with": "XMLHttpRequest",
            "accept-language": "zh-cn",
            "accept-encoding": "gzip, deflate, br",
            "origin": "https://mp.weixin.qq.com",
            "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15 "
                          "(KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.26(0x18001a29)"
                          " NetType/WIFI Language/zh_CN",
            "referer": referer,
        }
        cookies = {
            "appmsg_token": appmsg_token,
            "devicetype": "iOS14.7.1",
            "lang": "zh_CN",
            "pass_ticket": pass_ticket,
            "rewardsn": "",
            "version": "18001a29",
            "wap_sid2": wap_sid2,
            "wxtokenkey": "777",
            "wxuin": "2010747860"
        }
        response = requests.post(url=url, headers=headers, cookies=cookies,
                                 params=query, data=body,
                                 timeout=cls._REQUEST_TIMEOUT)

        # Parse the body once; a non-JSON reply is reported like an empty one.
        try:
            payload = response.json()
        except ValueError:
            payload = None
        feeds = payload.get("related_tag_video") if isinstance(payload, dict) else None
        if not feeds:
            print(f"response:{response.text}\n")
            return

        for feed in feeds:
            info = cls._parse_feed(feed)
            download_url = cls.get_url(info["video_url"])
            print(f"video_title:{info['video_title']}")
            print(f"download_url:{download_url}")
            print("\n")

    @classmethod
    def get_sheet(cls):
        """Print the whole token sheet, then the second value of its first six rows."""
        sheet = Feishu.get_values_batch(*cls._SHEET)
        print(sheet)
        for row in sheet[:6]:
            print(row[1])

    @classmethod
    def demo1(cls):
        """Print ``24 - current hour`` and that many hours in seconds (twice)."""
        # Read the clock once so all printed values agree even if the hour
        # rolls over while this method runs.
        hours_left = 24 - datetime.datetime.now().hour
        seconds_left = 3600 * hours_left
        print(hours_left)
        print(seconds_left)
        print(seconds_left)

    @classmethod
    def demo2(cls):
        """Run a substring check three times, printing the verdict and sleeping 2s each time."""
        for _ in range(3):
            # NOTE(review): 'no' is printed when the substring IS found --
            # kept as in the original; confirm whether that is intended.
            found = '我们' in '我们今天吃什么?'
            print('no' if found else 'yes')
            print('休眠 2s\n')
            time.sleep(2)
if __name__ == "__main__":
    # Manual entry point: one Demo helper is run per invocation.
    # Replace the call below with Demo.get_token(), Demo.recommend() or
    # Demo.get_sheet() to exercise the other helpers.
    Demo.demo2()
|