|
- # -*- coding: utf-8 -*-
- # @Author: wangkun
- # @Time: 2022/6/27
- import json
- import os
- import time
- from datetime import date
- import requests
- from main.common import Common
- from main.feishu_lib import Feishu
class Demo:
    """Scratch routines for manually exercising the Feishu and Kanyikan helpers.

    Every public method is a classmethod that prints what it finds; only
    ``get_session`` returns a value.
    """

    # Directory where Charles capture files are saved.
    _CHARLES_FILE_DIR = "../chlsfiles/"
    # Captured request whose query string carries the ``session`` parameter.
    _SESSION_HOST = "search.weixin.qq.com"
    _SESSION_PATH = "/cgi-bin/recwxa/recwxagetunreadmessagecnt"

    @classmethod
    def feishu(cls):
        """Print the type and the value returned for range ``B4:B4``."""
        value = Feishu.get_range_value("xiaoniangao", "monitor", "N7e2yI", "B4:B4")
        print(type(value))
        print(value)

    @classmethod
    def today(cls):
        """Print today's date, then the value returned for range ``P1:P1``."""
        today = date.today()
        print(today)
        value = Feishu.get_range_value("xiaoniangao", "monitor", "N7e2yI", "P1:P1")
        print(value)
        # Left disabled in the original:
        # print(Feishu.update_values("xiaoniangao", "monitor", "N7e2yI", "P1:P1", [[str(today)]]))

    @classmethod
    def lists(cls):
        """Print a fixed list of ints with each element wrapped in its own list."""
        list1 = [6445, 52077, 15333]
        list2 = [[x] for x in list1]
        print(list2)

    @classmethod
    def get_sheet(cls):
        """Print columns 6, 9 and 10 of row 1 of the batch-read sheet values."""
        xiaoniangao_sheet = Feishu.get_values_batch("xiaoniangao", "monitor", "N7e2yI")
        row = xiaoniangao_sheet[1]
        # NOTE(review): the variable names suggest these columns hold a video
        # id, a user id and a user mid; confirm against the sheet layout.
        video_id = row[6]
        user_id = row[9]
        user_mid = row[10]
        print(video_id)
        print(user_id)
        print(user_mid)

    @staticmethod
    def _extract_session(query):
        """Return the raw value of the last ``session=`` parameter in *query*.

        The value ends at the next ``&`` regardless of which parameter follows,
        so the result does not depend on parameter order. Returns ``None`` when
        *query* has no ``session=`` parameter.
        """
        _, marker, tail = query.rpartition("session=")
        if not marker:
            return None
        return tail.partition("&")[0]

    @classmethod
    def _find_session(cls, contents):
        """Return the session from the first matching captured request, else ``None``.

        *contents* is an iterable of dicts with ``host``, ``path`` and ``query``
        keys; a missing key raises ``KeyError``.
        """
        for entry in contents:
            if entry["host"] == cls._SESSION_HOST and entry["path"] == cls._SESSION_PATH:
                session = cls._extract_session(entry["query"])
                if session is not None:
                    return session
        return None

    @staticmethod
    def _load_latest_capture(capture_dir):
        """Rename the last entry (by sorted name) to ``.txt`` and return its parsed JSON."""
        # All files in the target folder; the last one by name is the target.
        latest_name = sorted(os.listdir(capture_dir))[-1]
        # Split the file name from its extension and swap the suffix for .txt.
        stem = os.path.splitext(latest_name)[0]
        txt_path = os.path.join(capture_dir, stem + ".txt")
        os.rename(os.path.join(capture_dir, latest_name), txt_path)
        with open(txt_path, encoding="utf-8-sig", errors="ignore") as f:
            return json.load(f, strict=False)

    @classmethod
    def get_session(cls):
        """Extract the ``session`` value from the latest Charles capture file.

        The last file (by sorted name) in the capture directory is renamed to
        ``.txt``, parsed as JSON and searched for a request to the
        unread-message-count endpoint; the ``session`` query parameter of the
        first such request is returned.

        Retries until a session is found: 10 s after a capture that holds no
        usable request, 30 s after any exception while reading or parsing.
        The retry result is returned to the caller (the previous recursive
        retries discarded it and returned ``None``).

        Returns:
            The raw session string, or ``None`` when the capture directory
            holds exactly one entry (after waiting 60 s).

        Raises:
            OSError: if the capture directory cannot be listed.
        """
        capture_dir = cls._CHARLES_FILE_DIR
        while True:
            # NOTE(review): exactly one entry is treated as "no capture yet" --
            # presumably a placeholder file lives in the directory; confirm.
            # This branch waits and gives up without retrying, as before.
            if len(os.listdir(capture_dir)) == 1:
                Common.logger("kanyikan").info("未找到chlsfile文件,等待60s")
                time.sleep(60)
                return None
            try:
                session = cls._find_session(cls._load_latest_capture(capture_dir))
            except Exception as e:
                # Retry boundary: any read/parse failure is logged and retried.
                Common.logger("kanyikan").exception("获取 session 异常,30s后重试:{}", e)
                time.sleep(30)
                continue
            if session is not None:
                return session
            Common.logger("kanyikan").info("未找到 session,10s后重新获取")
            time.sleep(10)

    @classmethod
    def get_video_info(cls, video_id):
        """Request one video's info and print its id, title, counters and play URL.

        Args:
            video_id: value sent as the ``vid`` query parameter.

        Raises:
            KeyError, IndexError, TypeError: if the response lacks the fields
                read below.
        """
        url = "https://search.weixin.qq.com/cgi-bin/recwxa/recwxagetonevideoinfo?"
        param = {
            "session": cls.get_session(),
            "vid": video_id,
            "wxaVersion": "3.9.2",
            "channelid": "208201",
            "scene": "32",
            "subscene": "1089",
            "model": "iPhone 11<iPhone12,1>14.7.1",
            "clientVersion": "8.0.18",
            "sharesearchid": "447665862521758270",
            "sharesource": "-1"
        }
        r = requests.get(url=url, params=param)
        # Parse the body once instead of on every field access.
        data = r.json()["data"]
        video_id = data['vid']
        video_title = data['title']
        play_cnt = data["played_cnt"]
        shared_cnt = data["shared_cnt"]
        play_info = data["play_info"]
        # play_info is either a sequence of entries or a dict with an "items"
        # sequence; the last entry's play_url is used in both cases.
        # NOTE(review): a dict play_info without "items" (as in the sample
        # response below) raises KeyError on the [-1] lookup.
        if "items" not in play_info:
            video_url = play_info[-1]["play_url"]
        else:
            video_url = play_info["items"][-1]["play_url"]
        print(f"video_id:{video_id}")
        print(f"video_title:{video_title}")
        print(f"play_cnt:{play_cnt}")
        print(f"shared_cnt:{shared_cnt}")
        print(f"video_url:{video_url}")
        # Sample response kept for reference (title and nickname translated to English):
        # response = {
        #     'data': {
        #         'collection_info': {
        #             'collectionid': 'mmsearchrecwxacollection_496906082_1641976100173',
        #             'episode_count': 0,
        #             'img': '',
        #             'latest_episode': 0,
        #             'size': 0,
        #             'title': ''
        #         },
        #         'comment_cnt': 54,
        #         'composite_type': 0,
        #         'composite_video_info': {
        #             'first_composite_img_url': ''
        #         },
        #         'cover_url': 'http://mmbiz.qpic.cn/wx_search/duc2TvpEgSTib1eic8eJ1MrictHrfvedFwfUGYhyjlJicpnrTrygHcSuTg/0',
        #         'duration': 210,
        #         'height': 1200,
        #         'isUGCVideo': True,
        #         'is_expose_comment': False,
        #         'is_following': False,
        #         'is_forbid_comment': False,
        #         'is_livereserve': False,
        #         'item_type': 16,
        #         'liked': False,
        #         'liked_cnt': 1865,
        #         'openid': 'oh_m45TTUoolLfj0x0vHl0QdlSw8',
        #         'owner_head_image': 'http://mmbiz.qpic.cn/wx_search/duc2TvpEgSRgbVcqh8gWWmQicsib3HJ97O6zfzwiazWMyIIzkKJxluhgQ/0',
        #         'play_icon_cover_url': 'http://mmbiz.qpic.cn/wx_search/Q3auHgzwzM6hiaHcSicmACTMfgzFdsLu77ovZnnx8MSLHhz2vYTxiaUfcrllqxW5t0bfNArKjOJBxw/0',
        #         'play_info': {
        #             'cdn_gif_url': '',
        #             'cdn_source_type': 4,
        #             'end_play_time_in_ms': 0,
        #             'video_api': '',
        #             'watermark_type': 0
        #         },
        #         'played_cnt': 347350,
        #         'shared_cnt': 35463,
        #         'size': 0,
        #         'status': 10,
        #         'title': 'So disgusting_Takeout today is ruining our next generation_Merge',
        #         'upload_time': 1647850532,
        #         'user_info': {
        #             'headimg_url': 'http://mmbiz.qpic.cn/wx_search/duc2TvpEgSRgbVcqh8gWWmQicsib3HJ97O6zfzwiazWMyIIzkKJxluhgQ/0',
        #             'nickname': 'Irreplaceable',
        #             'openid': 'oh_m45TTUoolLfj0x0vHl0QdlSw8'
        #         },
        #         'vid': 'ugc_b42rvhr',
        #         'video_type': 1,
        #         'width': 720
        #     },
        #     'errcode': 0,
        #     'msg': 'ok',
        #     'retcode': 0
        # }

    @classmethod
    def get_dir(cls):
        """Print the Charles capture directory path."""
        charles_file_dir = cls._CHARLES_FILE_DIR
        print(charles_file_dir)
if __name__ == "__main__":
    # Manual entry point: exactly one demo call is enabled at a time; the
    # others are kept commented out so they can be swapped in by hand.
    demo = Demo()
    demo.today()
    # demo.feishu()
    # demo.lists()
    # demo.get_sheet()
    # demo.get_video_info("ugc_iqrmf5")
    # demo.get_dir()
|