# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/2/10
import os
import sys
import time
from datetime import date, timedelta
import requests
import json

sys.path.append(os.getcwd())
from common.feishu import Feishu
from common.common import Common


class Weixinzhishu:
    """Crawler that reads hot words from a backend, queries their WeChat index, and uploads the scores."""

    # Next page to request from the hot-word service. Advanced by get_word()
    # after a successful fetch and reset to 1 by get_wechat_score() once an
    # empty page is returned.
    pageNum = 1

    # Seconds to wait on any HTTP call; without a timeout `requests` can block
    # forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    # Endpoint and headers used for every WeChat index query.
    _WX_INDEX_URL = "https://search.weixin.qq.com/cgi-bin/wxaweb/wxindex"
    _WX_INDEX_HEADERS = {
        'Host': 'search.weixin.qq.com',
        'content-type': 'application/json',
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.32(0x1800202a) NetType/WIFI Language/zh_CN',
        'Referer': 'https://servicewechat.com/wxc026e7662ec26a3a/42/page-frame.html'
    }

    @classmethod
    def get_wechat_key(cls, log_type, crawler):
        """
        Read the WeChat search key and openid from the Feishu sheet.

        Sheet: https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=sVL74k
        Polls once per second while the sheet comes back as None or empty.

        :param log_type: logger name
        :param crawler: crawler name, use "weixinzhishu"
        :return: (search_key, openid) taken from row 1, columns 1 and 2 of the
                 sheet; None if reading the sheet raised (the error is logged)
        """
        try:
            while True:
                sheet = Feishu.get_values_batch(log_type, crawler, 'sVL74k')
                if not sheet:
                    # None or empty: wait before polling again so we never
                    # spin on the Feishu API.
                    time.sleep(1)
                    continue
                key_row = sheet[1]
                return key_row[1], key_row[2]
        except Exception as e:
            Common.logger(log_type, crawler).error(f"wechat_key:{e}\n")
            return None

    @classmethod
    def get_word(cls, log_type, crawler, host):
        """
        Fetch one page (up to 100 entries) of hot words from the backend.

        The page cursor ``cls.pageNum`` is advanced only when the page was
        fetched successfully, so a failed page is retried by the next call
        instead of being skipped.

        :param log_type: logger name
        :param crawler: crawler name
        :param host: backend base URL
        :return: the ``data.words`` value of the response; None on any failure
                 (the failure is logged)
        """
        try:
            url = '/hot/word/getAllWords'
            params = {
                'pageNum': cls.pageNum,  # page index, int
                'pageSize': 100  # entries per request, int
            }
            response = requests.post(url=host + url, json=params, timeout=cls.REQUEST_TIMEOUT)
            if response.status_code != 200:
                Common.logger(log_type, crawler).warning(f"get_word_response:{response.text}\n")
                return None
            body = response.json()
            if body['message'] != "success":
                Common.logger(log_type, crawler).warning(f"get_word_response:{body}\n")
                return None
            word_list = body['data']['words']
            cls.pageNum += 1
            return word_list
        except Exception as e:
            Common.logger(log_type, crawler).error(f"get_word:{e}\n")
            return None

    @classmethod
    def _request_index(cls, search_key, openid, word):
        """POST a GetDefaultIndex query for `word` covering the last 7 days and return the raw response."""
        today = date.today()
        payload = json.dumps({
            "openid": openid,
            "search_key": search_key,
            "cgi_name": "GetDefaultIndex",
            "start_ymd": (today + timedelta(days=-7)).strftime("%Y%m%d"),
            "end_ymd": today.strftime("%Y%m%d"),
            "query": word
        })
        return requests.post(
            cls._WX_INDEX_URL,
            headers=cls._WX_INDEX_HEADERS,
            data=payload,
            timeout=cls.REQUEST_TIMEOUT,
        )

    @staticmethod
    def _parse_scores(time_indexes):
        """Convert the API's `time_indexes` entries into {"score", "scoreDate"} dicts (date as YYYY-MM-DD)."""
        scores = []
        for entry in time_indexes:
            ymd = str(entry['time'])
            scores.append({
                "score": entry['score'],
                "scoreDate": f"{ymd[:4]}-{ymd[4:6]}-{ymd[6:]}",
            })
        return scores

    @classmethod
    def get_word_score(cls, log_type, crawler, word_id, word):
        """
        Query the WeChat index of one hot word.

        Response code -10000 is retried indefinitely with a 10 second pause,
        re-reading the key from Feishu on each attempt. Code -10002 (word not
        indexed yet) and any other non-zero code yield an empty score list.

        :param log_type: logger name
        :param crawler: crawler name, use "weixinzhishu"
        :param word_id: hot word ID
        :param word: hot word
        :return: the word's scores, e.g.
            {'id': 1,
             'word': '消息',
             'wechatScores': [
                 {'score': 95521022, 'scoreDate': '2023-02-07'},
                 {'score': 97315283, 'scoreDate': '2023-02-08'},
                 {'score': 109845849, 'scoreDate': '2023-02-09'},
                 {'score': 107089560, 'scoreDate': '2023-02-10'},
                 {'score': 102658391, 'scoreDate': '2023-02-11'},
                 {'score': 93843701, 'scoreDate': '2023-02-12'},
                 {'score': 100211894, 'scoreDate': '2023-02-13'}]}
            or None when the key is unavailable or the request/parsing raised
            (the error is logged).
        """
        try:
            while True:
                wechat_key = cls.get_wechat_key(log_type, crawler)
                if wechat_key is None:
                    Common.logger(log_type, crawler).error("get_word_score异常:wechat key unavailable\n")
                    return None
                search_key, openid = wechat_key

                response = cls._request_index(search_key, openid, word)
                body = response.json()  # parse once, reuse below
                code = body['code']
                word_wechat_score_dict = {
                    "id": word_id,
                    "word": word,
                    "wechatScores": [],
                }

                if code == -10000:
                    Common.logger(log_type, crawler).warning(f"response:{body['msg']} 休眠 10 秒,重新获取")
                    time.sleep(10)
                    continue
                if code == -10002:
                    Common.logger(log_type, crawler).info(f'{word}:该词暂未收录')
                    return word_wechat_score_dict
                if code != 0:
                    Common.logger(log_type, crawler).info(f'response:{response.text}\n')
                    return word_wechat_score_dict

                time_index = body['content']['resp_list'][0]['indexes'][0]['time_indexes']
                word_wechat_score_dict["wechatScores"] = cls._parse_scores(time_index)
                return word_wechat_score_dict
        except Exception as e:
            Common.logger(log_type, crawler).error(f"get_word_score异常:{e}\n")
            return None

    @classmethod
    def get_wechat_score(cls, log_type, crawler, host):
        """
        Walk every page of hot words, fetch each word's WeChat index and upload it.

        Stops when the backend returns an empty page (the page cursor is then
        reset to 1) or when a page cannot be fetched (the cursor is left where
        it is so the next run retries that page). Words whose score lookup
        failed are logged and left out of the upload.

        :param log_type: logger name
        :param crawler: crawler name
        :param host: backend base URL
        :return: None
        """
        score_num = 0
        while True:
            word_list = cls.get_word(log_type, crawler, host)
            if word_list is None:
                # get_word already logged the cause; abort instead of crashing
                # on len(None) or looping forever against a failing backend.
                Common.logger(log_type, crawler).warning(f"get_wechat_score: get_word failed, pageNum: {cls.pageNum}\n")
                Common.logger(log_type, crawler).info(f"score_num: {score_num}")
                return
            if len(word_list) == 0:
                Common.logger(log_type, crawler).info(f"热词更新完毕\n")
                cls.pageNum = 1
                Common.logger(log_type, crawler).info(f"score_num: {score_num}")
                return

            wechat_score_data = []
            Common.logger(log_type, crawler).info(f"len(word_list):{len(word_list)}")
            for item in word_list:
                word_id = item['id']
                word = item['word']
                Common.logger(log_type, crawler).info(f"word_id:{word_id}")
                Common.logger(log_type, crawler).info(f"word:{word}")
                word_score_dict = cls.get_word_score(log_type, crawler, word_id, word)
                Common.logger(log_type, crawler).info(f"word_score_dict:{word_score_dict}\n")
                if word_score_dict is None:
                    # Lookup failed: do not upload a None entry.
                    continue
                wechat_score_data.append(word_score_dict)
                if word_score_dict['wechatScores'] is not None:
                    score_num += len(word_score_dict['wechatScores'])
            Common.logger(log_type, crawler).info(f"wechat_score_data:{wechat_score_data}\n")
            cls.update_wechat_score(log_type, crawler, wechat_score_data, host)

    @classmethod
    def update_wechat_score(cls, log_type, crawler, data, host):
        """
        Upload hot-word WeChat index data to the backend.

        The outcome is only logged; nothing is returned. A successful backend
        reply looks like {"code":200, "message":"success"}.

        :param log_type: logger name
        :param crawler: crawler name
        :param data: list of score dicts as produced by get_word_score
        :param host: backend base URL
        :return: None
        """
        try:
            url = '/hot/word/updateWechatScore'
            params = {'data': data}
            response = requests.post(url=host + url, json=params, timeout=cls.REQUEST_TIMEOUT)
            if response.status_code != 200:
                Common.logger(log_type, crawler).warning(f"update_wechat_score_response:{response.text}\n")
                return
            body = response.json()
            if body["message"] != "success":
                Common.logger(log_type, crawler).warning(f"update_wechat_score_response:{body}\n")
            else:
                Common.logger(log_type, crawler).info(f"更新热词微信指数:{body['message']}\n")
        except Exception as e:
            Common.logger(log_type, crawler).error(f"update_wechat_score:{e}\n")

    @classmethod
    def get_score_test(cls, log_type, crawler, word_id, word):
        """
        Debug helper: query one word's WeChat index and print the result.

        Mirrors get_word_score but prints instead of logging and does not
        catch request or parsing errors. Code -10000 is retried in a loop
        with a 10 second pause.

        :param log_type: logger name
        :param crawler: crawler name
        :param word_id: hot word ID
        :param word: hot word
        :return: None
        """
        while True:
            wechat_key = cls.get_wechat_key(log_type, crawler)
            if wechat_key is None:
                print("wechat key unavailable")
                return
            search_key, openid = wechat_key

            response = cls._request_index(search_key, openid, word)
            body = response.json()
            code = body['code']
            word_wechat_score_dict = {
                "id": word_id,
                "word": word,
                "wechatScores": [],
            }

            if code == -10000:
                print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(time.time())))} response:{body['msg']} 休眠 10 秒,重新获取")
                time.sleep(10)
                continue
            if code == -10002:
                print("该词暂未收录")
                print(f"{word_wechat_score_dict}")
            elif code != 0:
                print(f"{word_wechat_score_dict}")
            else:
                time_index = body['content']['resp_list'][0]['indexes'][0]['time_indexes']
                for wechat_score_dict in cls._parse_scores(time_index):
                    word_wechat_score_dict["wechatScores"].append(wechat_score_dict)
                    print(f"wechat_score_dict:{wechat_score_dict}")
                print(word_wechat_score_dict)
            return


if __name__ == "__main__":
    Weixinzhishu.get_score_test('weixin', 'weixinzhishu', 1, "乌克兰")