xiaoniangao_author_v2.py

import json
import os
import random
import sys
import time
import uuid

import requests

# Make the repo root importable before pulling in the project-local `common` package
sys.path.append(os.getcwd())
from common.mq import MQ
from common.redis_db import get_data, store_data
from common.common import Common
from common import AliyunLogger, PiaoQuanPipeline
from common.public import get_config_from_mysql, clean_title
from common.limit import AuthorLimit


def tunnel_proxies():
    # Tunnel proxy domain:port
    tunnel = "q796.kdltps.com:15818"
    # Username / password authentication
    username = "t17772369458618"
    password = "5zqcjkmy"
    proxies = {
        "http": "http://%(user)s:%(pwd)s@%(proxy)s/"
        % {"user": username, "pwd": password, "proxy": tunnel},
        "https": "http://%(user)s:%(pwd)s@%(proxy)s/"
        % {"user": username, "pwd": password, "proxy": tunnel},
    }
    return proxies
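

# Usage sketch (illustration only, not part of the original file): the dict returned
# above matches the format `requests` expects for its `proxies` argument, e.g.
#
#     resp = requests.get("https://example.com", proxies=tunnel_proxies(), timeout=10)
#
# The kdltps.com tunnel endpoint and credentials are taken verbatim from the code above.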


class XiaoNianGaoAuthor:
    def __init__(self, platform, mode, rule_dict, env, user_list):
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        self.user_list = user_list
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.download_count = 0
        # Test accounts: these only get a fixed 24h freshness window and a
        # duplicate check instead of the full rule pipeline (see process_video_obj)
        self.test_account = [58528285, 58527674, 58528085, 58527582, 58527601,
                             58527612, 58528281, 58528095, 58527323, 58528071,
                             58527278]
        self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)

    def get_author_list(self):
        # Each round only crawls a fixed quota of videos; exit once the quota is reached
        max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 140))
        for user_dict in self.user_list:
            if self.download_count <= max_count:
                self.get_video_list(user_dict)
                # time.sleep(random.randint(1, 10))
                time.sleep(1)
            else:
                AliyunLogger.logging(
                    code="2000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    message="Enough videos crawled this round, exiting automatically",
                )
                Common.logging(
                    log_type=self.mode,
                    crawler=self.platform,
                    env=self.env,
                    message="Enough videos crawled this round, exiting automatically",
                )
                return

    def get_video_list(self, user_dict):
        next_t = -1
        # Only crawl newly published videos; exit immediately once an already-crawled one is hit
        url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
        headers = {
            'Host': 'kapi-xng-app.xiaoniangao.cn',
            'content-type': 'application/json; charset=utf-8',
            'accept': '*/*',
            'authorization': 'hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=',
            'verb': 'POST',
            'content-md5': 'c7b7f8663984e8800e3bcd9b44465083',
            'x-b3-traceid': '2f9da41f960ae077',
            'accept-language': 'zh-cn',
            'date': 'Mon, 19 Jun 2023 06:41:17 GMT',
            'x-token-id': '',
            'x-signaturemethod': 'hmac-sha1',
            'user-agent': 'xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0'
        }
        while True:
            payload = {
                "token": "",
                "limit": 20,
                "start_t": next_t,
                "visited_mid": int(user_dict["link"]),
                "share_width": 300,
                "share_height": 240,
            }
            response = requests.request(
                "POST",
                url,
                headers=headers,
                data=json.dumps(payload),
                # proxies=tunnel_proxies(),
            )
            if "data" not in response.text or response.status_code != 200:
                Common.logger(self.mode, self.platform).info(
                    f"get_videoList:{response.text}\n"
                )
                Common.logging(
                    log_type=self.mode,
                    crawler=self.platform,
                    env=self.env,
                    message=f"get_videoList:{response.text}\n"
                )
                AliyunLogger.logging(
                    code="2000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    message=f"get_videoList:{response.text}\n",
                )
                return
            elif "list" not in response.json()["data"]:
                Common.logger(self.mode, self.platform).info(
                    f"get_videoList:{response.json()}\n"
                )
                Common.logging(
                    log_type=self.mode,
                    crawler=self.platform,
                    env=self.env,
                    message=f"get_videoList:{response.json()}\n"
                )
                AliyunLogger.logging(
                    code="2000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    message=f"get_videoList:{response.text}\n",
                )
                return
            elif len(response.json()["data"]["list"]) == 0:
                Common.logger(self.mode, self.platform).info("No more data\n")
                Common.logging(
                    log_type=self.mode,
                    crawler=self.platform,
                    env=self.env,
                    message="No more data\n"
                )
                AliyunLogger.logging(
                    code="2000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    message="No more data\n",
                )
                return
            else:
                next_t = response.json()["data"]["next_t"]
                feeds = response.json()["data"]["list"]
                for video_obj in feeds:
                    try:
                        AliyunLogger.logging(
                            code="1001",
                            platform=self.platform,
                            mode=self.mode,
                            env=self.env,
                            message="Scanned one video",
                        )
                        Common.logging(
                            log_type=self.mode,
                            crawler=self.platform,
                            env=self.env,
                            message="Scanned one video"
                        )
                        date_flag = self.process_video_obj(video_obj, user_dict)
                        if not date_flag:
                            return
                    except Exception as e:
                        AliyunLogger.logging(
                            code="3000",
                            platform=self.platform,
                            mode=self.mode,
                            env=self.env,
                            data=video_obj,
                            message="Exception while crawling a single video, error: {}".format(e),
                        )
                        Common.logging(
                            log_type=self.mode,
                            crawler=self.platform,
                            env=self.env,
                            message="Exception while crawling a single video, error: {}".format(e),
                        )
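
    # Response shape assumed by the parsing above (inferred from the accesses in
    # get_video_list, not from API documentation; only the fields actually read
    # are listed):
    #
    #     {
    #         "data": {
    #             "next_t": <cursor for the next page; the first request sends -1>,
    #             "list": [ <video_obj>, ... ]
    #         }
    #     }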

    def process_video_obj(self, video_obj, user_dict):
        trace_id = self.platform + str(uuid.uuid1())
        # Title: an emoji is randomly added at the start or the end, or used in place
        # of punctuation inside the sentence
        xiaoniangao_title = clean_title(video_obj.get("title", ""))
        # Pick one emoji/symbol at random
        emoji = random.choice(
            get_config_from_mysql(self.mode, self.platform, self.env, "emoji")
        )
        # Build the final title: randomly pick one of [emoji + title, title + emoji]
        video_title = random.choice(
            [f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]
        )
        # Publish time
        publish_time_stamp = int(int(video_obj.get("t", 0)) / 1000)
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )
        # User nickname / avatar
        user_name = (
            video_obj.get("user", {})
            .get("nick", "")
            .strip()
            .replace("\n", "")
            .replace("/", "")
            .replace(" ", "")
            .replace(" ", "")
            .replace("&NBSP", "")
            .replace("\r", "")
        )
        video_dict = {
            "video_title": video_title,
            "video_id": video_obj.get("vid", ""),
            "duration": int(video_obj.get("du", 0) / 1000),
            "play_cnt": video_obj.get("play_pv", 0),
            "like_cnt": video_obj.get("favor", {}).get("total", 0),
            "comment_cnt": video_obj.get("comment_count", 0),
            "share_cnt": video_obj.get("share", 0),
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "update_time_stamp": int(time.time()),
            "video_width": int(video_obj.get("w", 0)),
            "video_height": int(video_obj.get("h", 0)),
            "avatar_url": video_obj.get("user", {}).get("hurl", ""),
            "profile_id": video_obj["id"],
            "profile_mid": video_obj.get("user", {}).get("mid", ""),
            "cover_url": video_obj.get("url", ""),
            "video_url": video_obj.get("v_url", ""),
            "session": f"xiaoniangao-author-{int(time.time())}",
            "out_user_id": video_obj["id"],
            "platform": self.platform,
            "strategy": self.mode,
            "out_video_id": video_obj.get("vid", ""),
        }
        value = get_data(self.platform, video_obj.get("vid", ""))
        if value:
            AliyunLogger.logging(
                code="2004",
                trace_id=trace_id,
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                data=video_dict,
                message="Duplicate video already in redis: {}".format(video_dict),
            )
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
            trace_id=trace_id,
        )
        # account_level = user_dict['account_level']
        # Cast to int so the membership test matches the int entries in self.test_account
        if int(user_dict["link"]) in self.test_account:
            if int(time.time()) - publish_time_stamp > 3600 * 24:
                AliyunLogger.logging(
                    code="2004",
                    trace_id=trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_dict,
                    message="Published more than 1 day ago"
                )
                return False
            flag = pipeline.repeat_video()
        else:
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ):
                AliyunLogger.logging(
                    code="2004",
                    trace_id=trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_dict,
                    message="Published more than {} days ago".format(
                        int(self.rule_dict.get("period", {}).get("max", 1000))
                    ),
                )
                return False
            flag = pipeline.process_item()
        if flag:
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = user_dict["uid"]
            video_dict["publish_time"] = video_dict["publish_time_str"]
            # print(video_dict)
            limit_flag = self.limiter.author_limitation(user_id=video_dict["user_id"])
            if limit_flag:
                self.mq.send_msg(video_dict)
                store_data(self.platform, video_obj.get("vid", ""))
                Common.logging(
                    log_type=self.mode,
                    crawler=self.platform,
                    env=self.env,
                    message="Wrote to redis successfully",
                )
                self.download_count += 1
                AliyunLogger.logging(
                    code="1002",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_dict,
                    trace_id=trace_id,
                    message="Successfully sent MQ message to ETL",
                )
                Common.logging(
                    log_type=self.mode,
                    crawler=self.platform,
                    env=self.env,
                    message="Successfully sent MQ message to ETL",
                )
            else:
                AliyunLogger.logging(
                    code="8808",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    trace_id=trace_id,
                    account=video_dict["user_id"],
                    message="Detected more than 300 videos on this personal account, stop crawling it"
                )
        return True
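

# Hypothetical entry point (not part of the original file): a minimal sketch of how this
# crawler class could be driven. The shapes of rule_dict and user_list are assumptions
# inferred from the attribute accesses above (rule_dict["videos_cnt"]["min"],
# rule_dict["period"]["max"], user_dict["link"], user_dict["uid"]); the concrete values
# are placeholders.
if __name__ == "__main__":
    demo_rule_dict = {"videos_cnt": {"min": 140}, "period": {"max": 3}}
    demo_user_list = [{"link": "58528285", "uid": "10001"}]
    crawler = XiaoNianGaoAuthor(
        platform="xiaoniangao",
        mode="author",
        rule_dict=demo_rule_dict,
        env="dev",
        user_list=demo_user_list,
    )
    crawler.get_author_list()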