"""
@Author: luojunhui
XiaoNianGao account crawler
"""
import os
import sys
import json
import time
import uuid
import random
import asyncio

import aiohttp

sys.path.append(os.getcwd())

from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common import AliyunLogger, MysqlHelper
from application.functions import get_config_from_mysql, clean_title


class ImportantXiaoNianGaoAuthor(object):
    """
    XiaoNianGao account crawler
    """

    def __init__(self, platform, mode, rule_dict, env="prod"):
        self.download_count = 0
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
        # Maps external account id (out_side) -> internal user id (inside)
        self.important_accounts = {
            "154002316": "58527274",
            "1163011474": "58527278",
            "1122750580": "58527291",
            "37660529": "58527302",
            "156490323": "58527304",
            "262696881": "58527313",
            "1160417236": "58527318",
            "307419007": "58527399",
            "1162974507": "58527564",
            "194287386": "58527570",
            "1163003217": "58527580",
            "1162991035": "58527582",
            "50262268": "58527612",
            "209764266": "58527649",
            "26055443": "58527781",
            "1162977406": "58528076",
            "605290310": "58528077",
            "1160417201": "58528085",
            "32290307": "58528104",
            "1160417318": "58528114",
            "306386778": "58528122",
            "1161593386": "58528130",
            "1161593368": "58528245",
            "260159327": "58528249",
            "801020924": "58528269",
            "287637208": "58528273",
            "555866418": "58528298",
            "303943127": "59758578",
            "1162953017": "60450745",
            "1163013756": "63618095",
            "1162982920": "63642197",
            "15324740": "65487736",
            "170182913": "66807289",
            "1160417241": "66807294",
            "1220202407": "66807300",
            "20680": "66807304",
            "294317767": "66807306",
            "1162980250": "58527284",
            "1163008965": "58527307",
            "230841899": "58527626",
            "1162998153": "58527790",
            "1162954764": "58528095",
            "1160417133": "58528263",
            "1163005063": "58528268",
            "1161593366": "58528275",
            "1162958849": "58528281",
            "1161593379": "58528286",
            "1161593373": "58528334",
            "1163006779": "60450865",
            "311848591": "63642204",
        }

    def read_important_accounts(self):
       """        操作 user_list,把重要账号挑选出来        :return: [ Int ]        """        return self.important_accounts.keys()    async def scan_important_accounts(self, accounts):        """        批量扫描重要账号        :param accounts:重要账号        """        tasks = [self.get_user_videos(account) for account in accounts]        await asyncio.gather(*tasks)    async def get_user_videos(self, link):        """        小年糕执行代码, 跳出条件为扫描到三天之前的视频,否则继续抓取        :param link: 外部账号 id        """        url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"        headers = {            "Host": "kapi-xng-app.xiaoniangao.cn",            "content-type": "application/json; charset=utf-8",            "accept": "*/*",            "authorization": "hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=",            "verb": "POST",            "content-md5": "c7b7f8663984e8800e3bcd9b44465083",            "x-b3-traceid": "2f9da41f960ae077",            "accept-language": "zh-cn",            "date": "Mon, 19 Jun 2023 06:41:17 GMT",            "x-token-id": "",            "x-signaturemethod": "hmac-sha1",            "user-agent": "xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0",        }        async with aiohttp.ClientSession() as session:            next_index = -1            payload = {                "token": "",                "limit": 20,                "start_t": next_index,                "visited_mid": int(link),                "share_width": 300,                "share_height": 240,            }            async with session.post(url, headers=headers, json=payload) as response:                data = await response.json()                # data_list = data["data"]["list"]                # if data_list:                #     await self.process_video_pages(data_list, link)                try:                    data_list = data["data"]["list"]                    if data_list:                        await self.process_video_pages(data_list, link)                except Exception as e:                    self.aliyun_log.logging(                        code=3000,                        message="在抓取账号out_side:{}\t inside:{} 时报错, 报错原因是{}".format(link,                                                                                                self.important_accounts[                                                                                                    link], e),                        account=self.important_accounts[link]                    )    async def process_video_pages(self, video_list, link):        """        处理抓取到的某一页的视频        :param link: 外站 id link        :param video_list:        """        tasks = [self.process_video(video, link) for video in video_list]        await asyncio.gather(*tasks)    async def process_video(self, video, link):        """        :param link: 外站 id        :param video: 处理视频信息        """        trace_id = self.platform + str(uuid.uuid1())        # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号        title = clean_title(video.get("title", ""))        # 随机取一个表情/符号        emoji = random.choice(            get_config_from_mysql(self.mode, self.platform, "emoji")        )        # 生成最终标题,标题list[表情+title, title+表情]随机取一个        video_title = random.choice([f"{emoji}{title}", f"{title}{emoji}"])        # 发布时间        publish_time_stamp = int(int(video.get("t", 0)) / 1000)        publish_time_str = time.strftime(            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)        )        # 用户名 / 头像        user_name = (            video.get("user", {})            .get("nick", "")            .strip()            
.replace("\n", "")            .replace("/", "")            .replace(" ", "")            .replace(" ", "")            .replace("&NBSP", "")            .replace("\r", "")        )        video_dict = {            "video_title": video_title,            "video_id": video.get("vid", ""),            "duration": int(video.get("du", 0) / 1000),            "play_cnt": video.get("play_pv", 0),            "like_cnt": video.get("favor", {}).get("total", 0),            "comment_cnt": video.get("comment_count", 0),            "share_cnt": video.get("share", 0),            "user_name": user_name,            "publish_time_stamp": publish_time_stamp,            "publish_time_str": publish_time_str,            "update_time_stamp": int(time.time()),            "video_width": int(video.get("w", 0)),            "video_height": int(video.get("h", 0)),            "avatar_url": video.get("user", {}).get("hurl", ""),            "profile_id": video["id"],            "profile_mid": video.get("user", {}).get("mid", ""),            "cover_url": video.get("url", ""),            "video_url": video.get("v_url", ""),            "session": f"xiaoniangao-author-{int(time.time())}",            "out_user_id": video["id"],            "platform": self.platform,            "strategy": self.mode,            "out_video_id": video.get("vid", ""),        }        pipeline = PiaoQuanPipeline(            platform=self.platform,            mode=self.mode,            rule_dict=self.rule_dict,            env=self.env,            item=video_dict,            trace_id=trace_id,            account=self.important_accounts[link]        )        if int(time.time()) - publish_time_stamp > 3600 * 24 * int(                self.rule_dict.get("period", {}).get("max", 1000)        ):            self.aliyun_log.logging(                code="2004",                trace_id=trace_id,                data=video_dict,                message="发布时间超过{}天".format(                    int(self.rule_dict.get("period", {}).get("max", 1000))                ),                account=self.important_accounts[link],            )            return        flag = pipeline.process_item()        if flag:            video_dict["width"] = video_dict["video_width"]            video_dict["height"] = video_dict["video_height"]            video_dict["crawler_rule"] = json.dumps(self.rule_dict)            video_dict["user_id"] = self.important_accounts[link]            video_dict["publish_time"] = video_dict["publish_time_str"]            self.mq.send_msg(video_dict)            self.download_count += 1            self.aliyun_log.logging(                code="1002",                data=video_dict,                trace_id=trace_id,                message="成功发送 MQ 至 ETL",                account=self.important_accounts[link],            )    async def run(self):        """        控制函数代码        :return:        """        user_list = self.read_important_accounts()        await self.scan_important_accounts(user_list)def get_task_rule():    """    :return: 返回任务的规则, task_rule    """    rule_dict = {}    task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = 21;"    MySQL = MysqlHelper(mode="author", platform="xiaoniangao")    data = MySQL.select(task_rule_sql)    if data:        rule_list = json.loads(data[0][0])        for item in rule_list:            for key in item:                rule_dict[key] = item[key]    return rule_dictasync def run_spider(rule_dict):    """    执行爬虫    :param rule_dict:    """    Ixng = ImportantXiaoNianGaoAuthor(        platform="xiaoniangao",        
mode="author",        rule_dict=rule_dict    )    await Ixng.run()async def periodic_task():    """    定时执行异步任务    """    while True:        rule = get_task_rule()        await run_spider(rule_dict=rule)  # 直接在当前事件循环中运行异步任务        wait_time = random.randint(10 * 60, 20 * 60)        await asyncio.sleep(wait_time)  # 随机等待 20-40minasync def main():    """    main 函数    """    await periodic_task()if __name__ == "__main__":    asyncio.run(main())