import os
import random
import sys
import time
import uuid
import json
from datetime import datetime

import requests

# Extend the import path with the current working directory *before* any
# ``application`` import runs, so the package resolves when this file is
# launched as a script.
sys.path.append(os.getcwd())

from application.common import Feishu  # noqa: E402
from application.items import VideoItem  # noqa: E402
from application.pipeline import PiaoQuanPipeline  # noqa: E402
from application.common.messageQueue import MQ  # noqa: E402
from application.common.proxies import tunnel_proxies  # noqa: E402,F401
from application.common.log import AliyunLogger  # noqa: E402
from application.common.mysql import MysqlHelper  # noqa: E402


class XNGHTecommend(object):
    """Crawler for the Xiaoniangao (小年糕) topic feed.

    Pages through one topic of ``kapi.xiaoniangao.cn``, keeps the videos whose
    play count exceeds ``PLAY_CNT_THRESHOLD``, records them through ``Feishu``
    and forwards the ones accepted by ``PiaoQuanPipeline`` to the ETL message
    queue.
    """

    # Topic ids used so far:
    #   1245 Women's Day, 1248 Er Yue Er (2nd day of the 2nd lunar month),
    #   1253 Qingming Festival, 1261 Mother's Day, 1265 Children's Day (June 1).
    DEFAULT_TOPIC_ID = 1265
    FEED_URL = "https://kapi.xiaoniangao.cn/sub/get_sub_cont_list"
    # Seconds to wait for the feed endpoint; without a timeout a stalled
    # connection would block the crawler forever.
    REQUEST_TIMEOUT = 30
    # Only videos with strictly more plays than this are collected.
    PLAY_CNT_THRESHOLD = 3000

    def __init__(self, platform, mode, rule_dict, user_list, env="prod",
                 topic_id=DEFAULT_TOPIC_ID):
        """Set up the crawler state and its logging / queue / DB helpers.

        :param platform: platform identifier attached to every item and log.
        :param mode: crawl strategy name attached to every item and log.
        :param rule_dict: rule configuration; ``videos_cnt.min`` (default 200)
            is the number of accepted videos after which crawling stops.
        :param user_list: list of dicts with ``uid`` and ``nick_name``; one is
            picked at random for each video.
        :param env: environment suffix of the ETL topic name.
        :param topic_id: id of the topic to crawl (defaults to 1265).
        """
        self.limit_flag = False
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.topic_id = topic_id
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        # Pass the platform name, mirroring the AliyunLogger call above; the
        # original passed the crawler instance itself.
        # NOTE(review): confirm against MysqlHelper's signature.
        self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)

    @staticmethod
    def _request_headers():
        """Return a fresh copy of the HTTP headers sent with each feed request."""
        return {
            'Host': 'kapi.xiaoniangao.cn',
            'xweb_xhr': '1',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156',
            'content-type': 'application/json',
            'accept': '*/*',
            'sec-fetch-site': 'cross-site',
            'sec-fetch-mode': 'cors',
            'sec-fetch-dest': 'empty',
            'referer': 'https://servicewechat.com/wxd7911e4c177690e4/763/page-frame.html',
            'accept-language': 'zh-CN,zh;q=0.9'
        }

    @staticmethod
    def _request_payload(topic_id, score):
        """Serialize the JSON body for one page of the topic feed.

        :param topic_id: id of the topic being crawled.
        :param score: paging cursor; ``-1`` for the first page, afterwards the
            ``score`` value returned with the previous page.
        """
        return json.dumps({
            "id": topic_id,
            "score": score,
            "qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!750x500r/crop/750x500/interlace/1/format/jpg",
            "token": "9db63d9ce83aaf76433a4afa54e61e0b",
            "uid": "78212db8-abbf-46db-8ff3-abf00967f461",
            "proj": "ma",
            "wx_ver": "3.8.6",
            "code_ver": "4.64.0",
            "log_common_params": {
                "e": [
                    {
                        "data": {
                            "page": "subjectPage"
                        }
                    }
                ],
                "ext": {
                    "brand": "apple",
                    "device": "MacBookPro14,1",
                    "os": "Mac OS X 11.6.7",
                    "weixinver": "3.8.6",
                    "srcver": "3.3.5",
                    "net": "wifi",
                    "scene": "1089"
                },
                "pj": "1",
                "pf": "2",
                "session_id": "eadf7939-5db1-4fd0-b0a7-a785a91f721d"
            }
        })

    def get_recommend_list(self):
        """Fetch the topic feed page by page and process every video on it.

        Stops when ``expire_flag`` is already set, when the download quota is
        reached (``limit_flag``), or when the feed returns no more videos.
        Network and JSON-decoding errors from ``requests`` propagate to the
        caller; failures while handling a single video are logged and skipped.
        """
        if self.expire_flag:
            self.aliyun_log.logging(
                code="2000",
                message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
            )
            return

        score = -1  # paging cursor; the API hands back the next value
        while True:
            # Random pause before each page request.
            time.sleep(random.randint(1, 10))
            response = requests.request(
                "POST",
                self.FEED_URL,
                headers=self._request_headers(),
                data=self._request_payload(self.topic_id, score),
                timeout=self.REQUEST_TIMEOUT,
            )
            body = response.json()  # parse once, reuse for cursor and list
            page = body.get("data") if isinstance(body, dict) else None
            videos = (page or {}).get("list") or []
            if not videos:
                # An exhausted topic (or an error response without data) would
                # otherwise make this loop spin forever.
                self.aliyun_log.logging(
                    code="2000",
                    message="topic feed returned no videos, stop crawling",
                    data=body,
                )
                return
            score = page["score"]

            for index, video_obj in enumerate(videos, 1):
                try:
                    self.aliyun_log.logging(
                        code="1001", message="扫描到一条视频", data=video_obj
                    )
                    self.process_video_obj(video_obj)
                except Exception as e:
                    # Best effort: one bad video must not abort the page.
                    self.aliyun_log.logging(
                        code="3000",
                        message="抓取单条视频失败,第{}条报错原因是{}".format(
                            index, e
                        ),
                    )
                if self.limit_flag:
                    # Quota reached: skip the rest of this page.
                    break

            if self.limit_flag:
                return
            time.sleep(random.randint(5, 10))

    def process_video_obj(self, video_obj):
        """Handle one video from the feed.

        Videos with at most ``PLAY_CNT_THRESHOLD`` plays are ignored. Others
        are turned into a ``VideoItem``, written through ``Feishu``, and, if
        ``PiaoQuanPipeline.process_item()`` accepts them, sent to the ETL
        queue. Sets ``limit_flag`` once ``download_cnt`` reaches
        ``rule_dict["videos_cnt"]["min"]`` (default 200).

        :param video_obj: one entry of the feed's ``data.list``; the keys
            ``play_pv``, ``vid``, ``title``, ``url`` and ``v_url`` are read.
        :raises KeyError: if one of those keys is missing (the caller logs it).
        """
        time.sleep(random.randint(3, 8))
        trace_id = self.platform + str(uuid.uuid1())
        our_user = random.choice(self.user_list)
        play_cnt = video_obj["play_pv"]
        if play_cnt <= self.PLAY_CNT_THRESHOLD:
            return

        item = VideoItem()
        item.add_video_info("video_id", video_obj["vid"])
        item.add_video_info("video_title", video_obj["title"])
        item.add_video_info("play_cnt", play_cnt)
        # NOTE(review): this records the crawl time, not a publish time taken
        # from the feed — confirm that is intended.
        item.add_video_info("publish_time_stamp", int(time.time()))
        # NOTE(review): the video id is reused as the author id — confirm.
        item.add_video_info("out_user_id", video_obj["vid"])
        item.add_video_info("cover_url", video_obj["url"])
        item.add_video_info("like_cnt", 0)
        item.add_video_info("video_url", video_obj["v_url"])
        item.add_video_info("out_video_id", video_obj["vid"])
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])

        # Current time, recorded alongside the video in the Feishu sheet.
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        values = [[
            video_obj["vid"],
            formatted_time,
            video_obj["title"],
            video_obj["url"],
            video_obj["v_url"]
        ]]
        # Presumably inserts a blank row and then fills it (range A2:Z2);
        # the short pause separates the two Feishu calls.
        Feishu.insert_columns('xiaoniangao', 'xiaoniangao', "BZb4Tc", "ROWS", 1, 2)
        time.sleep(0.5)
        Feishu.update_values('xiaoniangao', 'xiaoniangao', "BZb4Tc", "A2:Z2", values)

        mq_obj = item.produce_item()
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(mq_obj)
            self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
            if self.download_cnt >= int(
                    self.rule_dict.get("videos_cnt", {}).get("min", 200)
            ):
                self.limit_flag = True

    def run(self):
        """Entry point: crawl the topic feed once."""
        self.get_recommend_list()


if __name__ == '__main__':
    J = XNGHTecommend(
        platform="xiaonianggaohuati",
        mode="recommend",
        rule_dict={},
        user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
    )
    J.get_recommend_list()