zjsjm_recommend.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. import os
  2. import json
  3. import random
  4. import sys
  5. import time
  6. import uuid
  7. import requests
  8. sys.path.append(os.getcwd())
  9. from common.video_item import VideoItem
  10. from common import PiaoQuanPipeline, AliyunLogger, tunnel_proxies
  11. from common.mq import MQ
  12. from common.db import MysqlHelper
  13. from zhuwanwufusu.crypt import AESCipher as AES
  14. class ZhuJinShanJinMeiRecommend(object):
  15. def __init__(self, platform, mode, rule_dict, user_list, env):
  16. self.platform = platform
  17. self.mode = mode
  18. self.rule_dict = rule_dict
  19. self.user_list = user_list
  20. self.env = env
  21. self.download_cnt = 0
  22. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  23. self.limit_flag = False
  24. self.cryptor = AES()
  25. def get_recommend_list(self):
  26. url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideoListsEn2"
  27. headers = {
  28. 'Host': 'api.lidongze.cn',
  29. 'xweb_xhr': '1',
  30. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
  31. 'token': '',
  32. 'content-type': 'application/json',
  33. 'accept': '*/*',
  34. 'referer': 'https://servicewechat.com/wxbf44c7b26b0b3280/4/page-frame.html',
  35. 'accept-language': 'en-US,en;q=0.9'
  36. }
  37. page_index = 1
  38. total_page = 2
  39. while page_index <= total_page:
  40. try:
  41. query = {
  42. "pageNo": page_index,
  43. "pageSize": 10,
  44. "groupId": "1650323161797439489", # 推荐流的 ID
  45. "vn": 1,
  46. "gx": 1,
  47. "appid": "wxbf44c7b26b0b3280",
  48. "type": 2
  49. }
  50. # 1650323161797439489
  51. params = {
  52. "v": self.cryptor.aes_encrypt(data=json.dumps(query))
  53. }
  54. response = requests.request("GET", url, headers=headers, params=params, proxies=tunnel_proxies())
  55. result = json.loads(self.cryptor.aes_decrypt(response.text))
  56. total_page = result['list']['pages']
  57. page_index = result['list']['current'] + 1
  58. for index, video_obj in enumerate(result['list']['records'], 1):
  59. try:
  60. AliyunLogger.logging(
  61. code="1001",
  62. platform=self.platform,
  63. mode=self.mode,
  64. env=self.env,
  65. message="扫描到一条视频",
  66. data=video_obj
  67. )
  68. self.process_video_obj(video_obj)
  69. except Exception as e:
  70. AliyunLogger.logging(
  71. code="3000",
  72. platform=self.platform,
  73. mode=self.mode,
  74. env=self.env,
  75. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(page_index, index, e)
  76. )
  77. except Exception as e:
  78. AliyunLogger.logging(
  79. code="3000",
  80. platform=self.platform,
  81. mode=self.mode,
  82. env=self.env,
  83. message="抓取第{}页的时候失败, 报错原因是{}".format(page_index, e)
  84. )
  85. time.sleep(random.randint(5, 10))
  86. def get_user_videos(self, user_id):
  87. """
  88. 在抓取完推荐页之后,去抓每一个用户的主页视频
  89. """
  90. url = "https://api.lidongze.cn/jeecg-boot/ugc/getAuthVideoList"
  91. headers = {
  92. 'Host': 'api.lidongze.cn',
  93. 'xweb_xhr': '1',
  94. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
  95. 'token': '',
  96. 'content-type': 'application/json',
  97. 'accept': '*/*',
  98. 'referer': 'https://servicewechat.com/wx0afdc2669ed8df2f/3/page-frame.html',
  99. 'accept-language': 'en-US,en;q=0.9'
  100. }
  101. page_index = 1
  102. total_page = 1
  103. while page_index <= total_page:
  104. query = {
  105. "pageNo": page_index,
  106. "pageSize": 10,
  107. "authid": user_id
  108. }
  109. params = {
  110. "v": self.cryptor.aes_encrypt(data=json.dumps(query))
  111. }
  112. response = requests.request("GET", url, headers=headers, params=params, proxies=tunnel_proxies())
  113. result = json.loads(self.cryptor.aes_decrypt(response.text))
  114. total_page = result['list']['pages']
  115. page_index = result['list']['current'] + 1
  116. for index, video_temp in enumerate(result['list']['records']):
  117. video_id = video_temp['id']
  118. detail_query = {
  119. "videoId": video_id
  120. }
  121. detail_params = {
  122. "v": self.cryptor.aes_encrypt(data=json.dumps(detail_query))
  123. }
  124. url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideosDataEn"
  125. headers = {
  126. 'Host': 'api.lidongze.cn',
  127. 'xweb_xhr': '1',
  128. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
  129. 'token': '',
  130. 'content-type': 'application/json',
  131. 'accept': '*/*',
  132. 'referer': 'https://servicewechat.com/wxbf44c7b26b0b3280/4/page-frame.html',
  133. 'accept-language': 'en-US,en;q=0.9'
  134. }
  135. detail_response = requests.request("GET", url, headers=headers, params=detail_params,
  136. proxies=tunnel_proxies())
  137. detail_video = json.loads(self.cryptor.aes_decrypt(detail_response.text))
  138. if detail_video['success']:
  139. try:
  140. AliyunLogger.logging(
  141. code="1001",
  142. platform=self.platform,
  143. mode=self.mode,
  144. env=self.env,
  145. message="扫描到一条视频",
  146. data=detail_video['data']
  147. )
  148. self.process_video_obj(detail_video['data'])
  149. except Exception as e:
  150. AliyunLogger.logging(
  151. code="3000",
  152. platform=self.platform,
  153. mode=self.mode,
  154. env=self.env,
  155. message="抓取单条视频失败, 该视频位于第{}条报错原因是{}".format(index, e)
  156. )
  157. def process_video_obj(self, video_obj):
  158. time.sleep(random.randint(3, 8))
  159. trace_id = self.platform + str(uuid.uuid1())
  160. if video_obj.get("playnum"):
  161. play_cnt = int(video_obj['playnum'].replace("万+", "0000")) if "万+" in video_obj['playnum'] else int(
  162. video_obj['playnum'])
  163. else:
  164. play_cnt = 0
  165. our_user = random.choice(self.user_list)
  166. item = VideoItem()
  167. item.add_video_info("video_id", video_obj['id'])
  168. item.add_video_info("video_title", video_obj['vname'])
  169. item.add_video_info("play_cnt", play_cnt)
  170. item.add_video_info("publish_time_stamp", int(time.time()))
  171. item.add_video_info("out_user_id", video_obj['authid'])
  172. item.add_video_info("cover_url", video_obj['shareimg'])
  173. item.add_video_info("like_cnt", int(video_obj['likenum']))
  174. item.add_video_info("video_url", video_obj['videoaddr'])
  175. item.add_video_info("out_video_id", video_obj['id'])
  176. item.add_video_info("platform", self.platform)
  177. item.add_video_info("strategy", self.mode)
  178. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  179. item.add_video_info("user_id", our_user['uid'])
  180. item.add_video_info("user_name", our_user['nick_name'])
  181. # 把扫描到的账号存到 accounts 表中
  182. self.manage_auth_id(out_user_id=video_obj['authid'], out_user_name=video_obj['authname'])
  183. mq_obj = item.produce_item()
  184. pipeline = PiaoQuanPipeline(
  185. platform=self.platform,
  186. mode=self.mode,
  187. rule_dict=self.rule_dict,
  188. env=self.env,
  189. item=mq_obj,
  190. trace_id=trace_id,
  191. )
  192. if pipeline.process_item():
  193. self.download_cnt += 1
  194. self.mq.send_msg(mq_obj)
  195. AliyunLogger.logging(
  196. code="1002",
  197. platform=self.platform,
  198. mode=self.mode,
  199. env=self.env,
  200. message="成功发送至 ETL",
  201. data=mq_obj
  202. )
  203. if self.download_cnt >= int(self.rule_dict.get("videos_cnt", {}).get("min", 200)):
  204. self.limit_flag = True
  205. def manage_auth_id(self, out_user_id, out_user_name):
  206. """
  207. out_user_id: 外站视频的用户 id
  208. out_user_name: 外站视频用户名字
  209. 逻辑: 对新扫描到的视频的用户 id 进行判断,若用户 id 不存在,则把视频 id 存到表中,
  210. 如果用户 id 存在,则判断用户是否修改名字,若名字修改则更新名字
  211. """
  212. select_user_sql = f"""select name, name_id from accounts where name_id = "{out_user_id}" and platform = "{self.platform}" and useful = 1 limit 1"""
  213. out_user_info = MysqlHelper.get_values(
  214. log_type=self.mode,
  215. crawler=self.platform,
  216. sql=select_user_sql,
  217. env=self.env,
  218. machine="",
  219. )
  220. if out_user_info:
  221. name, name_id = out_user_info[0]
  222. if name == out_user_name:
  223. return
  224. else:
  225. update_sql = f"""update accounts set name = "{out_user_name}" where name_id = "{out_user_id}";"""
  226. MysqlHelper.update_values(
  227. log_type=self.mode,
  228. crawler=self.platform,
  229. sql=update_sql,
  230. env=self.env,
  231. machine=""
  232. )
  233. else:
  234. insert_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{out_user_name}", "{out_user_id}", "{self.platform}", 1 )"""
  235. MysqlHelper.update_values(
  236. log_type=self.mode,
  237. crawler=self.platform,
  238. sql=insert_sql,
  239. env=self.env,
  240. machine="",
  241. )
  242. def get_user_list(self):
  243. select_user_sql = f"""select name_id from accounts where platform = "{self.platform}" and useful = 1"""
  244. out_user_info = MysqlHelper.get_values(
  245. log_type=self.mode,
  246. crawler=self.platform,
  247. sql=select_user_sql,
  248. env=self.env,
  249. machine="",
  250. )
  251. if out_user_info:
  252. result = []
  253. for i in out_user_info:
  254. result.append(i[0])
  255. return result
  256. else:
  257. return []
  258. def get_detail_video_list(self):
  259. url = "https://api.lidongze.cn/jeecg-boot/ugc/getDetailVideoListsEn2"
  260. headers = {
  261. 'Host': 'api.lidongze.cn',
  262. 'xweb_xhr': '1',
  263. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.4(0x13080410)XWEB/31009',
  264. 'token': '',
  265. 'content-type': 'application/json',
  266. 'accept': '*/*',
  267. 'referer': 'https://servicewechat.com/wxbf44c7b26b0b3280/4/page-frame.html',
  268. 'accept-language': 'en-US,en;q=0.9'
  269. }
  270. page_index = 1
  271. total_page = 2
  272. while page_index <= total_page:
  273. try:
  274. if self.limit_flag:
  275. AliyunLogger.logging(
  276. code="2000",
  277. platform=self.platform,
  278. mode=self.mode,
  279. env=self.env,
  280. message="本轮已经抓取足够数量的视频"
  281. )
  282. return
  283. else:
  284. query = {
  285. "groupId": "1650323161797439489",
  286. "pageNo": page_index,
  287. "pageSize": 10,
  288. "appid": "wxbf44c7b26b0b3280",
  289. "type": 2,
  290. "hxid": "1556555457243828666"
  291. }
  292. params = {
  293. "v": self.cryptor.aes_encrypt(data=json.dumps(query))
  294. }
  295. response = requests.request("GET", url, headers=headers, params=params)
  296. result = json.loads(self.cryptor.aes_decrypt(response.text))
  297. total_page = result['list']['pages']
  298. page_index = result['list']['current'] + 1
  299. for index, video_obj in enumerate(result['list']['records'], 1):
  300. try:
  301. AliyunLogger.logging(
  302. code="1001",
  303. platform=self.platform,
  304. mode=self.mode,
  305. env=self.env,
  306. message="扫描到一条视频",
  307. data=video_obj
  308. )
  309. self.process_video_obj(video_obj)
  310. except Exception as e:
  311. AliyunLogger.logging(
  312. code="3000",
  313. platform=self.platform,
  314. mode=self.mode,
  315. env=self.env,
  316. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(page_index, index,
  317. e)
  318. )
  319. except Exception as e:
  320. AliyunLogger.logging(
  321. code="3000",
  322. platform=self.platform,
  323. mode=self.mode,
  324. env=self.env,
  325. message="抓取第{}页的时候失败, 报错原因是{}".format(page_index, e)
  326. )
  327. time.sleep(random.randint(5, 10))
  328. def schedule(self):
  329. """
  330. 先抓取推荐列表的视频, 等待 2 分钟后抓取 detail 页面,等待 5 分钟后,抓取账号视频
  331. """
  332. self.get_recommend_list()
  333. if self.limit_flag:
  334. return
  335. # time.sleep(2 * 60)
  336. # self.get_detail_video_list()
  337. # if self.limit_flag:
  338. # return
  339. # time.sleep(5 * 60)
  340. # self.mode = "author"
  341. # user_list = self.get_user_list()
  342. # if user_list:
  343. # for index, user_id in enumerate(user_list):
  344. # try:
  345. # if self.limit_flag:
  346. # AliyunLogger.logging(
  347. # code="2000",
  348. # platform=self.platform,
  349. # mode=self.mode,
  350. # env=self.env,
  351. # message="本轮已经抓取足够数量的视频"
  352. # )
  353. # return
  354. # self.get_user_videos(user_id=user_id)
  355. # except Exception as e:
  356. # AliyunLogger.logging(
  357. # code="3000",
  358. # platform=self.platform,
  359. # mode=self.mode,
  360. # env=self.env,
  361. # message="抓取账号视频出现异常,账号 id 是{}, 报错原因是{}".format(user_id, e)
  362. # )
# Running this module directly does nothing: no crawl is started here.
if __name__ == '__main__':
    pass