123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- """
- @Author : luojunhui
- 小年糕账号爬虫
- """
- import os
- import sys
- import json
- import time
- import uuid
- import random
- import asyncio
- import aiohttp
- sys.path.append(os.getcwd())
- from application.pipeline import PiaoQuanPipeline
- from application.common.messageQueue import MQ
- from application.common import AliyunLogger, MysqlHelper
- from application.functions import get_config_from_mysql, clean_title
class ImportantXiaoNianGaoAuthor(object):
    """
    Crawler for a fixed set of "important" Xiaoniangao (小年糕) author accounts.

    For every account in ``important_accounts`` it requests the account's public
    album list and builds a normalized ``video_dict`` per video. Each video is
    run through ``PiaoQuanPipeline``. Accepted videos are sent to the
    ``topic_crawler_etl_<env>`` MQ topic.
    """

    # Outside account id (sent as ``visited_mid``) -> internal account / user id.
    IMPORTANT_ACCOUNTS = {
        "154002316": "58527274",
        "1163011474": "58527278",
        "1122750580": "58527291",
        "37660529": "58527302",
        "156490323": "58527304",
        "262696881": "58527313",
        "1160417236": "58527318",
        "307419007": "58527399",
        "1162974507": "58527564",
        "194287386": "58527570",
        "1163003217": "58527580",
        "1162991035": "58527582",
        "50262268": "58527612",
        "209764266": "58527649",
        "26055443": "58527781",
        "1162977406": "58528076",
        "605290310": "58528077",
        "1160417201": "58528085",
        "32290307": "58528104",
        "1160417318": "58528114",
        "306386778": "58528122",
        "1161593386": "58528130",
        "1161593368": "58528245",
        "260159327": "58528249",
        "801020924": "58528269",
        "287637208": "58528273",
        "555866418": "58528298",
        "303943127": "59758578",
        "1162953017": "60450745",
        "1163013756": "63618095",
        "1162982920": "63642197",
        "15324740": "65487736",
        "170182913": "66807289",
        "1160417241": "66807294",
        "1220202407": "66807300",
        "20680": "66807304",
        "294317767": "66807306",
        "1162980250": "58527284",
        "1163008965": "58527307",
        "230841899": "58527626",
        "1162998153": "58527790",
        "1162954764": "58528095",
        "1160417133": "58528263",
        "1163005063": "58528268",
        "1161593366": "58528275",
        "1162958849": "58528281",
        "1161593379": "58528286",
        "1161593373": "58528334",
        "1163006779": "60450865",
        "311848591": "63642204",
    }

    # Endpoint returning one page of an account's public albums.
    USER_PUBLIC_URL = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"

    # NOTE(review): authorization / content-md5 / date are fixed values
    # captured from a client session; they are sent unchanged on every request.
    USER_PUBLIC_HEADERS = {
        "Host": "kapi-xng-app.xiaoniangao.cn",
        "content-type": "application/json; charset=utf-8",
        "accept": "*/*",
        "authorization": "hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=",
        "verb": "POST",
        "content-md5": "c7b7f8663984e8800e3bcd9b44465083",
        "x-b3-traceid": "2f9da41f960ae077",
        "accept-language": "zh-cn",
        "date": "Mon, 19 Jun 2023 06:41:17 GMT",
        "x-token-id": "",
        "x-signaturemethod": "hmac-sha1",
        "user-agent": "xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0",
    }

    def __init__(self, platform, mode, rule_dict, env="prod"):
        """
        :param platform: platform name; used in trace ids, logs and MQ payloads
        :param mode: crawler mode (``run_spider`` passes "author")
        :param rule_dict: task rule dict. ``rule_dict["period"]["max"]`` caps the
            accepted video age in days (default 1000).
        :param env: environment suffix of the MQ topic ``topic_crawler_etl_<env>``
        """
        self.download_count = 0
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        # NOTE(review): duplicate of ``download_count``. It is never updated by
        # this class; it is kept only so any external reader does not break.
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
        # Per-instance copy, so mutating it never leaks into other instances.
        self.important_accounts = dict(self.IMPORTANT_ACCOUNTS)

    def read_important_accounts(self):
        """
        Return the outside account ids to scan.

        :return: keys view of ``important_accounts`` (outside ids as strings)
        """
        return self.important_accounts.keys()

    async def scan_important_accounts(self, accounts):
        """
        Scan the given accounts concurrently over one shared HTTP session.

        :param accounts: iterable of outside account ids (keys of ``important_accounts``)
        """
        accounts = list(accounts)
        if not accounts:
            return
        # One ClientSession for the whole scan instead of one per account.
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.get_user_videos(account, session=session)
                for account in accounts
            ]
            await asyncio.gather(*tasks)

    async def get_user_videos(self, link, session=None):
        """
        Fetch the first page of an account's public videos and process it.

        Only one page is requested (``limit`` 20, ``start_t`` -1). There is no
        pagination; old videos are filtered later in ``process_video`` using the
        rule's ``period.max``.

        Any exception is logged with code 3000 instead of being raised. This
        covers a bad id, a network error, a non-JSON reply, an unexpected payload
        shape, or a processing error. One failing account therefore cannot
        abort the whole scan.

        :param link: outside account id (a key of ``important_accounts``)
        :param session: optional shared ``aiohttp.ClientSession``. When omitted,
            a private session is opened and closed for this call.
        """
        if session is None:
            async with aiohttp.ClientSession() as own_session:
                await self.get_user_videos(link, session=own_session)
            return
        next_index = -1
        try:
            payload = {
                "token": "",
                "limit": 20,
                "start_t": next_index,
                "visited_mid": int(link),
                "share_width": 300,
                "share_height": 240,
            }
            async with session.post(
                self.USER_PUBLIC_URL, headers=self.USER_PUBLIC_HEADERS, json=payload
            ) as response:
                data = await response.json()
            data_list = data["data"]["list"]
            if data_list:
                await self.process_video_pages(data_list, link)
        except asyncio.CancelledError:
            # Never swallow cancellation (it is an Exception subclass on Python 3.7).
            raise
        except Exception as e:
            self.aliyun_log.logging(
                code=3000,
                message="在抓取账号out_side:{}\t inside:{} 时报错, 报错原因是{}".format(
                    link, self.important_accounts[link], e
                ),
                account=self.important_accounts[link],
            )

    async def process_video_pages(self, video_list, link):
        """
        Process every video of one fetched page.

        :param video_list: the API's ``data.list`` items for one page
        :param link: outside account id (a key of ``important_accounts``)
        """
        if not video_list:
            return
        # Load the emoji config once per page rather than once per video; the
        # call is synchronous and would otherwise block the loop per video.
        emoji_list = get_config_from_mysql(self.mode, self.platform, "emoji")
        tasks = [
            self.process_video(video, link, emoji_list=emoji_list)
            for video in video_list
        ]
        await asyncio.gather(*tasks)

    async def process_video(self, video, link, emoji_list=None):
        """
        Normalize one video, apply the age filter and the pipeline, and send it to MQ.

        :param video: one item of the API's ``data.list`` (dict)
        :param link: outside account id (a key of ``important_accounts``)
        :param emoji_list: candidate emojis/symbols for the title. When omitted,
            they are loaded via ``get_config_from_mysql``. An empty list means
            no emoji is added.
        """
        trace_id = self.platform + str(uuid.uuid1())
        account = self.important_accounts[link]
        # Title: an emoji is randomly placed at the start or the end.
        title = clean_title(video.get("title", ""))
        if emoji_list is None:
            emoji_list = get_config_from_mysql(self.mode, self.platform, "emoji")
        emoji = random.choice(emoji_list) if emoji_list else ""
        video_title = random.choice([f"{emoji}{title}", f"{title}{emoji}"])
        # Publish time: the API field ``t`` is divided by 1000 to get seconds.
        publish_time_stamp = int(int(video.get("t", 0)) / 1000)
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )
        # Nickname with whitespace / separators / non-breaking spaces removed.
        user_name = (
            video.get("user", {})
            .get("nick", "")
            .strip()
            .replace("\n", "")
            .replace("/", "")
            .replace(" ", "")
            .replace("\u00a0", "")
            .replace("&NBSP", "")
            .replace("\r", "")
        )
        video_dict = {
            "video_title": video_title,
            "video_id": video.get("vid", ""),
            "duration": int(video.get("du", 0) / 1000),
            "play_cnt": video.get("play_pv", 0),
            "like_cnt": video.get("favor", {}).get("total", 0),
            "comment_cnt": video.get("comment_count", 0),
            "share_cnt": video.get("share", 0),
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "update_time_stamp": int(time.time()),
            "video_width": int(video.get("w", 0)),
            "video_height": int(video.get("h", 0)),
            "avatar_url": video.get("user", {}).get("hurl", ""),
            "profile_id": video["id"],
            "profile_mid": video.get("user", {}).get("mid", ""),
            "cover_url": video.get("url", ""),
            "video_url": video.get("v_url", ""),
            "session": f"xiaoniangao-author-{int(time.time())}",
            "out_user_id": video["id"],
            "platform": self.platform,
            "strategy": self.mode,
            "out_video_id": video.get("vid", ""),
        }
        max_days = int(self.rule_dict.get("period", {}).get("max", 1000))
        if int(time.time()) - publish_time_stamp > 3600 * 24 * max_days:
            self.aliyun_log.logging(
                code="2004",
                trace_id=trace_id,
                data=video_dict,
                message="发布时间超过{}天".format(max_days),
                account=account,
            )
            return
        # Only build the pipeline for videos that passed the age filter.
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
            trace_id=trace_id,
            account=account,
        )
        if pipeline.process_item():
            # Extra fields added to the payload before it is sent to MQ.
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = account
            video_dict["publish_time"] = video_dict["publish_time_str"]
            self.mq.send_msg(video_dict)
            self.download_count += 1
            self.aliyun_log.logging(
                code="1002",
                data=video_dict,
                trace_id=trace_id,
                message="成功发送 MQ 至 ETL",
                account=account,
            )

    async def run(self):
        """Run one full scan over every important account."""
        user_list = self.read_important_accounts()
        await self.scan_important_accounts(user_list)
