"""
@Author : luojunhui
Xiaoniangao account crawler.
"""
import os
import sys
import json
import time
import uuid
import random
import asyncio

import aiohttp

# Must run before the project imports below so `application` resolves from
# the current working directory.
sys.path.append(os.getcwd())

from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common import AliyunLogger, MysqlHelper
from application.functions import get_config_from_mysql, clean_title


class ImportantXiaoNianGaoAuthor(object):
    """
    Crawler for a fixed set of Xiaoniangao accounts.

    For every account in ``important_accounts`` the first page of public
    videos is requested; each video is run through ``PiaoQuanPipeline`` and,
    when accepted, sent to the ETL message queue.
    """

    def __init__(self, platform, mode, rule_dict, env="prod"):
        """
        :param platform: platform name, passed to the logger and the pipeline
        :param mode: crawl mode/strategy, passed to the logger and the pipeline
        :param rule_dict: crawl rules; ``rule_dict["period"]["max"]`` is read
            as the maximum video age in days
        :param env: environment suffix used in the MQ topic name
        """
        # Number of videos successfully sent to the MQ by this instance.
        self.download_count = 0
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        # Not used anywhere in this file; kept so external readers of the
        # attribute keep working.
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
        # Maps the external (Xiaoniangao) account id to the value that is
        # passed on as ``account`` / ``user_id`` downstream.
        self.important_accounts = {
            "154002316": "58527274",
            "1163011474": "58527278",
            "1122750580": "58527291",
            "37660529": "58527302",
            "156490323": "58527304",
            "262696881": "58527313",
            "1160417236": "58527318",
            "307419007": "58527399",
            "1162974507": "58527564",
            "194287386": "58527570",
            "1163003217": "58527580",
            "1162991035": "58527582",
            "50262268": "58527612",
            "209764266": "58527649",
            "26055443": "58527781",
            "1162977406": "58528076",
            "605290310": "58528077",
            "1160417201": "58528085",
            "32290307": "58528104",
            "1160417318": "58528114",
            "306386778": "58528122",
            "1161593386": "58528130",
            "1161593368": "58528245",
            "260159327": "58528249",
            "801020924": "58528269",
            "287637208": "58528273",
            "555866418": "58528298",
            "303943127": "59758578",
            "1162953017": "60450745",
            "1163013756": "63618095",
            "1162982920": "63642197",
            "15324740": "65487736",
            "170182913": "66807289",
            "1160417241": "66807294",
            "1220202407": "66807300",
            "20680": "66807304",
            "294317767": "66807306",
            "1162980250": "58527284",
            "1163008965": "58527307",
            "230841899": "58527626",
            "1162998153": "58527790",
            "1162954764": "58528095",
            "1160417133": "58528263",
            "1163005063": "58528268",
            "1161593366": "58528275",
            "1162958849": "58528281",
            "1161593379": "58528286",
            "1161593373": "58528334",
            "1163006779": "60450865",
            "311848591": "63642204",
        }

    def read_important_accounts(self):
        """
        Return the external ids of the important accounts.

        :return: a keys view over ``important_accounts`` (ids are strings)
        """
        return self.important_accounts.keys()

    async def scan_important_accounts(self, accounts):
        """
        Scan all given accounts concurrently.

        :param accounts: iterable of external account ids
        """
        tasks = [self.get_user_videos(account) for account in accounts]
        await asyncio.gather(*tasks)

    async def get_user_videos(self, link):
        """
        Fetch the first page of public videos for one account and process it.

        Only a single request (``start_t = -1``, ``limit = 20``) is made; no
        further pages are requested.  Any failure while requesting, decoding
        or processing is logged and swallowed, so one failing account does
        not abort the other accounts gathered alongside it.

        :param link: external account id (numeric string)
        """
        url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
        # Static request headers, reproduced unchanged.
        # NOTE(review): authorization/date/content-md5 are fixed values —
        # confirm the API still accepts them.
        headers = {
            "Host": "kapi-xng-app.xiaoniangao.cn",
            "content-type": "application/json; charset=utf-8",
            "accept": "*/*",
            "authorization": "hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=",
            "verb": "POST",
            "content-md5": "c7b7f8663984e8800e3bcd9b44465083",
            "x-b3-traceid": "2f9da41f960ae077",
            "accept-language": "zh-cn",
            "date": "Mon, 19 Jun 2023 06:41:17 GMT",
            "x-token-id": "",
            "x-signaturemethod": "hmac-sha1",
            "user-agent": "xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0",
        }
        try:
            next_index = -1
            payload = {
                "token": "",
                "limit": 20,
                "start_t": next_index,
                "visited_mid": int(link),
                "share_width": 300,
                "share_height": 240,
            }
            # The request itself is inside the try: a connection error or a
            # non-JSON response must be logged, not propagated through
            # asyncio.gather where it would stop the periodic loop.
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url, headers=headers, json=payload
                ) as response:
                    data = await response.json()
            # The response is released before the (potentially slow)
            # per-video processing starts.
            data_list = data["data"]["list"]
            if data_list:
                await self.process_video_pages(data_list, link)
        except Exception as e:
            # .get() so that an unknown id cannot raise from inside the
            # error handler itself.
            inside_account = self.important_accounts.get(link)
            # NOTE(review): code is the int 3000 here while other log calls
            # in this class pass strings — left unchanged.
            self.aliyun_log.logging(
                code=3000,
                message="在抓取账号out_side:{}\t inside:{} 时报错, 报错原因是{}".format(
                    link, inside_account, e
                ),
                account=inside_account,
            )

    async def process_video_pages(self, video_list, link):
        """
        Process every video of one fetched page concurrently.

        :param video_list: list of raw video dicts from the API response
        :param link: external account id the page belongs to
        """
        tasks = [self.process_video(video, link) for video in video_list]
        await asyncio.gather(*tasks)

    async def process_video(self, video, link):
        """
        Build the video record, apply the rules and send it to the MQ.

        Videos older than ``rule_dict["period"]["max"]`` days (default 1000)
        are logged with code "2004" and skipped.  Otherwise the record goes
        through ``PiaoQuanPipeline.process_item()`` and, when that returns a
        truthy value, is sent to the MQ and logged with code "1002".

        :param video: raw video dict from the API response
        :param link: external account id the video belongs to
        """
        trace_id = self.platform + str(uuid.uuid1())
        title = clean_title(video.get("title", ""))
        # Pick one random emoji/symbol from the configured list.
        emoji = random.choice(
            get_config_from_mysql(self.mode, self.platform, "emoji")
        )
        # The emoji is placed randomly either before or after the title.
        video_title = random.choice([f"{emoji}{title}", f"{title}{emoji}"])
        # "t" is divided by 1000 before being used as epoch seconds.
        publish_time_stamp = int(int(video.get("t", 0)) / 1000)
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )
        # Nickname with newlines, slashes, spaces and "&NBSP" removed.
        # NOTE(review): the two adjacent space replacements look identical
        # in this view; one may originally have been a different whitespace
        # character — confirm against the repository copy.
        user_name = (
            video.get("user", {})
            .get("nick", "")
            .strip()
            .replace("\n", "")
            .replace("/", "")
            .replace(" ", "")
            .replace(" ", "")
            .replace("&NBSP", "")
            .replace("\r", "")
        )
        video_dict = {
            "video_title": video_title,
            "video_id": video.get("vid", ""),
            "duration": int(video.get("du", 0) / 1000),
            "play_cnt": video.get("play_pv", 0),
            "like_cnt": video.get("favor", {}).get("total", 0),
            "comment_cnt": video.get("comment_count", 0),
            "share_cnt": video.get("share", 0),
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "update_time_stamp": int(time.time()),
            "video_width": int(video.get("w", 0)),
            "video_height": int(video.get("h", 0)),
            "avatar_url": video.get("user", {}).get("hurl", ""),
            "profile_id": video["id"],
            "profile_mid": video.get("user", {}).get("mid", ""),
            "cover_url": video.get("url", ""),
            "video_url": video.get("v_url", ""),
            "session": f"xiaoniangao-author-{int(time.time())}",
            "out_user_id": video["id"],
            "platform": self.platform,
            "strategy": self.mode,
            "out_video_id": video.get("vid", ""),
        }
        account = self.important_accounts[link]
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
            trace_id=trace_id,
            account=account,
        )
        # Maximum allowed age in days; computed once and reused in the log.
        max_days = int(self.rule_dict.get("period", {}).get("max", 1000))
        if int(time.time()) - publish_time_stamp > 3600 * 24 * max_days:
            self.aliyun_log.logging(
                code="2004",
                trace_id=trace_id,
                data=video_dict,
                message="发布时间超过{}天".format(max_days),
                account=account,
            )
            return
        flag = pipeline.process_item()
        if flag:
            # Extra fields added only for the MQ message.
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = account
            video_dict["publish_time"] = video_dict["publish_time_str"]
            self.mq.send_msg(video_dict)
            self.download_count += 1
            self.aliyun_log.logging(
                code="1002",
                data=video_dict,
                trace_id=trace_id,
                message="成功发送 MQ 至 ETL",
                account=account,
            )

    async def run(self):
        """
        Entry point: read the account ids and scan all of them.
        """
        user_list = self.read_important_accounts()
        await self.scan_important_accounts(user_list)


