# zwwfs_recommend.py
import json
import os
import random
import sys
import time
import uuid
from typing import Any, Dict, List

import requests

# Project packages are resolved relative to the working directory, so the path
# must be extended before they are imported.
sys.path.append(os.getcwd())
from common.video_item import VideoItem
from common import PiaoQuanPipeline, AliyunLogger, tunnel_proxies
from common.mq import MQ
from common.db import MysqlHelper
from zhuwanwufusu.crypt import AESCipher as AES


  14. class ZhuWanWuFuSuRecommend(object):
  15. def __init__(self, platform, mode, rule_dict, user_list, env):
  16. self.platform = platform
  17. self.mode = mode
  18. self.rule_dict = rule_dict
  19. self.user_list = user_list
  20. self.env = env
  21. self.download_cnt = 0
  22. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  23. self.limit_flag = False
  24. self.cryptor = AES()
  25. def get_recommend_list(self):
  26. url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideoListsEn2"
  27. headers = {
  28. 'Host': 'api.lidongze.cn',
  29. 'xweb_xhr': '1',
  30. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
  31. 'token': '',
  32. 'content-type': 'application/json',
  33. 'accept': '*/*',
  34. 'referer': 'https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html',
  35. 'accept-language': 'en-US,en;q=0.9'
  36. }
  37. page_index = 1
  38. total_page = 2
  39. while page_index <= total_page:
  40. try:
  41. query = {
  42. "pageNo": page_index,
  43. "pageSize": 10,
  44. "groupId": "1650323161797439489", # 推荐流的 ID
  45. "vn": 1,
  46. "gx": 1,
  47. "appid": "wx0afdc2669ed8df2f",
  48. "type": 0
  49. }
  50. params = {
  51. "v": self.cryptor.aes_encrypt(data=json.dumps(query))
  52. }
  53. response = requests.request("GET", url, headers=headers, params=params, proxies=tunnel_proxies())
  54. result = json.loads(self.cryptor.aes_decrypt(response.text))
  55. total_page = result['list']['pages']
  56. page_index = result['list']['current'] + 1
  57. for index, video_obj in enumerate(result['list']['records'], 1):
  58. try:
  59. AliyunLogger.logging(
  60. code="1001",
  61. platform=self.platform,
  62. mode=self.mode,
  63. env=self.env,
  64. message="扫描到一条视频",
  65. data=video_obj
  66. )
  67. self.process_video_obj(video_obj)
  68. except Exception as e:
  69. AliyunLogger.logging(
  70. code="3000",
  71. platform=self.platform,
  72. mode=self.mode,
  73. env=self.env,
  74. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(page_index, index, e)
  75. )
  76. except Exception as e:
  77. AliyunLogger.logging(
  78. code="3000",
  79. platform=self.platform,
  80. mode=self.mode,
  81. env=self.env,
  82. message="抓取第{}页的时候失败, 报错原因是{}".format(page_index, e)
  83. )
  84. time.sleep(random.randint(5, 10))
  85. def get_user_videos(self, user_id):
  86. """
  87. 在抓取完推荐页之后,去抓每一个用户的主页视频
  88. """
  89. url = "https://api.lidongze.cn/jeecg-boot/ugc/getAuthVideoList"
  90. headers = {
  91. 'Host': 'api.lidongze.cn',
  92. 'xweb_xhr': '1',
  93. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
  94. 'token': '',
  95. 'content-type': 'application/json',
  96. 'accept': '*/*',
  97. 'referer': 'https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html',
  98. 'accept-language': 'en-US,en;q=0.9'
  99. }
  100. page_index = 1
  101. total_page = 1
  102. while page_index <= total_page:
  103. query = {
  104. "pageNo": page_index,
  105. "pageSize": 10,
  106. "authid": user_id
  107. }
  108. params = {
  109. "v": self.cryptor.aes_encrypt(data=json.dumps(query))
  110. }
  111. response = requests.request("GET", url, headers=headers, params=params, proxies=tunnel_proxies())
  112. result = json.loads(self.cryptor.aes_decrypt(response.text))
  113. total_page = result['list']['pages']
  114. page_index = result['list']['current'] + 1
  115. for index, video_temp in enumerate(result['list']['records']):
  116. video_id = video_temp['id']
  117. detail_query = {
  118. "videoId": video_id
  119. }
  120. detail_params = {
  121. "v": self.cryptor.aes_encrypt(data=json.dumps(detail_query))
  122. }
  123. url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideosDataEn"
  124. headers = {
  125. 'Host': 'api.lidongze.cn',
  126. 'xweb_xhr': '1',
  127. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
  128. 'token': '',
  129. 'content-type': 'application/json',
  130. 'accept': '*/*',
  131. 'referer': 'https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html',
  132. 'accept-language': 'en-US,en;q=0.9'
  133. }
  134. detail_response = requests.request("GET", url, headers=headers, params=detail_params,
  135. proxies=tunnel_proxies())
  136. detail_video = json.loads(self.cryptor.aes_decrypt(detail_response.text))
  137. if detail_video['success']:
  138. try:
  139. AliyunLogger.logging(
  140. code="1001",
  141. platform=self.platform,
  142. mode=self.mode,
  143. env=self.env,
  144. message="扫描到一条视频",
  145. data=detail_video['data']
  146. )
  147. self.process_video_obj(detail_video['data'])
  148. except Exception as e:
  149. AliyunLogger.logging(
  150. code="3000",
  151. platform=self.platform,
  152. mode=self.mode,
  153. env=self.env,
  154. message="抓取单条视频失败, 该视频位于第{}条报错原因是{}".format(index, e)
  155. )
  156. def process_video_obj(self, video_obj):
  157. trace_id = self.platform + str(uuid.uuid1())
  158. if video_obj.get("playnum"):
  159. play_cnt = int(video_obj['playnum'].replace("万+", "0000")) if "万+" in video_obj['playnum'] else int(
  160. video_obj['playnum'])
  161. else:
  162. play_cnt = 0
  163. our_user = random.choice(self.user_list)
  164. item = VideoItem()
  165. item.add_video_info("video_id", video_obj['id'])
  166. item.add_video_info("video_title", video_obj['vname'])
  167. item.add_video_info("play_cnt", play_cnt)
  168. item.add_video_info("publish_time_stamp", int(time.time()))
  169. item.add_video_info("out_user_id", video_obj['authid'])
  170. item.add_video_info("cover_url", video_obj['shareimg'])
  171. item.add_video_info("like_cnt", int(video_obj['likenum']))
  172. item.add_video_info("video_url", video_obj['videoaddr'])
  173. item.add_video_info("out_video_id", video_obj['id'])
  174. item.add_video_info("platform", self.platform)
  175. item.add_video_info("strategy", self.mode)
  176. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  177. item.add_video_info("user_id", our_user['uid'])
  178. item.add_video_info("user_name", our_user['nick_name'])
  179. # 把扫描到的账号存到 accounts 表中
  180. self.manage_auth_id(out_user_id=video_obj['authid'], out_user_name=video_obj['authname'])
  181. mq_obj = item.produce_item()
  182. pipeline = PiaoQuanPipeline(
  183. platform=self.platform,
  184. mode=self.mode,
  185. rule_dict=self.rule_dict,
  186. env=self.env,
  187. item=mq_obj,
  188. trace_id=trace_id,
  189. )
  190. if pipeline.process_item():
  191. self.download_cnt += 1
  192. self.mq.send_msg(mq_obj)
  193. AliyunLogger.logging(
  194. code="1002",
  195. platform=self.platform,
  196. mode=self.mode,
  197. env=self.env,
  198. message="成功发送至 ETL",
  199. data=mq_obj
  200. )
  201. if self.download_cnt >= int(self.rule_dict.get("videos_cnt", {}).get("min", 200)):
  202. self.limit_flag = True
  203. def manage_auth_id(self, out_user_id, out_user_name):
  204. """
  205. out_user_id: 外站视频的用户 id
  206. out_user_name: 外站视频用户名字
  207. 逻辑: 对新扫描到的视频的用户 id 进行判断,若用户 id 不存在,则把视频 id 存到表中,
  208. 如果用户 id 存在,则判断用户是否修改名字,若名字修改则更新名字
  209. """
  210. select_user_sql = f"""select name, name_id from accounts where name_id = "{out_user_id}" and platform = "{self.platform}" and useful = 1 limit 1"""
  211. out_user_info = MysqlHelper.get_values(
  212. log_type=self.mode,
  213. crawler=self.platform,
  214. sql=select_user_sql,
  215. env=self.env,
  216. machine="",
  217. )
  218. if out_user_info:
  219. name, name_id = out_user_info[0]
  220. if name == out_user_name:
  221. return
  222. else:
  223. update_sql = f"""update accounts set name = "{out_user_name}" where name_id = "{out_user_id}";"""
  224. MysqlHelper.update_values(
  225. log_type=self.mode,
  226. crawler=self.platform,
  227. sql=update_sql,
  228. env=self.env,
  229. machine=""
  230. )
  231. else:
  232. insert_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{out_user_name}", "{out_user_id}", "{self.platform}", 1 )"""
  233. MysqlHelper.update_values(
  234. log_type=self.mode,
  235. crawler=self.platform,
  236. sql=insert_sql,
  237. env=self.env,
  238. machine="",
  239. )
  240. def get_user_list(self):
  241. select_user_sql = f"""select name_id from accounts where platform = "{self.platform}" and useful = 1"""
  242. out_user_info = MysqlHelper.get_values(
  243. log_type=self.mode,
  244. crawler=self.platform,
  245. sql=select_user_sql,
  246. env=self.env,
  247. machine="",
  248. )
  249. if out_user_info:
  250. result = []
  251. for i in out_user_info:
  252. result.append(i[0])
  253. return result
  254. else:
  255. return []
  256. def get_detail_video_list(self):
  257. url = "https://api.lidongze.cn/jeecg-boot/ugc/getDetailVideoListsEn2"
  258. headers = {
  259. 'Host': 'api.lidongze.cn',
  260. 'xweb_xhr': '1',
  261. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100',
  262. 'token': '',
  263. 'referer': 'https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html',
  264. 'accept-language': 'en-US,en;q=0.9'
  265. }
  266. page_index = 1
  267. total_page = 2
  268. while page_index <= total_page:
  269. try:
  270. query = {
  271. "groupId": "1650323161797439489",
  272. "pageNo": page_index,
  273. "pageSize": 10,
  274. # "videoId": "1681138763919003650",
  275. "appid": "wx0afdc2669ed8df2f",
  276. "type": 3,
  277. "hxid": "1556555457243828666"
  278. }
  279. params = {
  280. "v": self.cryptor.aes_encrypt(data=json.dumps(query))
  281. }
  282. response = requests.request("GET", url, headers=headers, params=params)
  283. result = json.loads(self.cryptor.aes_decrypt(response.text))
  284. total_page = result['list']['pages']
  285. page_index = result['list']['current'] + 1
  286. for index, video_obj in enumerate(result['list']['records'], 1):
  287. try:
  288. AliyunLogger.logging(
  289. code="1001",
  290. platform=self.platform,
  291. mode=self.mode,
  292. env=self.env,
  293. message="扫描到一条视频",
  294. data=video_obj
  295. )
  296. self.process_video_obj(video_obj)
  297. except Exception as e:
  298. AliyunLogger.logging(
  299. code="3000",
  300. platform=self.platform,
  301. mode=self.mode,
  302. env=self.env,
  303. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(page_index, index, e)
  304. )
  305. except Exception as e:
  306. AliyunLogger.logging(
  307. code="3000",
  308. platform=self.platform,
  309. mode=self.mode,
  310. env=self.env,
  311. message="抓取第{}页的时候失败, 报错原因是{}".format(page_index, e)
  312. )
  313. time.sleep(random.randint(5, 10))
  314. def schedule(self):
  315. """
  316. 先抓取推荐列表的视频, 等待 2 分钟后抓取 detail 页面,等待 5 分钟后,抓取账号视频
  317. """
  318. self.get_recommend_list()
  319. time.sleep(2 * 60)
  320. self.get_detail_video_list()
  321. time.sleep(5 * 60)
  322. self.mode = "author"
  323. user_list = self.get_user_list()
  324. if user_list:
  325. for index, user_id in enumerate(user_list):
  326. try:
  327. if self.limit_flag:
  328. AliyunLogger.logging(
  329. code="2000",
  330. platform=self.platform,
  331. mode=self.mode,
  332. env=self.env,
  333. message="本轮已经抓取足够数量的视频"
  334. )
  335. return
  336. self.get_user_videos(user_id=user_id)
  337. except Exception as e:
  338. AliyunLogger.logging(
  339. code="3000",
  340. platform=self.platform,
  341. mode=self.mode,
  342. env=self.env,
  343. message="抓取账号视频出现异常,账号 id 是{}, 报错原因是{}".format(user_id, e)
  344. )
  345. if __name__ == '__main__':
  346. pass