def get_task_rule(task_id=21):
    """
    Load and flatten the crawler task rule stored in ``crawler_task_v3``.

    The ``rule`` column is parsed as JSON. Each element is expected to be a
    dict, and the elements are merged into one dict; later keys win.

    :param task_id: row id in ``crawler_task_v3`` (default 21). It is coerced
        to ``int`` before being placed into the SQL.
    :return: the merged rule dict. It is ``{}`` when the row is missing or its
        rule is empty.
    """
    rule_dict = {}
    task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = {int(task_id)};"
    mysql = MysqlHelper(mode="author", platform="xiaoniangao")
    data = mysql.select(task_rule_sql)
    # Guard against a missing row and a NULL/empty rule column, which would
    # otherwise crash json.loads.
    if data and data[0][0]:
        for item in json.loads(data[0][0]):
            rule_dict.update(item)
    return rule_dict
async def run_spider(rule_dict):
    """
    Build one xiaoniangao author-mode crawler and run a single scan with it.

    :param rule_dict: task rule dict forwarded to the crawler.
    """
    spider = ImportantXiaoNianGaoAuthor(
        platform="xiaoniangao", mode="author", rule_dict=rule_dict
    )
    await spider.run()
async def periodic_task(interval=30 * 60):
    """
    Run the crawler forever, re-reading the task rule before every round.

    A failed round is printed and retried after the next sleep instead of
    terminating the loop. A failure might be, for example, the rule query
    raising. Cancellation is always propagated.

    :param interval: seconds to wait between rounds (default 30 minutes).
    """
    import traceback

    while True:
        try:
            rule = get_task_rule()
            await run_spider(rule_dict=rule)  # run directly in the current event loop
        except asyncio.CancelledError:
            raise
        except Exception:
            # Top-level daemon boundary: report and keep the schedule alive.
            traceback.print_exc()
        await asyncio.sleep(interval)
async def main():
    """
    Async entry point: hand control to the periodic crawl loop.
    """
    loop_coro = periodic_task()
    await loop_coro


if __name__ == "__main__":
    # Block on a fresh event loop until the periodic loop exits.
    entry = main()
    asyncio.run(entry)
|