def get_task_rule():
    """
    Load the crawl rules of task 21 from ``crawler_task_v3``.

    The stored value is parsed as JSON and iterated as a list of dicts,
    which are merged into a single dict (later entries win on key clashes).

    :return: the merged rule dict; empty when the query returns nothing
    """
    rule_dict = {}
    task_rule_sql = "SELECT rule FROM crawler_task_v3 WHERE id = 21;"
    mysql_helper = MysqlHelper(mode="author", platform="xiaoniangao")
    data = mysql_helper.select(task_rule_sql)
    if data:
        rule_list = json.loads(data[0][0])
        for item in rule_list:
            rule_dict.update(item)
    return rule_dict


async def run_spider(rule_dict):
    """
    Run one full crawl with the given rules.

    :param rule_dict: crawl rules as returned by ``get_task_rule``
    """
    spider = ImportantXiaoNianGaoAuthor(
        platform="xiaoniangao", mode="author", rule_dict=rule_dict
    )
    await spider.run()


async def periodic_task():
    """
    Run the crawl forever, reloading the rules before every round.
    """
    while True:
        rule = get_task_rule()
        # Runs inside the current event loop.
        await run_spider(rule_dict=rule)
        # Random pause of 10-20 minutes between rounds.
        wait_time = random.randint(10 * 60, 20 * 60)
        await asyncio.sleep(wait_time)


async def main():
    """
    Program entry coroutine.
    """
    await periodic_task()


if __name__ == "__main__":
    asyncio.run(main())