xiaoniangao_author_v2.py 13 KB


  1. import json
  2. import os
  3. import random
  4. import sys
  5. import time
  6. import uuid
  7. import requests
  8. from common.mq import MQ
  9. sys.path.append(os.getcwd())
  10. from common.common import Common
  11. from common import AliyunLogger, PiaoQuanPipeline
  12. from common.public import get_config_from_mysql, clean_title
  13. from common.limit import AuthorLimit
  14. def tunnel_proxies():
  15. # 隧道域名:端口号
  16. tunnel = "q796.kdltps.com:15818"
  17. # 用户名密码方式
  18. username = "t17772369458618"
  19. password = "5zqcjkmy"
  20. tunnel_proxies = {
  21. "http": "http://%(user)s:%(pwd)s@%(proxy)s/"
  22. % {"user": username, "pwd": password, "proxy": tunnel},
  23. "https": "http://%(user)s:%(pwd)s@%(proxy)s/"
  24. % {"user": username, "pwd": password, "proxy": tunnel},
  25. }
  26. return tunnel_proxies
  27. class XiaoNianGaoAuthor:
  28. def __init__(self, platform, mode, rule_dict, env, user_list):
  29. self.platform = platform
  30. self.mode = mode
  31. self.rule_dict = rule_dict
  32. self.env = env
  33. self.user_list = user_list
  34. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  35. self.download_count = 0
  36. self.test_account = [58528285, 58527674, 58528085, 58527582, 58527601, 58527612, 58528281, 58528095, 58527323,
  37. 58528071, 58527278]
  38. self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)
  39. def get_author_list(self):
  40. # 每轮只抓取定量的数据,到达数量后自己退出
  41. max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 140))
  42. for user_dict in self.user_list:
  43. if self.download_count <= max_count:
  44. self.get_video_list(user_dict)
  45. # time.sleep(random.randint(1, 10))
  46. time.sleep(1)
  47. else:
  48. AliyunLogger.logging(
  49. code="2000",
  50. platform=self.platform,
  51. mode=self.mode,
  52. env=self.env,
  53. message="本轮已经抓取足够数量的视频,已经自动退出",
  54. )
  55. Common.logging(
  56. log_type=self.mode,
  57. crawler=self.platform,
  58. env=self.env,
  59. message="本轮已经抓取足够数量的视频,已经自动退出",
  60. )
  61. return
  62. def get_video_list(self, user_dict):
  63. next_t = -1
  64. # 只抓取更新的视频,如果刷到已经更新的立即退出
  65. url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
  66. headers = {
  67. 'Host': 'kapi-xng-app.xiaoniangao.cn',
  68. 'content-type': 'application/json; charset=utf-8',
  69. 'accept': '*/*',
  70. 'authorization': 'hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=',
  71. 'verb': 'POST',
  72. 'content-md5': 'c7b7f8663984e8800e3bcd9b44465083',
  73. 'x-b3-traceid': '2f9da41f960ae077',
  74. 'accept-language': 'zh-cn',
  75. 'date': 'Mon, 19 Jun 2023 06:41:17 GMT',
  76. 'x-token-id': '',
  77. 'x-signaturemethod': 'hmac-sha1',
  78. 'user-agent': 'xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0'
  79. }
  80. while True:
  81. payload = {
  82. "token": "",
  83. "limit": 20,
  84. "start_t": next_t,
  85. "visited_mid": int(user_dict["link"]),
  86. "share_width": 300,
  87. "share_height": 240,
  88. }
  89. response = requests.request(
  90. "POST",
  91. url,
  92. headers=headers,
  93. data=json.dumps(payload),
  94. # proxies=tunnel_proxies(),
  95. )
  96. if "data" not in response.text or response.status_code != 200:
  97. Common.logger(self.mode, self.platform).info(
  98. f"get_videoList:{response.text}\n"
  99. )
  100. Common.logging(
  101. log_type=self.mode,
  102. crawler=self.platform,
  103. env=self.env,
  104. message=f"get_videoList:{response.text}\n"
  105. )
  106. AliyunLogger.logging(
  107. code="2000",
  108. platform=self.platform,
  109. mode=self.mode,
  110. env=self.env,
  111. message=f"get_videoList:{response.text}\n",
  112. )
  113. return
  114. elif "list" not in response.json()["data"]:
  115. Common.logger(self.mode, self.platform).info(
  116. f"get_videoList:{response.json()}\n"
  117. )
  118. Common.logging(
  119. log_type=self.mode,
  120. crawler=self.platform,
  121. env=self.env,
  122. message=f"get_videoList:{response.json()}\n"
  123. )
  124. AliyunLogger.logging(
  125. code="2000",
  126. platform=self.platform,
  127. mode=self.mode,
  128. env=self.env,
  129. message=f"get_videoList:{response.text}\n",
  130. )
  131. return
  132. elif len(response.json()["data"]["list"]) == 0:
  133. Common.logger(self.mode, self.platform).info(f"没有更多数据啦~\n")
  134. Common.logging(
  135. log_type=self.mode,
  136. crawler=self.platform,
  137. env=self.env,
  138. message=f"没有更多数据啦~\n"
  139. )
  140. AliyunLogger.logging(
  141. code="2000",
  142. platform=self.platform,
  143. mode=self.mode,
  144. env=self.env,
  145. message=f"没有更多数据啦~\n",
  146. )
  147. return
  148. else:
  149. next_t = response.json()["data"]["next_t"]
  150. feeds = response.json()["data"]["list"]
  151. for video_obj in feeds:
  152. try:
  153. AliyunLogger.logging(
  154. code="1001",
  155. platform=self.platform,
  156. mode=self.mode,
  157. env=self.env,
  158. message="扫描到一条视频",
  159. )
  160. Common.logging(
  161. log_type=self.mode,
  162. crawler=self.platform,
  163. env=self.env,
  164. message=f"扫描到一条视频"
  165. )
  166. date_flag = self.process_video_obj(video_obj, user_dict)
  167. if not date_flag:
  168. return
  169. except Exception as e:
  170. AliyunLogger.logging(
  171. code="3000",
  172. platform=self.platform,
  173. mode=self.mode,
  174. env=self.env,
  175. data=video_obj,
  176. message="抓取单条视频异常, 报错原因是: {}".format(e),
  177. )
  178. Common.logging(
  179. log_type=self.mode,
  180. crawler=self.platform,
  181. env=self.env,
  182. message="抓取单条视频异常, 报错原因是: {}".format(e),
  183. )
  184. def process_video_obj(self, video_obj, user_dict):
  185. trace_id = self.platform + str(uuid.uuid1())
  186. # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号
  187. xiaoniangao_title = clean_title(video_obj.get("title", ""))
  188. # 随机取一个表情/符号
  189. emoji = random.choice(
  190. get_config_from_mysql(self.mode, self.platform, self.env, "emoji")
  191. )
  192. # 生成最终标题,标题list[表情+title, title+表情]随机取一个
  193. video_title = random.choice(
  194. [f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]
  195. )
  196. # 发布时间
  197. publish_time_stamp = int(int(video_obj.get("t", 0)) / 1000)
  198. publish_time_str = time.strftime(
  199. "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
  200. )
  201. # 用户名 / 头像
  202. user_name = (
  203. video_obj.get("user", {})
  204. .get("nick", "")
  205. .strip()
  206. .replace("\n", "")
  207. .replace("/", "")
  208. .replace(" ", "")
  209. .replace(" ", "")
  210. .replace("&NBSP", "")
  211. .replace("\r", "")
  212. )
  213. video_dict = {
  214. "video_title": video_title,
  215. "video_id": video_obj.get("vid", ""),
  216. "duration": int(video_obj.get("du", 0) / 1000),
  217. "play_cnt": video_obj.get("play_pv", 0),
  218. "like_cnt": video_obj.get("favor", {}).get("total", 0),
  219. "comment_cnt": video_obj.get("comment_count", 0),
  220. "share_cnt": video_obj.get("share", 0),
  221. "user_name": user_name,
  222. "publish_time_stamp": publish_time_stamp,
  223. "publish_time_str": publish_time_str,
  224. "update_time_stamp": int(time.time()),
  225. "video_width": int(video_obj.get("w", 0)),
  226. "video_height": int(video_obj.get("h", 0)),
  227. "avatar_url": video_obj.get("user", {}).get("hurl", ""),
  228. "profile_id": video_obj["id"],
  229. "profile_mid": video_obj.get("user", {}).get("mid", ""),
  230. "cover_url": video_obj.get("url", ""),
  231. "video_url": video_obj.get("v_url", ""),
  232. "session": f"xiaoniangao-author-{int(time.time())}",
  233. "out_user_id": video_obj["id"],
  234. "platform": self.platform,
  235. "strategy": self.mode,
  236. "out_video_id": video_obj.get("vid", ""),
  237. }
  238. pipeline = PiaoQuanPipeline(
  239. platform=self.platform,
  240. mode=self.mode,
  241. rule_dict=self.rule_dict,
  242. env=self.env,
  243. item=video_dict,
  244. trace_id=trace_id,
  245. )
  246. # account_level = user_dict['account_level']
  247. if user_dict['link'] in self.test_account:
  248. if (
  249. int(time.time()) - publish_time_stamp
  250. > 3600 * 24
  251. ):
  252. AliyunLogger.logging(
  253. code="2004",
  254. trace_id=trace_id,
  255. platform=self.platform,
  256. mode=self.mode,
  257. env=self.env,
  258. data=video_dict,
  259. message="发布时间超过1天"
  260. )
  261. return False
  262. flag = pipeline.repeat_video()
  263. else:
  264. if (
  265. int(time.time()) - publish_time_stamp
  266. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  267. ):
  268. AliyunLogger.logging(
  269. code="2004",
  270. trace_id=trace_id,
  271. platform=self.platform,
  272. mode=self.mode,
  273. env=self.env,
  274. data=video_dict,
  275. message="发布时间超过{}天".format(
  276. int(self.rule_dict.get("period", {}).get("max", 1000))
  277. ),
  278. )
  279. return False
  280. flag = pipeline.process_item()
  281. if flag:
  282. video_dict["width"] = video_dict["video_width"]
  283. video_dict["height"] = video_dict["video_height"]
  284. video_dict["crawler_rule"] = json.dumps(self.rule_dict)
  285. video_dict["user_id"] = user_dict["uid"]
  286. video_dict["publish_time"] = video_dict["publish_time_str"]
  287. # print(video_dict)
  288. limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])
  289. if limit_flag:
  290. self.mq.send_msg(video_dict)
  291. self.download_count += 1
  292. AliyunLogger.logging(
  293. code="1002",
  294. platform=self.platform,
  295. mode=self.mode,
  296. env=self.env,
  297. data=video_dict,
  298. trace_id=trace_id,
  299. message="成功发送 MQ 至 ETL",
  300. )
  301. Common.logging(
  302. log_type=self.mode,
  303. crawler=self.platform,
  304. env=self.env,
  305. message="成功发送 MQ 至 ETL",
  306. )
  307. else:
  308. AliyunLogger.logging(
  309. code="8808",
  310. platform=self.platform,
  311. mode=self.mode,
  312. env=self.env,
  313. trace_id=trace_id,
  314. account=video_dict['user_id'],
  315. message="监测到个人账号数量超过 300,停止抓取该账号"
  316. )
  317. return True