# -*- coding: utf-8 -*- # @Author: luojunhui # @Time: 2023/09/25 import json import os import random import sys import time import requests import urllib3 from fake_useragent import FakeUserAgent from common.mq import MQ sys.path.append(os.getcwd()) from common.common import Common from common.feishu import Feishu from common.publish import Publish from common.scheduling_db import MysqlHelper from common.public import get_config_from_mysql, download_rule # proxies = {"http": None, "https": None} class XiaoNianGaoH5Scheduling: platform = "小年糕" words = "abcdefghijklmnopqrstuvwxyz0123456789" uid_token_dict = { "uid": f"""{"".join(random.sample(words, 8))}-{"".join(random.sample(words, 4))}-{"".join(random.sample(words, 4))}-{"".join(random.sample(words, 4))}-{"".join(random.sample(words, 12))}""", "token": "".join(random.sample(words, 32)), } # 获取列表 @classmethod def get_videoList(cls, log_type, crawler, rule_dict, our_uid, env): mq = MQ(topic_name="topic_crawler_etl_" + env) for page in range(1, 101): time.sleep(random.randint(3, 5)) try: Common.logger(log_type, crawler).info(f"正在抓取第{page}页") Common.logging(log_type, crawler, env, f"正在抓取第{page}页") url = "https://kapi.xiaoniangao.cn/trends/get_recommend_trends" payload = { "rec_ab_config": { "ban_ab": 1, "city_slot": 0, "multi_ab": 1, "region_ab": { "num": 4, "position": { "0": 1, "1": 2, "2": 3, "3": 4 } } }, "qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!750x500r/crop/750x500/interlace/1/format/jpg/quality/75", "h_qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!80x80r/crop/80x80/interlace/1/format/jpg/quality/75", "limit": 4, # "tag_id": 116, "share_height": 500, "share_width": 625, "log_params": { "proj": "in", "page": "discover_rec", "common": { "os": "OS X 10.15.7", "device": "", "weixinver": "6.8.0", "srcver": "5.71.11" } }, "token": cls.uid_token_dict['token'], "code_ver": "5.71.11", "uid": cls.uid_token_dict['uid'], "proj": "in" } headers = { "Host": "kapi.xiaoniangao.cn", "accept": "application/json, text/plain, */*", "user-agent": FakeUserAgent().random, "content-type": "application/x-www-form-urlencoded", "origin": "https://wx.xiaoniangao.cn", "sec-fetch-site": "same-site", "sec-fetch-mode": "cors", "sec-fetch-dest": "empty", "referer": "https://wx.xiaoniangao.cn/", "accept-language": "en", } urllib3.disable_warnings() proxies = Common.tunnel_proxies() r = requests.post(url=url, headers=headers, data=json.dumps(payload), proxies=proxies, verify=False) # r = requests.post( # url=url, headers=headers, data=json.dumps(payload), verify=False # ) if "data" not in r.text or r.status_code != 200: Common.logger(log_type, crawler).warning( f"get_videoList:{r.text}\n" ) Common.logging(log_type, crawler, env, f"get_videoList:{r.text}\n") return elif "data" not in r.json(): Common.logger(log_type, crawler).info(f"get_videoList:{r.json()}\n") Common.logging(log_type, crawler, env, f"get_videoList:{r.json()}\n") return elif "list" not in r.json()["data"]: Common.logger(log_type, crawler).warning( f"get_videoList:{r.json()['data']}\n" ) Common.logging(log_type, crawler, env, f"get_videoList:{r.json()['data']}\n") return elif len(r.json()["data"]["list"]) == 0: Common.logger(log_type, crawler).warning( f"get_videoList:{r.json()['data']['list']}\n" ) Common.logging(log_type, crawler, env, f"get_videoList:{r.json()['data']['list']}\n") return else: # 视频列表数据 feeds = r.json()["data"]["list"] for i in range(len(feeds)): try: # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号 xiaoniangao_title = ( feeds[i] .get("title", "") .strip() .replace("\n", "") .replace("/", "") .replace("\r", "") .replace("#", "") .replace(".", "。") .replace("\\", "") .replace("&NBSP", "") .replace(":", "") .replace("*", "") .replace("?", "") .replace("?", "") .replace('"', "") .replace("<", "") .replace(">", "") .replace("|", "") .replace(" ", "") .replace('"', "") .replace("'", "") ) # 随机取一个表情/符号 emoji = random.choice( get_config_from_mysql(log_type, crawler, env, "emoji") ) # 生成最终标题,标题list[表情+title, title+表情]随机取一个 video_title = random.choice( [ f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}", ] ) # 发布时间 publish_time_stamp = int(int(feeds[i].get("t", 0)) / 1000) publish_time_str = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp) ) # 用户名 / 头像 user_name = ( feeds[i] .get("user", {}) .get("nick", "") .strip() .replace("\n", "") .replace("/", "") .replace("快手", "") .replace(" ", "") .replace(" ", "") .replace("&NBSP", "") .replace("\r", "") ) video_dict = { "video_title": video_title, "video_id": feeds[i].get("vid", ""), "duration": int(feeds[i].get("du", 0) / 1000), "play_cnt": feeds[i].get("play_pv", 0), "like_cnt": feeds[i].get("favor", {}).get("total", 0), "comment_cnt": feeds[i].get("comment_count", 0), "share_cnt": feeds[i].get("share", 0), "user_name": user_name, "publish_time_stamp": publish_time_stamp, "publish_time_str": publish_time_str, "video_width": int(feeds[i].get("vw", 0)), "video_height": int(feeds[i].get("vh", 0)), "avatar_url": feeds[i].get("user", {}).get("hurl", ""), "profile_id": feeds[i]["id"], "profile_mid": feeds[i]["user"]["mid"], "cover_url": feeds[i].get("url", ""), "video_url": feeds[i].get("v_url", ""), "session": f"xiaoniangao-h5-{int(time.time())}", } for k, v in video_dict.items(): Common.logger(log_type, crawler).info(f"{k}:{v}") Common.logging(log_type, crawler, env, f"{video_dict}") # 过滤无效视频 if ( video_title == "" or video_dict["video_id"] == "" or video_dict["video_url"] == "" ): Common.logger(log_type, crawler).warning("无效视频\n") Common.logging(log_type, crawler, env, "无效视频\n") # 抓取基础规则过滤 elif ( download_rule( log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict, ) is False ): Common.logger(log_type, crawler).info("不满足抓取规则\n") Common.logging(log_type, crawler, env, "不满足抓取规则\n") elif ( any( str(word) if str(word) in video_dict["video_title"] else False for word in get_config_from_mysql( log_type=log_type, source=crawler, env=env, text="filter", action="", ) ) is True ): Common.logger(log_type, crawler).info("已中过滤词\n") Common.logging(log_type, crawler, env, '已中过滤词\n') elif ( cls.repeat_video( log_type, crawler, video_dict["video_id"], env ) != 0 ): Common.logger(log_type, crawler).info("视频已下载\n") Common.logging(log_type, crawler, env, '视频已下载\n') else: # cls.download_publish(log_type=log_type, # crawler=crawler, # video_dict=video_dict, # rule_dict=rule_dict, # our_uid=our_uid, # env=env) video_dict["out_user_id"] = video_dict["profile_id"] video_dict["platform"] = crawler video_dict["strategy"] = log_type video_dict["out_video_id"] = video_dict["video_id"] video_dict["width"] = video_dict["video_width"] video_dict["height"] = video_dict["video_height"] video_dict["crawler_rule"] = json.dumps(rule_dict) video_dict["user_id"] = our_uid video_dict["publish_time"] = video_dict[ "publish_time_str" ] video_dict["strategy_type"] = log_type # print(video_dict) mq.send_msg(video_dict) # break except Exception as e: Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n") Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n") except Exception as e: Common.logger(log_type, crawler).error(f"抓取第{page}页时异常:{e}\n") Common.logging(log_type, crawler, env, f"抓取第{page}页时异常:{e}\n") @classmethod def repeat_video(cls, log_type, crawler, video_id, env): # sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """ sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """ repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env) return len(repeat_video) # if __name__ == "__main__": # XNG_H5 = XiaoNianGaoH5Scheduling # XNG_H5.get_videoList( # log_type="H5", crawler="xiaoniangao", rule_dict={}, our_uid="ljh", env="dev" # )