# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2022/6/27 import json import os import time from datetime import date import requests from main.common import Common from main.feishu_lib import Feishu class Demo: @classmethod def feishu(cls): value = Feishu.get_range_value("xiaoniangao", "monitor", "N7e2yI", "B4:B4") print(type(value)) print(value) @classmethod def today(cls): today = date.today() print(today) value = Feishu.get_range_value("xiaoniangao", "monitor", "N7e2yI", "P1:P1") print(value) # print(Feishu.update_values("xiaoniangao", "monitor", "N7e2yI", "P1:P1", [[str(today)]])) @classmethod def lists(cls): list1 = [6445, 52077, 15333] list2 = [[x] for x in list1] print(list2) @classmethod def get_sheet(cls): xiaoniangao_sheet = Feishu.get_values_batch("xiaoniangao", "monitor", "N7e2yI") video_id = xiaoniangao_sheet[1][6] user_id = xiaoniangao_sheet[1][9] user_mid = xiaoniangao_sheet[1][10] print(video_id) print(user_id) print(user_mid) @classmethod def get_session(cls): # charles 抓包文件保存目录 charles_file_dir = "../chlsfiles/" if int(len(os.listdir(charles_file_dir))) == 1: Common.logger("kanyikan").info("未找到chlsfile文件,等待60s") time.sleep(60) else: try: # 目标文件夹下所有文件 all_file = sorted(os.listdir(charles_file_dir)) # 获取到目标文件 old_file = all_file[-1] # 分离文件名与扩展名 new_file = os.path.splitext(old_file) # 重命名文件后缀 os.rename(os.path.join(charles_file_dir, old_file), os.path.join(charles_file_dir, new_file[0] + ".txt")) with open(charles_file_dir + new_file[0] + ".txt", encoding='utf-8-sig', errors='ignore') as f: contents = json.load(f, strict=False) if "search.weixin.qq.com" in [text['host'] for text in contents]: for text in contents: if text["host"] == "search.weixin.qq.com" \ and text["path"] == "/cgi-bin/recwxa/recwxagetunreadmessagecnt": sessions = text["query"].split("session=")[-1].split("&wxaVersion=")[0] if "&vid" in sessions: session = sessions.split("&vid")[0] return session elif "&offset" in sessions: session = sessions.split("&offset")[0] return session elif "&wxaVersion" in sessions: session = sessions.split("&wxaVersion")[0] return session elif "&limit" in sessions: session = sessions.split("&limit")[0] return session elif "&scene" in sessions: session = sessions.split("&scene")[0] return session elif "&count" in sessions: session = sessions.split("&count")[0] return session elif "&channelid" in sessions: session = sessions.split("&channelid")[0] return session elif "&subscene" in sessions: session = sessions.split("&subscene")[0] return session elif "&clientVersion" in sessions: session = sessions.split("&clientVersion")[0] return session elif "&sharesearchid" in sessions: session = sessions.split("&sharesearchid")[0] return session elif "&nettype" in sessions: session = sessions.split("&nettype")[0] return session elif "&switchprofile" in sessions: session = sessions.split("&switchprofile")[0] return session elif "&switchnewuser" in sessions: session = sessions.split("&switchnewuser")[0] return session else: return sessions else: Common.logger("kanyikan").info("未找到 session,10s后重新获取") time.sleep(10) cls.get_session() except Exception as e: Common.logger("kanyikan").exception("获取 session 异常,30s后重试:{}", e) time.sleep(30) cls.get_session() @classmethod def get_video_info(cls, video_id): url = "https://search.weixin.qq.com/cgi-bin/recwxa/recwxagetonevideoinfo?" param = { "session": cls.get_session(), "vid": video_id, "wxaVersion": "3.9.2", "channelid": "208201", "scene": "32", "subscene": "1089", "model": "iPhone 1114.7.1", "clientVersion": "8.0.18", "sharesearchid": "447665862521758270", "sharesource": "-1" } r = requests.get(url=url, params=param) video_id = r.json()["data"]['vid'] video_title = r.json()["data"]['title'] play_cnt = r.json()["data"]["played_cnt"] shared_cnt = r.json()["data"]["shared_cnt"] if "items" not in r.json()["data"]["play_info"]: video_url = r.json()["data"]["play_info"][-1]["play_url"] # video_url = r.json()["data"]["play_info"] else: video_url = r.json()["data"]["play_info"]["items"][-1]["play_url"] # video_url = r.json()["data"]["play_info"] print(f"video_id:{video_id}") print(f"video_title:{video_title}") print(f"play_cnt:{play_cnt}") print(f"shared_cnt:{shared_cnt}") print(f"video_url:{video_url}") # response = { # 'data': { # 'collection_info': { # 'collectionid': 'mmsearchrecwxacollection_496906082_1641976100173', # 'episode_count': 0, # 'img': '', # 'latest_episode': 0, # 'size': 0, # 'title': '' # }, # 'comment_cnt': 54, # 'composite_type': 0, # 'composite_video_info': { # 'first_composite_img_url': '' # }, # 'cover_url': 'http://mmbiz.qpic.cn/wx_search/duc2TvpEgSTib1eic8eJ1MrictHrfvedFwfUGYhyjlJicpnrTrygHcSuTg/0', # 'duration': 210, # 'height': 1200, # 'isUGCVideo': True, # 'is_expose_comment': False, # 'is_following': False, # 'is_forbid_comment': False, # 'is_livereserve': False, # 'item_type': 16, # 'liked': False, # 'liked_cnt': 1865, # 'openid': 'oh_m45TTUoolLfj0x0vHl0QdlSw8', # 'owner_head_image': 'http://mmbiz.qpic.cn/wx_search/duc2TvpEgSRgbVcqh8gWWmQicsib3HJ97O6zfzwiazWMyIIzkKJxluhgQ/0', # 'play_icon_cover_url': 'http://mmbiz.qpic.cn/wx_search/Q3auHgzwzM6hiaHcSicmACTMfgzFdsLu77ovZnnx8MSLHhz2vYTxiaUfcrllqxW5t0bfNArKjOJBxw/0', # 'play_info': { # 'cdn_gif_url': '', # 'cdn_source_type': 4, # 'end_play_time_in_ms': 0, # 'video_api': '', # 'watermark_type': 0 # }, # 'played_cnt': 347350, # 'shared_cnt': 35463, # 'size': 0, # 'status': 10, # 'title': '太恶心了_现在的外卖,正在毁掉我们的下一代_Merge', # 'upload_time': 1647850532, # 'user_info': { # 'headimg_url': 'http://mmbiz.qpic.cn/wx_search/duc2TvpEgSRgbVcqh8gWWmQicsib3HJ97O6zfzwiazWMyIIzkKJxluhgQ/0', # 'nickname': '代替不了', # 'openid': 'oh_m45TTUoolLfj0x0vHl0QdlSw8' # }, # 'vid': 'ugc_b42rvhr', # 'video_type': 1, # 'width': 720 # }, # 'errcode': 0, # 'msg': 'ok', # 'retcode': 0 # } @classmethod def get_dir(cls): charles_file_dir = "../chlsfiles/" print(charles_file_dir) if __name__ == "__main__": demo = Demo() # demo.feishu() demo.today() # demo.lists() # demo.get_sheet() # demo.get_video_info("ugc_iqrmf5") # demo.get_dir()