# xiaoniangao_plus_scheduling2.py
  1. # -*- coding: utf-8 -*-
  2. # @Author: luojunhui
  3. # @Time: 2023/9/27
  4. import json
  5. import os
  6. import random
  7. import sys
  8. import time
  9. import uuid
  10. from datetime import datetime, timedelta
  11. from hashlib import md5
  12. import requests
  13. from appium import webdriver
  14. from appium.webdriver.extensions.android.nativekey import AndroidKey
  15. from appium.webdriver.webdriver import WebDriver
  16. from bs4 import BeautifulSoup
  17. from selenium.common.exceptions import NoSuchElementException
  18. from selenium.webdriver.common.by import By
  19. import multiprocessing
  20. sys.path.append(os.getcwd())
  21. from common import AliyunLogger, PiaoQuanPipeline
  22. from common.common import Common
  23. from common.mq import MQ
  24. from common.scheduling_db import MysqlHelper
  25. def get_redirect_url(url):
  26. res = requests.get(url, allow_redirects=False)
  27. if res.status_code == 302 or res.status_code == 301:
  28. return res.headers['Location']
  29. else:
  30. return url
  31. class XiaoNianGaoPlusRecommend:
  32. env = None
  33. driver = None
  34. log_type = None
  35. def __init__(self, log_type, crawler, env, rule_dict, our_uid):
  36. self.mq = None
  37. self.platform = "xiaoniangaoplus"
  38. self.download_cnt = 0
  39. self.element_list = []
  40. self.count = 0
  41. self.swipe_count = 0
  42. self.log_type = log_type
  43. self.crawler = crawler
  44. self.env = env
  45. self.rule_dict = rule_dict
  46. self.our_uid = our_uid
  47. if self.env == "dev":
  48. chromedriverExecutable = "/Users/piaoquan/Downloads/chromedriver"
  49. else:
  50. chromedriverExecutable = "/Users/piaoquan/Downloads/chromedriver"
  51. Common.logger(self.log_type, self.crawler).info("启动微信")
  52. # Common.logging(self.log_type, self.crawler, self.env, '启动微信')
  53. # 微信的配置文件
  54. caps = {
  55. "platformName": "Android",
  56. "devicesName": "Android",
  57. # "platformVersion": "11",
  58. # "udid": "emulator-5554",
  59. "appPackage": "com.tencent.mm",
  60. "appActivity": ".ui.LauncherUI",
  61. "autoGrantPermissions": "true",
  62. "noReset": True,
  63. "resetkeyboard": True,
  64. "unicodekeyboard": True,
  65. "showChromedriverLog": True,
  66. "printPageSourceOnFailure": True,
  67. "recreateChromeDriverSessions": True,
  68. "enableWebviewDetailsCollection": True,
  69. "setWebContentsDebuggingEnabled": True,
  70. "newCommandTimeout": 6000,
  71. "automationName": "UiAutomator2",
  72. "chromedriverExecutable": chromedriverExecutable,
  73. "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
  74. }
  75. try:
  76. self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
  77. except Exception as e:
  78. print(e)
  79. AliyunLogger.logging(
  80. code="3002",
  81. platform=self.platform,
  82. mode=self.log_type,
  83. env=self.env,
  84. message=f'appium 启动异常: {e}'
  85. )
  86. return
  87. self.driver.implicitly_wait(30)
  88. for i in range(10):
  89. try:
  90. if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
  91. Common.logger(self.log_type, self.crawler).info("微信启动成功")
  92. # Common.logging(self.log_type, self.crawler, self.env, '微信启动成功')
  93. AliyunLogger.logging(
  94. code="1000",
  95. platform=self.platform,
  96. mode=self.log_type,
  97. env=self.env,
  98. message="启动微信成功"
  99. )
  100. break
  101. elif self.driver.find_element(By.ID, "com.android.systemui:id/dismiss_view"):
  102. Common.logger(self.log_type, self.crawler).info("发现并关闭系统下拉菜单")
  103. # Common.logging(self.log_type, self.crawler, self.env, '发现并关闭系统下拉菜单')
  104. AliyunLogger.logging(
  105. code="1000",
  106. platform=self.platform,
  107. mode=self.log_type,
  108. env=self.env,
  109. message="发现并关闭系统下拉菜单"
  110. )
  111. size = self.driver.get_window_size()
  112. self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.8),
  113. int(size['width'] * 0.5), int(size['height'] * 0.2), 200)
  114. # self.driver.find_element(By.ID, "com.android.system:id/dismiss_view").click()
  115. else:
  116. pass
  117. except Exception as e:
  118. AliyunLogger.logging(
  119. code="3001",
  120. platform=self.platform,
  121. mode=self.log_type,
  122. env=self.env,
  123. message=f"打开微信异常:{e}"
  124. )
  125. time.sleep(1)
  126. Common.logger(self.log_type, self.crawler).info("下滑,展示小程序选择面板")
  127. size = self.driver.get_window_size()
  128. self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.2),
  129. int(size['width'] * 0.5), int(size['height'] * 0.8), 200)
  130. time.sleep(1)
  131. Common.logger(self.log_type, self.crawler).info('打开小程序"小年糕+"')
  132. self.driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
  133. AliyunLogger.logging(
  134. code="1000",
  135. platform=self.platform,
  136. env=self.env,
  137. mode=self.log_type,
  138. message="打开小程序小年糕+成功"
  139. )
  140. time.sleep(5)
  141. self.get_videoList()
  142. time.sleep(1)
  143. self.driver.quit()
  144. def search_elements(self, xpath):
  145. time.sleep(1)
  146. windowHandles = self.driver.window_handles
  147. for handle in windowHandles:
  148. self.driver.switch_to.window(handle)
  149. time.sleep(1)
  150. try:
  151. elements = self.driver.find_elements(By.XPATH, xpath)
  152. if elements:
  153. return elements
  154. except NoSuchElementException:
  155. pass
  156. def check_to_applet(self, xpath):
  157. time.sleep(1)
  158. webViews = self.driver.contexts
  159. self.driver.switch_to.context(webViews[-1])
  160. windowHandles = self.driver.window_handles
  161. for handle in windowHandles:
  162. self.driver.switch_to.window(handle)
  163. time.sleep(1)
  164. try:
  165. self.driver.find_element(By.XPATH, xpath)
  166. Common.logger(self.log_type, self.crawler).info("切换到WebView成功\n")
  167. # Common.logging(self.log_type, self.crawler, self.env, '切换到WebView成功\n')
  168. AliyunLogger.logging(
  169. code="1000",
  170. platform=self.platform,
  171. mode=self.log_type,
  172. env=self.env,
  173. message="成功切换到 webview"
  174. )
  175. return
  176. except NoSuchElementException:
  177. time.sleep(1)
  178. def repeat_video(self, out_video_id):
  179. current_time = datetime.now()
  180. previous_day = current_time - timedelta(days=7)
  181. formatted_time = previous_day.strftime("%Y-%m-%d")
  182. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_video_id}" and create_time <= '{formatted_time}'; """
  183. Common.logger(self.log_type, self.crawler).info(
  184. f"sql{sql}")
  185. repeat_video = MysqlHelper.get_values(
  186. log_type=self.log_type, crawler=self.platform, env=self.env, sql=sql, action=""
  187. )
  188. if repeat_video:
  189. return False
  190. return True
  191. def swipe_up(self):
  192. self.search_elements('//*[@class="list-list--list"]')
  193. size = self.driver.get_window_size()
  194. self.driver.swipe(int(size["width"] * 0.5), int(size["height"] * 0.8),
  195. int(size["width"] * 0.5), int(size["height"] * 0.442), 200)
  196. self.swipe_count += 1
  197. def get_video_url(self, video_title_element):
  198. for i in range(3):
  199. self.search_elements('//*[@class="list-list--list"]')
  200. Common.logger(self.log_type, self.crawler).info(f"video_title_element:{video_title_element[0]}")
  201. time.sleep(1)
  202. Common.logger(self.log_type, self.crawler).info("滑动标题至可见状态")
  203. self.driver.execute_script("arguments[0].scrollIntoView({block:'center',inline:'center'});",
  204. video_title_element[0])
  205. time.sleep(3)
  206. Common.logger(self.log_type, self.crawler).info("点击标题")
  207. video_title_element[0].click()
  208. self.check_to_applet(xpath=r'//wx-video[@class="dynamic-index--video-item dynamic-index--video"]')
  209. Common.logger(self.log_type, self.crawler).info("点击标题完成")
  210. time.sleep(10)
  211. video_url_elements = self.search_elements(
  212. '//wx-video[@class="dynamic-index--video-item dynamic-index--video"]')
  213. Common.logger(self.log_type, self.crawler).info(f"{video_url_elements[0].get_attribute('src')}")
  214. return video_url_elements[0].get_attribute('src')
  215. def parse_detail(self, index):
  216. page_source = self.driver.page_source
  217. soup = BeautifulSoup(page_source, 'html.parser')
  218. soup.prettify()
  219. video_list = soup.findAll(name="wx-view", attrs={"class": "expose--adapt-parent"})
  220. element_list = [i for i in video_list][index:]
  221. return element_list[0]
    def get_video_info_2(self, video_element):
        """Extract metadata from one feed card, validate it through the
        pipeline, and push qualifying videos onto the ETL message queue.

        :param video_element: BeautifulSoup <wx-view> node from parse_detail()

        Returns early (resetting per-round counters) once the download quota
        rule_dict["videos_cnt"]["min"] (default 10) has been reached.
        Exceptions propagate to get_video_info(), which logs and backs out.
        """
        Common.logger(self.log_type, self.crawler).info(f"本轮已抓取{self.download_cnt}条视频\n")
        # Quota reached: reset round state and stop scanning this round.
        if self.download_cnt >= int(self.rule_dict.get("videos_cnt", {}).get("min", 10)):
            self.count = 0
            self.download_cnt = 0
            self.element_list = []
            return
        self.count += 1
        Common.logger(self.log_type, self.crawler).info(f"第{self.count}条视频")
        # trace_id is the unique index for this video's whole lifecycle.
        trace_id = self.crawler + str(uuid.uuid1())
        AliyunLogger.logging(
            code="1001",
            platform=self.platform,
            mode=self.log_type,
            env=self.env,
            trace_id=trace_id,
            message="扫描到一条视频",
        )
        # Title
        video_title = video_element.find("wx-view", class_="dynamic--title").text
        # Raw play-count string (e.g. "1000+次播放")
        play_str = video_element.find("wx-view", class_="dynamic--views").text
        info_list = video_element.findAll("wx-view", class_="dynamic--commerce-btn-text")
        # Like count (raw string)
        like_str = info_list[1].text
        # Comment count (raw string)
        comment_str = info_list[2].text
        # Duration as "mm:ss"
        duration_str = video_element.find("wx-view", class_="dynamic--duration").text
        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
        # Avatar URL
        avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
        # Cover URL
        cover_url = video_element.find("wx-image", class_="dynamic--bg-image")["src"]
        play_cnt = int(play_str.replace("+", "").replace("次播放", ""))
        duration = int(duration_str.split(":")[0].strip()) * 60 + int(duration_str.split(":")[-1].strip())
        # NOTE(review): a fractional count like "1.2万" would raise ValueError
        # (int("1.2")); the exception is swallowed by get_video_info's handler.
        if "点赞" in like_str:
            like_cnt = 0
        elif "万" in like_str:
            like_cnt = int(like_str.split("万")[0]) * 10000
        else:
            like_cnt = int(like_str)
        if "评论" in comment_str:
            comment_cnt = 0
        elif "万" in comment_str:
            comment_cnt = int(comment_str.split("万")[0]) * 10000
        else:
            comment_cnt = int(comment_str)
        # Ids are content hashes — the DOM exposes no native ids.
        out_video_id = md5(video_title.encode('utf8')).hexdigest()
        out_user_id = md5(user_name.encode('utf8')).hexdigest()
        # repeat_video() returns False when this id already exists in the DB.
        repeat_id = self.repeat_video(out_video_id)
        Common.logger(self.log_type, self.crawler).info(
            f"查询{repeat_id}")
        if False == repeat_id:
            # Duplicate title hash: disambiguate by appending a timestamp.
            num = time.time()
            out_video_id = out_video_id + str(num)
            Common.logger(self.log_type, self.crawler).info(
                f"新id{out_video_id}")
        Common.logger(self.log_type, self.crawler).info(f"数据统计-----标题:{video_title},播放量:{play_cnt},点赞:{like_cnt},评论:{comment_cnt}")
        video_dict = {
            "video_title": video_title,
            "video_id": out_video_id,
            'out_video_id': out_video_id,
            "duration_str": duration_str,
            "duration": duration,
            "play_str": play_str,
            "play_cnt": play_cnt,
            "like_str": like_str,
            "like_cnt": like_cnt,
            "comment_cnt": comment_cnt,
            "share_cnt": 0,
            "user_name": user_name,
            "user_id": out_user_id,
            # Publish time is unknown from the DOM; crawl time is used instead.
            'publish_time_stamp': int(time.time()),
            'publish_time_str': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))),
            'update_time_stamp': int(time.time()),
            "avatar_url": avatar_url,
            "cover_url": cover_url,
            "session": f"xiaoniangao-{int(time.time())}"
        }
        AliyunLogger.logging(
            code="1001",
            platform=self.platform,
            mode=self.log_type,
            env=self.env,
            trace_id=trace_id,
            message="扫描到一条视频",
            data=video_dict
        )
        # Rule filtering (duration/counts/etc.) happens inside the pipeline.
        pipeline = PiaoQuanPipeline(
            platform=self.crawler,
            mode=self.log_type,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id
        )
        flag = pipeline.process_item()
        if flag:
            video_title_element = self.search_elements(f'//*[contains(text(), "{video_title}")]')
            if video_title_element is None:
                return
            Common.logger(self.log_type, self.crawler).info("点击标题,进入视频详情页")
            AliyunLogger.logging(
                code="1000",
                platform=self.platform,
                mode=self.log_type,
                env=self.env,
                message="点击标题,进入视频详情页",
            )
            video_url = self.get_video_url(video_title_element)
            video_url = get_redirect_url(video_url)
            if video_url is None:
                # No playable URL — back out of the detail page and skip.
                self.driver.press_keycode(AndroidKey.BACK)
                time.sleep(5)
                return
            video_dict['video_url'] = video_url
            video_dict["platform"] = self.crawler
            video_dict["strategy"] = self.log_type
            video_dict["out_video_id"] = video_dict["video_id"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = self.our_uid
            video_dict["publish_time"] = video_dict["publish_time_str"]
            self.mq.send_msg(video_dict)
            AliyunLogger.logging(
                code="1002",
                platform=self.platform,
                mode=self.log_type,
                env=self.env,
                trace_id=trace_id,
                message="发送到ETL成功",
                data=video_dict
            )
            self.download_cnt += 1
            # Leave the detail page and return to the feed.
            self.driver.press_keycode(AndroidKey.BACK)
            time.sleep(5)
  363. def get_video_info(self, video_element):
  364. try:
  365. self.get_video_info_2(video_element)
  366. except Exception as e:
  367. self.driver.press_keycode(AndroidKey.BACK)
  368. Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
  369. AliyunLogger.logging(
  370. code="3001",
  371. platform=self.platform,
  372. mode=self.log_type,
  373. env=self.env,
  374. message=f"抓取单条视频异常:{e}\n"
  375. )
  376. def get_videoList(self):
  377. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  378. self.driver.implicitly_wait(20)
  379. # 切换到 web_view
  380. self.check_to_applet(xpath='//*[@class="tab-bar--tab tab-bar--tab-selected"]')
  381. print("切换到 webview 成功")
  382. time.sleep(1)
  383. page = 0
  384. if self.search_elements('//*[@class="list-list--list"]') is None:
  385. Common.logger(self.log_type, self.crawler).info("窗口已销毁\n")
  386. # Common.logging(self.log_type, self.crawler, self.env, '窗口已销毁\n')
  387. AliyunLogger.logging(
  388. code="3000",
  389. platform=self.platform,
  390. mode=self.log_type,
  391. env=self.env,
  392. message="窗口已销毁"
  393. )
  394. self.count = 0
  395. self.download_cnt = 0
  396. self.element_list = []
  397. return
  398. print("开始获取视频信息")
  399. for i in range(50):
  400. print("下滑{}次".format(i))
  401. element = self.parse_detail(i)
  402. self.get_video_info(element)
  403. self.swipe_up()
  404. time.sleep(1)
  405. if self.swipe_count > 100:
  406. return
  407. print("下滑完成")
  408. # time.sleep(100)
  409. Common.logger(self.log_type, self.crawler).info("已抓取完一组,休眠 5 秒\n")
  410. # Common.logging(self.log_type, self.crawler, self.env, "已抓取完一组,休眠 5 秒\n")
  411. AliyunLogger.logging(
  412. code="1000",
  413. platform=self.platform,
  414. mode=self.log_type,
  415. env=self.env,
  416. message="已抓取完一组,休眠 5 秒\n",
  417. )
  418. time.sleep(5)
  419. def run():
  420. rule_dict1 = {"period": {"min": 365, "max": 365},
  421. "duration": {"min": 30, "max": 1800},
  422. "favorite_cnt": {"min": 0, "max": 0},
  423. "videos_cnt": {"min": 5000, "max": 0},
  424. "share_cnt": {"min": 0, "max": 0}}
  425. XiaoNianGaoPlusRecommend("recommend", "xiaoniangaoplus", "prod", rule_dict1, [64120158, 64120157, 63676778])
  426. if __name__ == "__main__":
  427. process = multiprocessing.Process(
  428. target=run
  429. )
  430. process.start()
  431. while True:
  432. if not process.is_alive():
  433. print("正在重启")
  434. process.terminate()
  435. time.sleep(60)
  436. os.system("adb forward --remove-all")
  437. process = multiprocessing.Process(target=run)
  438. process.start()
  439. time.sleep(60)