import os
import random
import sys
import time
import uuid
import json
from datetime import datetime

import requests
from application.common import Feishu

sys.path.append(os.getcwd())

from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.proxies import tunnel_proxies
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper


class XNGHTecommend(object):
    """
    Xiaoniangao "topic" crawler.

    Pages through the video list of one topic on kapi.xiaoniangao.cn. Videos
    with more than ``MIN_PLAY_CNT`` plays are written to a Feishu sheet, and
    those accepted by ``PiaoQuanPipeline`` are sent to the ETL message queue.
    """

    # Endpoint returning one page of videos for a topic.
    SUB_CONT_LIST_URL = "https://kapi.xiaoniangao.cn/sub/get_sub_cont_list"

    # Request headers copied from the WeChat mini-program client on macOS.
    HEADERS = {
        'Host': 'kapi.xiaoniangao.cn',
        'xweb_xhr': '1',
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156',
        'content-type': 'application/json',
        'accept': '*/*',
        'sec-fetch-site': 'cross-site',
        'sec-fetch-mode': 'cors',
        'sec-fetch-dest': 'empty',
        'referer': 'https://servicewechat.com/wxd7911e4c177690e4/763/page-frame.html',
        'accept-language': 'zh-CN,zh;q=0.9'
    }

    # Topic crawled when no topic_id is given. Previously used topic ids:
    #   1245 Women's Day, 1248 "Er Yue Er" (2nd day of the 2nd lunar month),
    #   1253 Qingming Festival, 1261 Mother's Day, 1265 Children's Day (June 1).
    DEFAULT_TOPIC_ID = 1265

    # Seconds to wait for the topic API; without a timeout a stalled
    # connection would block the crawler forever.
    REQUEST_TIMEOUT = 30

    # Only videos with strictly more plays than this are processed.
    MIN_PLAY_CNT = 3000

    def __init__(self, platform, mode, rule_dict, user_list, env="prod", topic_id=None):
        """
        :param platform: platform name, used in logs, trace ids and MQ items.
        :param mode: crawl strategy name.
        :param rule_dict: filtering rules; ``videos_cnt.min`` caps how many
            videos are sent per run (default 200).
        :param user_list: candidate internal users; one is picked at random
            per video and must provide ``uid`` and ``nick_name``.
        :param env: environment suffix of the MQ topic.
        :param topic_id: Xiaoniangao topic to crawl; defaults to
            ``DEFAULT_TOPIC_ID``.
        """
        self.limit_flag = False
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.topic_id = self.DEFAULT_TOPIC_ID if topic_id is None else topic_id
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        # Pass the platform name, not the crawler object itself.
        self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)

    @staticmethod
    def _build_payload(topic_id, score):
        """Return the JSON request body for one page of ``topic_id`` at cursor ``score``."""
        # NOTE(review): token/uid/session_id are hard-coded client credentials;
        # consider moving them to configuration.
        return json.dumps({
            "id": topic_id,
            "score": score,
            "qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!750x500r/crop/750x500/interlace/1/format/jpg",
            "token": "9db63d9ce83aaf76433a4afa54e61e0b",
            "uid": "78212db8-abbf-46db-8ff3-abf00967f461",
            "proj": "ma",
            "wx_ver": "3.8.6",
            "code_ver": "4.64.0",
            "log_common_params": {
                "e": [
                    {
                        "data": {
                            "page": "subjectPage"
                        }
                    }
                ],
                "ext": {
                    "brand": "apple",
                    "device": "MacBookPro14,1",
                    "os": "Mac OS X 11.6.7",
                    "weixinver": "3.8.6",
                    "srcver": "3.3.5",
                    "net": "wifi",
                    "scene": "1089"
                },
                "pj": "1",
                "pf": "2",
                "session_id": "eadf7939-5db1-4fd0-b0a7-a785a91f721d"
            }
        })

    def get_recommend_list(self):
        """
        Fetch the topic's videos page by page and process each one.

        Returns immediately when ``expire_flag`` is set. Stops when
        ``process_video_obj`` raises ``limit_flag``, or when the API returns a
        page with no videos. Per-video failures are logged and skipped.
        Request/response errors (timeouts, missing ``data`` keys) propagate
        to the caller.
        """
        if self.expire_flag:
            self.aliyun_log.logging(
                code="2000",
                message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
            )
            return
        # Pagination cursor; each response carries the value for the next page.
        # NOTE(review): -1 presumably requests the first page — confirm.
        score = -1
        while True:
            # Random pause between page requests.
            time.sleep(random.randint(1, 10))
            response = requests.request(
                "POST",
                self.SUB_CONT_LIST_URL,
                headers=self.HEADERS,
                data=self._build_payload(self.topic_id, score),
                timeout=self.REQUEST_TIMEOUT,
            )
            # Parse the body once and reuse it.
            data = response.json()['data']
            score = data['score']
            video_list = data['list']
            if not video_list:
                # Nothing more to fetch; without this check the loop would
                # keep re-requesting an empty page forever.
                return
            for index, video_obj in enumerate(video_list, 1):
                try:
                    self.aliyun_log.logging(
                        code="1001", message="扫描到一条视频", data=video_obj
                    )
                    self.process_video_obj(video_obj)
                except Exception as e:
                    self.aliyun_log.logging(
                        code="3000",
                        message="抓取单条视频失败,第{}条报错原因是{}".format(
                            index, e
                        ),
                    )
                if self.limit_flag:
                    return
            time.sleep(random.randint(5, 10))

    def process_video_obj(self, video_obj):
        """
        Process one video from the topic list.

        Videos with at most ``MIN_PLAY_CNT`` plays are ignored. Otherwise the
        video is written as a new row at the top of a Feishu sheet, wrapped in
        a ``VideoItem`` and run through ``PiaoQuanPipeline``. If the pipeline
        accepts it, it is sent to the ETL queue, and ``limit_flag`` is set once
        ``download_cnt`` reaches ``rule_dict["videos_cnt"]["min"]`` (default 200).

        :param video_obj: one element of the API's ``data.list``; reads the keys
            ``play_pv``, ``vid``, ``title``, ``url`` and ``v_url``.
        """
        time.sleep(random.randint(3, 8))
        trace_id = self.platform + str(uuid.uuid1())
        our_user = random.choice(self.user_list)
        play_cnt = video_obj["play_pv"]
        if play_cnt <= self.MIN_PLAY_CNT:
            return

        item = VideoItem()
        item.add_video_info("video_id", video_obj["vid"])
        item.add_video_info("video_title", video_obj["title"])
        item.add_video_info("play_cnt", play_cnt)
        # The list payload is not read for a publish time; crawl time is used instead.
        item.add_video_info("publish_time_stamp", int(time.time()))
        # NOTE(review): out_user_id is filled with the video id — confirm intended.
        item.add_video_info("out_user_id", video_obj["vid"])
        item.add_video_info("cover_url", video_obj["url"])
        item.add_video_info("like_cnt", 0)
        item.add_video_info("video_url", video_obj["v_url"])
        item.add_video_info("out_video_id", video_obj["vid"])
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])

        # Record the video in the Feishu sheet: insert a row below the header,
        # then fill it. This happens before (and regardless of) the pipeline check.
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        values = [[
            video_obj["vid"],
            formatted_time,
            video_obj["title"],
            video_obj["url"],
            video_obj["v_url"]
        ]]
        Feishu.insert_columns('xiaoniangao', 'xiaoniangao', "BZb4Tc", "ROWS", 1, 2)
        time.sleep(0.5)
        Feishu.update_values('xiaoniangao', 'xiaoniangao', "BZb4Tc", "A2:Z2", values)

        mq_obj = item.produce_item()
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(mq_obj)
            self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
            if self.download_cnt >= int(
                self.rule_dict.get("videos_cnt", {}).get("min", 200)
            ):
                self.limit_flag = True

    def run(self):
        """Entry point: crawl the configured topic."""
        self.get_recommend_list()


if __name__ == '__main__':
    J = XNGHTecommend(
        platform="xiaonianggaohuati",
        mode="recommend",
        rule_dict={},
        user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
    )
    J.get_recommend_list()