xiaoniangao.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. """
  2. @Author : luojunhui
  3. 小年糕账号爬虫
  4. """
  5. import os
  6. import sys
  7. import json
  8. import time
  9. import uuid
  10. import random
  11. import asyncio
  12. import aiohttp
  13. sys.path.append(os.getcwd())
  14. from application.pipeline import PiaoQuanPipeline
  15. from application.common.messageQueue import MQ
  16. from application.common import AliyunLogger, MysqlHelper
  17. from application.functions import get_config_from_mysql, clean_title
  18. class ImportantXiaoNianGaoAuthor(object):
  19. """
  20. 小年糕账号爬虫
  21. """
  22. def __init__(self, platform, mode, rule_dict, env="prod"):
  23. self.download_count = 0
  24. self.platform = platform
  25. self.mode = mode
  26. self.rule_dict = rule_dict
  27. self.env = env
  28. self.download_cnt = 0
  29. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  30. self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
  31. self.important_accounts = {
  32. "154002316": "58527274",
  33. "1163011474": "58527278",
  34. "1122750580": "58527291",
  35. "37660529": "58527302",
  36. "156490323": "58527304",
  37. "262696881": "58527313",
  38. "1160417236": "58527318",
  39. "307419007": "58527399",
  40. "1162974507": "58527564",
  41. "194287386": "58527570",
  42. "1163003217": "58527580",
  43. "1162991035": "58527582",
  44. "50262268": "58527612",
  45. "209764266": "58527649",
  46. "26055443": "58527781",
  47. "1162977406": "58528076",
  48. "605290310": "58528077",
  49. "1160417201": "58528085",
  50. "32290307": "58528104",
  51. "1160417318": "58528114",
  52. "306386778": "58528122",
  53. "1161593386": "58528130",
  54. "1161593368": "58528245",
  55. "260159327": "58528249",
  56. "801020924": "58528269",
  57. "287637208": "58528273",
  58. "555866418": "58528298",
  59. "303943127": "59758578",
  60. "1162953017": "60450745",
  61. "1163013756": "63618095",
  62. "1162982920": "63642197",
  63. "15324740": "65487736",
  64. "170182913": "66807289",
  65. "1160417241": "66807294",
  66. "1220202407": "66807300",
  67. "20680": "66807304",
  68. "294317767": "66807306",
  69. "1162980250": "58527284",
  70. "1163008965": "58527307",
  71. "230841899": "58527626",
  72. "1162998153": "58527790",
  73. "1162954764": "58528095",
  74. "1160417133": "58528263",
  75. "1163005063": "58528268",
  76. "1161593366": "58528275",
  77. "1162958849": "58528281",
  78. "1161593379": "58528286",
  79. "1161593373": "58528334",
  80. "1163006779": "60450865",
  81. "311848591": "63642204",
  82. }
  83. def read_important_accounts(self):
  84. """
  85. 操作 user_list,把重要账号挑选出来
  86. :return: [ Int ]
  87. """
  88. return self.important_accounts.keys()
  89. async def scan_important_accounts(self, accounts):
  90. """
  91. 批量扫描重要账号
  92. :param accounts:重要账号
  93. """
  94. tasks = [self.get_user_videos(account) for account in accounts]
  95. await asyncio.gather(*tasks)
  96. async def get_user_videos(self, link):
  97. """
  98. 小年糕执行代码, 跳出条件为扫描到三天之前的视频,否则继续抓取
  99. :param link: 外部账号 id
  100. """
  101. url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
  102. headers = {
  103. "Host": "kapi-xng-app.xiaoniangao.cn",
  104. "content-type": "application/json; charset=utf-8",
  105. "accept": "*/*",
  106. "authorization": "hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=",
  107. "verb": "POST",
  108. "content-md5": "c7b7f8663984e8800e3bcd9b44465083",
  109. "x-b3-traceid": "2f9da41f960ae077",
  110. "accept-language": "zh-cn",
  111. "date": "Mon, 19 Jun 2023 06:41:17 GMT",
  112. "x-token-id": "",
  113. "x-signaturemethod": "hmac-sha1",
  114. "user-agent": "xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0",
  115. }
  116. async with aiohttp.ClientSession() as session:
  117. next_index = -1
  118. payload = {
  119. "token": "",
  120. "limit": 20,
  121. "start_t": next_index,
  122. "visited_mid": int(link),
  123. "share_width": 300,
  124. "share_height": 240,
  125. }
  126. async with session.post(url, headers=headers, json=payload) as response:
  127. data = await response.json()
  128. # data_list = data["data"]["list"]
  129. # if data_list:
  130. # await self.process_video_pages(data_list, link)
  131. try:
  132. data_list = data["data"]["list"]
  133. if data_list:
  134. await self.process_video_pages(data_list, link)
  135. except Exception as e:
  136. self.aliyun_log.logging(
  137. code=3000,
  138. message="在抓取账号out_side:{}\t inside:{} 时报错, 报错原因是{}".format(link,
  139. self.important_accounts[
  140. link], e),
  141. account=self.important_accounts[link]
  142. )
  143. async def process_video_pages(self, video_list, link):
  144. """
  145. 处理抓取到的某一页的视频
  146. :param link: 外站 id link
  147. :param video_list:
  148. """
  149. tasks = [self.process_video(video, link) for video in video_list]
  150. await asyncio.gather(*tasks)
  151. async def process_video(self, video, link):
  152. """
  153. :param link: 外站 id
  154. :param video: 处理视频信息
  155. """
  156. trace_id = self.platform + str(uuid.uuid1())
  157. # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号
  158. title = clean_title(video.get("title", ""))
  159. # 随机取一个表情/符号
  160. emoji = random.choice(
  161. get_config_from_mysql(self.mode, self.platform, "emoji")
  162. )
  163. # 生成最终标题,标题list[表情+title, title+表情]随机取一个
  164. video_title = random.choice([f"{emoji}{title}", f"{title}{emoji}"])
  165. # 发布时间
  166. publish_time_stamp = int(int(video.get("t", 0)) / 1000)
  167. publish_time_str = time.strftime(
  168. "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
  169. )
  170. # 用户名 / 头像
  171. user_name = (
  172. video.get("user", {})
  173. .get("nick", "")
  174. .strip()
  175. .replace("\n", "")
  176. .replace("/", "")
  177. .replace(" ", "")
  178. .replace(" ", "")
  179. .replace("&NBSP", "")
  180. .replace("\r", "")
  181. )
  182. video_dict = {
  183. "video_title": video_title,
  184. "video_id": video.get("vid", ""),
  185. "duration": int(video.get("du", 0) / 1000),
  186. "play_cnt": video.get("play_pv", 0),
  187. "like_cnt": video.get("favor", {}).get("total", 0),
  188. "comment_cnt": video.get("comment_count", 0),
  189. "share_cnt": video.get("share", 0),
  190. "user_name": user_name,
  191. "publish_time_stamp": publish_time_stamp,
  192. "publish_time_str": publish_time_str,
  193. "update_time_stamp": int(time.time()),
  194. "video_width": int(video.get("w", 0)),
  195. "video_height": int(video.get("h", 0)),
  196. "avatar_url": video.get("user", {}).get("hurl", ""),
  197. "profile_id": video["id"],
  198. "profile_mid": video.get("user", {}).get("mid", ""),
  199. "cover_url": video.get("url", ""),
  200. "video_url": video.get("v_url", ""),
  201. "session": f"xiaoniangao-author-{int(time.time())}",
  202. "out_user_id": video["id"],
  203. "platform": self.platform,
  204. "strategy": self.mode,
  205. "out_video_id": video.get("vid", ""),
  206. }
  207. pipeline = PiaoQuanPipeline(
  208. platform=self.platform,
  209. mode=self.mode,
  210. rule_dict=self.rule_dict,
  211. env=self.env,
  212. item=video_dict,
  213. trace_id=trace_id,
  214. account=self.important_accounts[link]
  215. )
  216. if int(time.time()) - publish_time_stamp > 3600 * 24 * int(
  217. self.rule_dict.get("period", {}).get("max", 1000)
  218. ):
  219. self.aliyun_log.logging(
  220. code="2004",
  221. trace_id=trace_id,
  222. data=video_dict,
  223. message="发布时间超过{}天".format(
  224. int(self.rule_dict.get("period", {}).get("max", 1000))
  225. ),
  226. account=self.important_accounts[link],
  227. )
  228. return
  229. flag = pipeline.process_item()
  230. if flag:
  231. video_dict["width"] = video_dict["video_width"]
  232. video_dict["height"] = video_dict["video_height"]
  233. video_dict["crawler_rule"] = json.dumps(self.rule_dict)
  234. video_dict["user_id"] = self.important_accounts[link]
  235. video_dict["publish_time"] = video_dict["publish_time_str"]
  236. self.mq.send_msg(video_dict)
  237. self.download_count += 1
  238. self.aliyun_log.logging(
  239. code="1002",
  240. data=video_dict,
  241. trace_id=trace_id,
  242. message="成功发送 MQ 至 ETL",
  243. account=self.important_accounts[link],
  244. )
  245. async def run(self):
  246. """
  247. 控制函数代码
  248. :return:
  249. """
  250. user_list = self.read_important_accounts()
  251. await self.scan_important_accounts(user_list)
  252. def get_task_rule():
  253. """
  254. :return: 返回任务的规则, task_rule
  255. """
  256. rule_dict = {}
  257. task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = 21;"
  258. MySQL = MysqlHelper(mode="author", platform="xiaoniangao")
  259. data = MySQL.select(task_rule_sql)
  260. if data:
  261. rule_list = json.loads(data[0][0])
  262. for item in rule_list:
  263. for key in item:
  264. rule_dict[key] = item[key]
  265. return rule_dict
  266. async def run_spider(rule_dict):
  267. """
  268. 执行爬虫
  269. :param rule_dict:
  270. """
  271. Ixng = ImportantXiaoNianGaoAuthor(
  272. platform="xiaoniangao",
  273. mode="author",
  274. rule_dict=rule_dict
  275. )
  276. await Ixng.run()
  277. async def periodic_task():
  278. """
  279. 定时执行异步任务
  280. """
  281. while True:
  282. rule = get_task_rule()
  283. await run_spider(rule_dict=rule) # 直接在当前事件循环中运行异步任务
  284. wait_time = random.randint(10 * 60, 20 * 60)
  285. await asyncio.sleep(wait_time) # 随机等待 20-40min
  286. async def main():
  287. """
  288. main 函数
  289. """
  290. await periodic_task()
  291. if __name__ == "__main__":
  292. asyncio.run(main())