import os
import random
import sys
import time
import uuid
import json

import requests

sys.path.append(os.getcwd())

from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper
from application.common import Feishu, haiwai_tunnel_proxies


class DKYHYRecommend(object):
    """
    Recommend-feed crawler for the "Da Kai Ying Hao Yun" (dakaiyinghaoyun)
    WeChat mini program.

    Logs in to obtain an API token, pages through the recommend feed, and
    sends every video accepted by ``PiaoQuanPipeline`` to the ETL message
    queue until the configured download limit is reached.
    """

    # WeChat mini-program app id; also embedded in the Referer header.
    APP_ID = "wx2f9f796a36e11d71"
    # Service queried for a mini-program login code (read from the
    # "GetMiniAppCode" key of its JSON response).
    JS_CODE_URL = "http://61.48.133.26:30001/GetMiniAppCode"
    LOGIN_URL = "https://api.riyingkj.com/api/user/login/v1"
    RECOMMEND_URL = "https://api.riyingkj.com/api/recommend/list/v1"
    # Seconds before an HTTP request is abandoned. Without a timeout,
    # `requests` can block indefinitely on a stalled server or proxy.
    REQUEST_TIMEOUT = 30
    USER_AGENT = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 "
        "MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac "
        "MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156"
    )

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        """
        :param platform: platform identifier, used in logs, trace ids and DB lookups.
        :param mode: crawl strategy name (e.g. "recommend").
        :param rule_dict: filtering rules passed to the pipeline; its
                          ``videos_cnt.min`` entry caps the number of downloads.
        :param user_list: candidate in-house accounts; each needs "uid" and "nick_name".
        :param env: environment suffix for the ETL message-queue topic.
        """
        self.limit_flag = False
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        # Pass the platform name, not the crawler object itself (was `platform=self`).
        self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)

    @classmethod
    def _build_headers(cls, token=""):
        """Return the mini-program request headers carrying ``token`` as X-Token."""
        return {
            "Host": "api.riyingkj.com",
            "xweb_xhr": "1",
            "X-Token": token,
            "User-Agent": cls.USER_AGENT,
            "Content-Type": "application/json",
            "Accept": "*/*",
            "Referer": "https://servicewechat.com/{}/28/page-frame.html".format(cls.APP_ID),
            "Accept-Language": "zh-CN,zh;q=0.9",
        }

    def get_cookie(self):
        """
        Read the token stored in ``crawler_config`` for this platform.

        :return: the "token" value parsed from the JSON in column index 3 of the
                 first qualifying row, or None if no row qualifies.
        """
        sql = f""" select * from crawler_config where source="{self.platform}" """
        configs = self.mysql.select(sql=sql)
        for config in configs or ():
            # NOTE(review): assumes rows are sequences whose 4th column holds a
            # JSON config; `"token" in config` tests membership in the row
            # itself. Confirm against MysqlHelper.select's return type.
            if "token" in config:
                data_json = json.loads(config[3])
                return data_json.get("token")
        return None

    def logic(self, max_attempts=10):
        """
        Log in to the mini-program backend and return an API token.

        :param max_attempts: how many code-fetch + login rounds to try.
        :return: the token, or None if every attempt failed.
        """
        for _ in range(max_attempts):
            js_code = self.get_js_code(self.APP_ID)
            if not js_code:
                # A login without a code cannot succeed; go straight to the next attempt.
                continue
            token = self.get_search_params(self.APP_ID, js_code)
            if token:
                return token
        return None

    def get_js_code(self, app_id: str) -> str:
        """
        Request a mini-program login code for ``app_id``.

        :return: the code, or '' if the request or response parsing failed
                 (the failure is logged, not raised).
        """
        js_code = ''
        try:
            response = requests.post(
                self.JS_CODE_URL,
                json={"appid": app_id},
                timeout=self.REQUEST_TIMEOUT,
            )
            res_data = json.loads(response.content.decode())
            js_code = res_data['GetMiniAppCode']
        except Exception as e:
            self.aliyun_log.logging(
                code="3000", message="获取 js_code 失败, 报错原因是{}".format(e)
            )
        return js_code

    def get_search_params(self, app_id: str, js_code: str) -> str:
        """
        Exchange a login code for an API token.

        :return: the "token" field of the login response, or '' on any failure
                 (the failure is logged, not raised).
        """
        payload = json.dumps({"appid": app_id, "code": js_code, "exp": {}})
        try:
            response = requests.post(
                self.LOGIN_URL,
                headers=self._build_headers(""),
                data=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            return response.json()["token"]
        except Exception as e:
            self.aliyun_log.logging(
                code="3000", message="登录获取 token 失败, 报错原因是{}".format(e)
            )
            return ''

    def get_recommend_list(self, pages=20):
        """
        Fetch videos from the recommend feed.

        Requests up to ``pages`` feed pages and passes each video to
        ``process_video_obj``. A failed page is logged and skipped instead of
        aborting the crawl. Returns early once ``limit_flag`` is set.
        """
        token = self.logic()
        if not token:
            # Previously the crawl continued silently without a token; make it visible.
            self.aliyun_log.logging(code="3000", message="获取 token 失败")
        headers = self._build_headers(token or "")
        payload = json.dumps({"limit": 5, "exp": {}})
        for i in range(pages):
            time.sleep(random.randint(1, 10))
            try:
                response = requests.post(
                    self.RECOMMEND_URL,
                    headers=headers,
                    data=payload,
                    proxies=haiwai_tunnel_proxies(),
                    timeout=self.REQUEST_TIMEOUT,
                )
                video_list = response.json()
            except Exception as e:
                self.aliyun_log.logging(
                    code="3000",
                    message="请求推荐页失败, 第{}页, 报错原因是{}".format(i, e),
                )
                continue
            if not isinstance(video_list, list):
                # Iterating a dict/str here would feed keys or chars to process_video_obj.
                self.aliyun_log.logging(
                    code="3000",
                    message="推荐页返回格式异常, 第{}页".format(i),
                    data=video_list,
                )
                continue
            for index, video_obj in enumerate(video_list, 1):
                try:
                    self.aliyun_log.logging(
                        code="1001", message="扫描到一条视频", data=video_obj
                    )
                    self.process_video_obj(video_obj)
                except Exception as e:
                    self.aliyun_log.logging(
                        code="3000",
                        message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
                            i, index, e
                        ),
                    )
                if self.limit_flag:
                    return
                time.sleep(random.randint(5, 10))

    def process_video_obj(self, video_obj):
        """
        Process a single video.

        Builds a ``VideoItem`` attributed to a randomly chosen in-house user and
        runs it through ``PiaoQuanPipeline``. If accepted, sends it to the ETL
        queue and sets ``limit_flag`` once the download cap is reached.

        :param video_obj: one feed entry; must contain "uuid", "title",
                          "cover_url" and a non-empty "urls" list. Missing keys
                          raise and are logged by the caller.
        """
        time.sleep(random.randint(3, 8))
        trace_id = self.platform + str(uuid.uuid1())
        our_user = random.choice(self.user_list)
        now = int(time.time())
        item = VideoItem()
        item.add_video_info("video_id", video_obj["uuid"])
        item.add_video_info("video_title", video_obj["title"])
        item.add_video_info("play_cnt", 0)
        item.add_video_info("publish_time_stamp", now)
        # NOTE(review): the feed entry's uuid is also used as the author id;
        # presumably no author field is available. Confirm.
        item.add_video_info("out_user_id", video_obj["uuid"])
        item.add_video_info("cover_url", video_obj["cover_url"])
        item.add_video_info("like_cnt", 0)
        item.add_video_info("video_url", video_obj["urls"][0])
        item.add_video_info("out_video_id", video_obj["uuid"])
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, now))
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        mq_obj = item.produce_item()
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(mq_obj)
            self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
            if self.download_cnt >= int(
                self.rule_dict.get("videos_cnt", {}).get("min", 200)
            ):
                self.limit_flag = True

    def run(self):
        """Entry point: crawl the recommend feed."""
        self.get_recommend_list()


if __name__ == '__main__':
    J = DKYHYRecommend(
        platform="dakaiyinghaoyun",
        mode="recommend",
        rule_dict={},
        user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
    )
    J.get_recommend_list()
    # J.logic()