kaixinkuailechangxiangban_recommend_2.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. import os
  2. import json
  3. import random
  4. import sys
  5. import time
  6. import uuid
  7. import requests
  8. sys.path.append(os.getcwd())
  9. from common.video_item import VideoItem
  10. from common import PiaoQuanPipeline, AliyunLogger, tunnel_proxies
  11. from common.mq import MQ
  12. from common.db import MysqlHelper
  13. from zhuwanwufusu.crypt import AESCipher as AES
  14. class KXKLCXBRecommend(object):
  15. def __init__(self, platform, mode, rule_dict, user_list, env):
  16. self.platform = platform
  17. self.mode = mode
  18. self.rule_dict = rule_dict
  19. self.user_list = user_list
  20. self.env = env
  21. self.download_cnt = 0
  22. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  23. self.limit_flag = False
  24. self.cryptor = AES()
  25. def get_recommend_list(self):
  26. url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideoListsEn2"
  27. headers = {
  28. 'Host': 'api.lidongze.cn',
  29. 'xweb_xhr': '1',
  30. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
  31. 'token': '',
  32. 'content-type': 'application/json',
  33. 'accept': '*/*',
  34. 'referer': 'https://servicewechat.com/wx05c1e3981d52164e/2/page-frame.html',
  35. 'accept-language': 'en-US,en;q=0.9'
  36. }
  37. page_index = 1
  38. total_page = 2
  39. while page_index <= total_page:
  40. try:
  41. query = {
  42. "pageNo": page_index,
  43. "pageSize": 10,
  44. "groupId": "1650323161797439489", # 推荐流的 ID
  45. "vn": 1,
  46. "gx": 1,
  47. "appid": "wx05c1e3981d52164e",
  48. "type": 2
  49. }
  50. params = {
  51. "v": self.cryptor.aes_encrypt(data=json.dumps(query))
  52. }
  53. response = requests.request("GET", url, headers=headers, params=params, proxies=tunnel_proxies())
  54. result = json.loads(self.cryptor.aes_decrypt(response.text))
  55. total_page = result['list']['pages']
  56. page_index = result['list']['current'] + 1
  57. for index, video_obj in enumerate(result['list']['records'], 1):
  58. try:
  59. AliyunLogger.logging(
  60. code="1001",
  61. platform=self.platform,
  62. mode=self.mode,
  63. env=self.env,
  64. message="扫描到一条视频",
  65. data=video_obj
  66. )
  67. self.process_video_obj(video_obj)
  68. except Exception as e:
  69. AliyunLogger.logging(
  70. code="3000",
  71. platform=self.platform,
  72. mode=self.mode,
  73. env=self.env,
  74. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(page_index, index, e)
  75. )
  76. except Exception as e:
  77. AliyunLogger.logging(
  78. code="3000",
  79. platform=self.platform,
  80. mode=self.mode,
  81. env=self.env,
  82. message="抓取第{}页的时候失败, 报错原因是{}".format(page_index, e)
  83. )
  84. time.sleep(random.randint(5, 10))
  85. def get_user_videos(self, user_id):
  86. """
  87. 在抓取完推荐页之后,去抓每一个用户的主页视频
  88. """
  89. url = "https://api.lidongze.cn/jeecg-boot/ugc/getAuthVideoList"
  90. headers = {
  91. 'Host': 'api.lidongze.cn',
  92. 'xweb_xhr': '1',
  93. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
  94. 'token': '',
  95. 'content-type': 'application/json',
  96. 'accept': '*/*',
  97. 'referer': 'https://servicewechat.com/wx05c1e3981d52164e/2/page-frame.html',
  98. 'accept-language': 'en-US,en;q=0.9'
  99. }
  100. page_index = 1
  101. total_page = 1
  102. while page_index <= total_page:
  103. query = {
  104. "pageNo": page_index,
  105. "pageSize": 10,
  106. "authid": user_id
  107. }
  108. params = {
  109. "v": self.cryptor.aes_encrypt(data=json.dumps(query))
  110. }
  111. response = requests.request("GET", url, headers=headers, params=params, proxies=tunnel_proxies())
  112. result = json.loads(self.cryptor.aes_decrypt(response.text))
  113. total_page = result['list']['pages']
  114. page_index = result['list']['current'] + 1
  115. for index, video_temp in enumerate(result['list']['records']):
  116. video_id = video_temp['id']
  117. detail_query = {
  118. "videoId": video_id
  119. }
  120. detail_params = {
  121. "v": self.cryptor.aes_encrypt(data=json.dumps(detail_query))
  122. }
  123. url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideosDataEn"
  124. headers = {
  125. 'Host': 'api.lidongze.cn',
  126. 'xweb_xhr': '1',
  127. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
  128. 'token': '',
  129. 'content-type': 'application/json',
  130. 'accept': '*/*',
  131. 'referer': 'https://servicewechat.com/wx05c1e3981d52164e/2/page-frame.html',
  132. 'accept-language': 'en-US,en;q=0.9'
  133. }
  134. detail_response = requests.request("GET", url, headers=headers, params=detail_params,
  135. proxies=tunnel_proxies())
  136. detail_video = json.loads(self.cryptor.aes_decrypt(detail_response.text))
  137. if detail_video['success']:
  138. try:
  139. AliyunLogger.logging(
  140. code="1001",
  141. platform=self.platform,
  142. mode=self.mode,
  143. env=self.env,
  144. message="扫描到一条视频",
  145. data=detail_video['data']
  146. )
  147. self.process_video_obj(detail_video['data'])
  148. except Exception as e:
  149. AliyunLogger.logging(
  150. code="3000",
  151. platform=self.platform,
  152. mode=self.mode,
  153. env=self.env,
  154. message="抓取单条视频失败, 该视频位于第{}条报错原因是{}".format(index, e)
  155. )
  156. def process_video_obj(self, video_obj):
  157. time.sleep(random.randint(3, 8))
  158. trace_id = self.platform + str(uuid.uuid1())
  159. if video_obj.get("playnum"):
  160. play_cnt = int(video_obj['playnum'].replace("万+", "0000")) if "万+" in video_obj['playnum'] else int(
  161. video_obj['playnum'])
  162. else:
  163. play_cnt = 0
  164. our_user = random.choice(self.user_list)
  165. item = VideoItem()
  166. item.add_video_info("video_id", video_obj['id'])
  167. item.add_video_info("video_title", video_obj['vname'])
  168. item.add_video_info("play_cnt", play_cnt)
  169. item.add_video_info("publish_time_stamp", int(time.time()))
  170. item.add_video_info("out_user_id", video_obj['authid'])
  171. item.add_video_info("cover_url", video_obj['shareimg'])
  172. item.add_video_info("like_cnt", int(video_obj['likenum']))
  173. item.add_video_info("video_url", video_obj['videoaddr'])
  174. item.add_video_info("out_video_id", video_obj['id'])
  175. item.add_video_info("platform", self.platform)
  176. item.add_video_info("strategy", self.mode)
  177. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  178. item.add_video_info("user_id", our_user['uid'])
  179. item.add_video_info("user_name", our_user['nick_name'])
  180. # 把扫描到的账号存到 accounts 表中
  181. self.manage_auth_id(out_user_id=video_obj['authid'], out_user_name=video_obj['authname'])
  182. mq_obj = item.produce_item()
  183. pipeline = PiaoQuanPipeline(
  184. platform=self.platform,
  185. mode=self.mode,
  186. rule_dict=self.rule_dict,
  187. env=self.env,
  188. item=mq_obj,
  189. trace_id=trace_id,
  190. )
  191. if pipeline.process_item():
  192. self.download_cnt += 1
  193. self.mq.send_msg(mq_obj)
  194. AliyunLogger.logging(
  195. code="1002",
  196. platform=self.platform,
  197. mode=self.mode,
  198. env=self.env,
  199. message="成功发送至 ETL",
  200. data=mq_obj
  201. )
  202. if self.download_cnt >= int(self.rule_dict.get("videos_cnt", {}).get("min", 200)):
  203. self.limit_flag = True
  204. def manage_auth_id(self, out_user_id, out_user_name):
  205. """
  206. out_user_id: 外站视频的用户 id
  207. out_user_name: 外站视频用户名字
  208. 逻辑: 对新扫描到的视频的用户 id 进行判断,若用户 id 不存在,则把视频 id 存到表中,
  209. 如果用户 id 存在,则判断用户是否修改名字,若名字修改则更新名字
  210. """
  211. select_user_sql = f"""select name, name_id from accounts where name_id = "{out_user_id}" and platform = "{self.platform}" and useful = 1 limit 1"""
  212. out_user_info = MysqlHelper.get_values(
  213. log_type=self.mode,
  214. crawler=self.platform,
  215. sql=select_user_sql,
  216. env=self.env,
  217. machine="",
  218. )
  219. if out_user_info:
  220. name, name_id = out_user_info[0]
  221. if name == out_user_name:
  222. return
  223. else:
  224. update_sql = f"""update accounts set name = "{out_user_name}" where name_id = "{out_user_id}";"""
  225. MysqlHelper.update_values(
  226. log_type=self.mode,
  227. crawler=self.platform,
  228. sql=update_sql,
  229. env=self.env,
  230. machine=""
  231. )
  232. else:
  233. insert_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{out_user_name}", "{out_user_id}", "{self.platform}", 1 )"""
  234. MysqlHelper.update_values(
  235. log_type=self.mode,
  236. crawler=self.platform,
  237. sql=insert_sql,
  238. env=self.env,
  239. machine="",
  240. )
  241. def get_user_list(self):
  242. select_user_sql = f"""select name_id from accounts where platform = "{self.platform}" and useful = 1"""
  243. out_user_info = MysqlHelper.get_values(
  244. log_type=self.mode,
  245. crawler=self.platform,
  246. sql=select_user_sql,
  247. env=self.env,
  248. machine="",
  249. )
  250. if out_user_info:
  251. result = []
  252. for i in out_user_info:
  253. result.append(i[0])
  254. return result
  255. else:
  256. return []
  257. def get_detail_video_list(self):
  258. url = "https://api.lidongze.cn/jeecg-boot/ugc/getDetailVideoListsEn2"
  259. headers = {
  260. 'Host': 'api.lidongze.cn',
  261. 'xweb_xhr': '1',
  262. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100',
  263. 'token': '',
  264. 'referer': 'https://servicewechat.com/wx05c1e3981d52164e/2/page-frame.html',
  265. 'accept-language': 'en-US,en;q=0.9'
  266. }
  267. page_index = 1
  268. total_page = 2
  269. while page_index <= total_page:
  270. try:
  271. if self.limit_flag:
  272. AliyunLogger.logging(
  273. code="2000",
  274. platform=self.platform,
  275. mode=self.mode,
  276. env=self.env,
  277. message="本轮已经抓取足够数量的视频"
  278. )
  279. return
  280. else:
  281. query = {
  282. "groupId": "1650323161797439489",
  283. "pageNo": page_index,
  284. "pageSize": 10,
  285. "appid": "wx05c1e3981d52164e",
  286. "type": 3,
  287. "hxid": "1556555457243828666"
  288. }
  289. params = {
  290. "v": self.cryptor.aes_encrypt(data=json.dumps(query))
  291. }
  292. response = requests.request("GET", url, headers=headers, params=params)
  293. result = json.loads(self.cryptor.aes_decrypt(response.text))
  294. total_page = result['list']['pages']
  295. page_index = result['list']['current'] + 1
  296. for index, video_obj in enumerate(result['list']['records'], 1):
  297. try:
  298. AliyunLogger.logging(
  299. code="1001",
  300. platform=self.platform,
  301. mode=self.mode,
  302. env=self.env,
  303. message="扫描到一条视频",
  304. data=video_obj
  305. )
  306. self.process_video_obj(video_obj)
  307. except Exception as e:
  308. AliyunLogger.logging(
  309. code="3000",
  310. platform=self.platform,
  311. mode=self.mode,
  312. env=self.env,
  313. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(page_index, index, e)
  314. )
  315. except Exception as e:
  316. AliyunLogger.logging(
  317. code="3000",
  318. platform=self.platform,
  319. mode=self.mode,
  320. env=self.env,
  321. message="抓取第{}页的时候失败, 报错原因是{}".format(page_index, e)
  322. )
  323. time.sleep(random.randint(5, 10))
  324. def schedule(self):
  325. """
  326. 先抓取推荐列表的视频, 等待 2 分钟后抓取 detail 页面,等待 5 分钟后,抓取账号视频
  327. """
  328. self.get_recommend_list()
  329. if self.limit_flag:
  330. return
  331. time.sleep(2 * 60)
  332. self.get_detail_video_list()
  333. if self.limit_flag:
  334. return
  335. time.sleep(5 * 60)
  336. self.mode = "author"
  337. user_list = self.get_user_list()
  338. if user_list:
  339. for index, user_id in enumerate(user_list):
  340. try:
  341. if self.limit_flag:
  342. AliyunLogger.logging(
  343. code="2000",
  344. platform=self.platform,
  345. mode=self.mode,
  346. env=self.env,
  347. message="本轮已经抓取足够数量的视频"
  348. )
  349. return
  350. self.get_user_videos(user_id=user_id)
  351. except Exception as e:
  352. AliyunLogger.logging(
  353. code="3000",
  354. platform=self.platform,
  355. mode=self.mode,
  356. env=self.env,
  357. message="抓取账号视频出现异常,账号 id 是{}, 报错原因是{}".format(user_id, e)
  358. )
  359. if __name__ == '__main__':
  360. pass