123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- # -*- coding: utf-8 -*-
- # @Author: wangkun
- # @Time: 2023/2/28
- import json
- import os
- import sys
- import time
- from datetime import date, timedelta
- import requests
- sys.path.append(os.getcwd())
- from common.common import Common
- from common.feishu import Feishu
class Test:
    """Query WeChat Index scores for hot words and record them in Feishu.

    Hot words and the WeChat credentials are read from Feishu sheets; every
    word is posted to the WeChat Index endpoint and the outcome is written
    into the Feishu result sheet.
    """

    # Feishu sheet ids read/written by this crawler.
    WECHAT_KEY_SHEET = 'sVL74k'
    WORDS_SHEET = '6dsgUk'
    RESULT_SHEET = '5011a2'

    WXINDEX_URL = "https://search.weixin.qq.com/cgi-bin/wxaweb/wxindex"
    REQUEST_HEADERS = {
        'Host': 'search.weixin.qq.com',
        'content-type': 'application/json',
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.32(0x1800202a) NetType/WIFI Language/zh_CN',
        'Referer': 'https://servicewechat.com/wxc026e7662ec26a3a/42/page-frame.html'
    }
    # requests applies no timeout by default; without one a stalled
    # connection would block the crawler forever.
    REQUEST_TIMEOUT = 30
    # Pause before retrying after an expired key or a failed request.
    RETRY_SECONDS = 10

    @staticmethod
    def _now_str():
        """Return the current local time as 'YYYY-mm-dd HH:MM:SS'."""
        return time.strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _flatten_cells(sheet):
        """Return all non-None cells of a 2-D sheet as one flat list.

        A ``None`` sheet or a ``None`` row contributes nothing.
        """
        return [cell
                for row in (sheet or [])
                for cell in (row or [])
                if cell is not None]

    @staticmethod
    def _format_score_date(score_time):
        """Insert dashes into a YYYYMMDD value (int or str): 'YYYY-MM-DD'."""
        ymd = str(score_time)
        return f"{ymd[:4]}-{ymd[4:6]}-{ymd[6:]}"

    @classmethod
    def get_wechat_key(cls, log_type, crawler):
        """
        Read the WeChat search key and openid from Feishu.

        https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=sVL74k
        :param log_type: log name
        :param crawler: crawler name, use: weixinzhishu
        :return: (search_key, openid) taken from the second row of the sheet
                 (second and third cell), or None when they cannot be read
        """
        try:
            sheet = Feishu.get_values_batch(log_type, crawler, cls.WECHAT_KEY_SHEET)
            if not sheet:
                Common.logger(log_type, crawler).warning(f"wechat_key sheet is empty: {sheet}")
                return None
            key_row = sheet[1]
            return key_row[1], key_row[2]
        except Exception as e:
            # Deliberately broad: callers treat None as "retry later", so
            # this method must never raise.
            Common.logger(log_type, crawler).error(f"wechat_key:{e}\n")
            return None

    @classmethod
    def get_words(cls, log_type, crawler):
        """
        Read the hot-word list from Feishu, retrying every 10 seconds while
        the sheet comes back as None.

        :param log_type: log name
        :param crawler: crawler name
        :return: flat list of all non-None cells, or None if reading failed
        """
        try:
            while True:
                sheet = Feishu.get_values_batch(log_type, crawler, cls.WORDS_SHEET)
                if sheet is not None:
                    break
                Common.logger(log_type, crawler).warning(f"获取热词sheet:{sheet} ,10秒钟后重试")
                time.sleep(10)
            return cls._flatten_cells(sheet)
        except Exception as e:
            Common.logger(log_type, crawler).error(f"get_words:{e}\n")
            return None

    @classmethod
    def _query_index(cls, search_key, openid, word, start_ymd, end_ymd):
        """POST one GetDefaultIndex query and return the decoded JSON body."""
        payload = json.dumps({
            "openid": openid,
            "search_key": search_key,
            "cgi_name": "GetDefaultIndex",
            "start_ymd": start_ymd,
            "end_ymd": end_ymd,
            "query": word
        })
        response = requests.post(cls.WXINDEX_URL, headers=cls.REQUEST_HEADERS,
                                 data=payload, timeout=cls.REQUEST_TIMEOUT)
        return response.json()

    @classmethod
    def _write_result_row(cls, log_type, crawler, word, score_date, score):
        """Insert a fresh row at the top of the result sheet and fill it."""
        Feishu.insert_columns(log_type, crawler, cls.RESULT_SHEET, "ROWS", 1, 2)
        # Short pause between the insert and the update, kept from the
        # original implementation.
        time.sleep(0.5)
        Feishu.update_values(log_type, crawler, cls.RESULT_SHEET, "F2:Z2",
                             [[cls._now_str(), word, score_date, score]])
        Common.logger(log_type, crawler).info("写入飞书成功\n")

    @classmethod
    def _record_not_indexed(cls, log_type, crawler, word):
        """Record a word the index does not cover, unless already recorded."""
        Common.logger(log_type, crawler).info("该词暂未收录")
        recorded = cls._flatten_cells(
            Feishu.get_values_batch(log_type, crawler, cls.RESULT_SHEET))
        if word in recorded:
            Common.logger(log_type, crawler).info("该词已存在")
            return
        cls._write_result_row(log_type, crawler, word, "", "该词暂未收录")

    @classmethod
    def _record_scores(cls, log_type, crawler, word, result):
        """Write one result row per daily score contained in ``result``."""
        time_indexes = result['content']['resp_list'][0]['indexes'][0]['time_indexes']
        for entry in time_indexes:
            Common.logger(log_type, crawler).info(f"正在更新 {word}")
            score_date = cls._format_score_date(entry['time'])
            score = entry['score']
            wechat_score_dict = {"score": score, "scoreDate": score_date}
            Common.logger(log_type, crawler).info(f"wechat_score_dict:{wechat_score_dict}")
            cls._write_result_row(log_type, crawler, word, score_date, score)

    @classmethod
    def get_score_test(cls, log_type, crawler):
        """
        Fetch the last 7 days of WeChat Index scores for every hot word and
        write them to the Feishu result sheet.

        For each word the request is retried (after a 10 second sleep) while
        the WeChat key is unavailable, the request fails, or the service
        answers with code -10000. Any other answer finishes the word:
        -10002 is recorded as "not indexed", 0 has its scores written, and
        every other code is logged and the word is skipped.

        :param log_type: log name
        :param crawler: crawler name
        :return: None
        """
        today = date.today()
        start_ymd = (today - timedelta(days=7)).strftime("%Y%m%d")
        end_ymd = today.strftime("%Y%m%d")

        word_list = cls.get_words(log_type, crawler)
        if not word_list:
            # get_words returns None on failure; nothing to query either way.
            Common.logger(log_type, crawler).warning("no hot words to query")
            return

        for word_id, word in enumerate(word_list, start=1):
            Common.logger(log_type, crawler).info(f"热词: {word}")
            while True:
                wechat_key = cls.get_wechat_key(log_type, crawler)
                if wechat_key is None:
                    Common.logger(log_type, crawler).info(
                        f"{cls._now_str()} auth 过期,休眠 10 秒,重新获取")
                    time.sleep(cls.RETRY_SECONDS)
                    continue
                search_key, openid = wechat_key

                try:
                    result = cls._query_index(search_key, openid, word, start_ymd, end_ymd)
                except (requests.RequestException, ValueError) as e:
                    # Network failure, timeout or a non-JSON body: retry like
                    # an expired key instead of aborting the whole run.
                    Common.logger(log_type, crawler).error(
                        f"{cls._now_str()} wxindex request failed: {e}, retry in 10 seconds")
                    time.sleep(cls.RETRY_SECONDS)
                    continue

                code = result['code']
                if code == -10000:
                    Common.logger(log_type, crawler).info(
                        f"{cls._now_str()} response:{result['msg']} 休眠 10 秒,重新获取")
                    time.sleep(cls.RETRY_SECONDS)
                    continue

                if code == -10002:
                    cls._record_not_indexed(log_type, crawler, word)
                elif code != 0:
                    word_wechat_score_dict = {
                        "id": word_id,
                        "word": word,
                        "wechatScores": [],
                    }
                    Common.logger(log_type, crawler).warning(f"{word_wechat_score_dict}")
                else:
                    cls._record_scores(log_type, crawler, word, result)
                # The word is finished: leave the retry loop. (Previously the
                # "already exists" and "unknown code" paths used `continue`,
                # which re-requested the same word forever.)
                break
# Script entry point: run the crawler once, logging under "out" for the
# "weixinzhishu" crawler.
if __name__ == "__main__":
    Test.get_score_test("out", "weixinzhishu")
|