xiaoniangao_plus.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. # -*- coding: utf-8 -*-
  2. # @Author: luojunhui
  3. # @Time: 2023/12/18
  4. import json
  5. import os
  6. import sys
  7. import time
  8. import uuid
  9. from hashlib import md5
  10. from appium import webdriver
  11. from appium.webdriver.extensions.android.nativekey import AndroidKey
  12. from bs4 import BeautifulSoup
  13. from selenium.common.exceptions import NoSuchElementException
  14. from selenium.webdriver.common.by import By
  15. sys.path.append(os.getcwd())
  16. from application.functions import get_redirect_url
  17. from application.pipeline import PiaoQuanPipelineTest
  18. from application.common.messageQueue import MQ
  19. from application.common.log import AliyunLogger
  20. class XiaoNianGaoPlusRecommend(object):
  21. def __init__(self, log_type, crawler, env, rule_dict, our_uid):
  22. self.mq = None
  23. self.platform = "xiaoniangaoplus"
  24. self.download_cnt = 0
  25. self.element_list = [ ]
  26. self.count = 0
  27. self.swipe_count = 0
  28. self.log_type = log_type
  29. self.crawler = crawler
  30. self.env = env
  31. self.rule_dict = rule_dict
  32. self.our_uid = our_uid
  33. chromedriverExecutable = "/usr/bin/chromedriver"
  34. print("启动微信")
  35. # 微信的配置文件
  36. caps = {
  37. "platformName": "Android",
  38. "devicesName": "Android",
  39. "appPackage": "com.tencent.mm",
  40. "appActivity": ".ui.LauncherUI",
  41. "autoGrantPermissions": "true",
  42. "noReset": True,
  43. "resetkeyboard": True,
  44. "unicodekeyboard": True,
  45. "showChromedriverLog": True,
  46. "printPageSourceOnFailure": True,
  47. "recreateChromeDriverSessions": True,
  48. "enableWebviewDetailsCollection": True,
  49. "setWebContentsDebuggingEnabled": True,
  50. "newCommandTimeout": 6000,
  51. "automationName": "UiAutomator2",
  52. "chromedriverExecutable": chromedriverExecutable,
  53. "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
  54. }
  55. try:
  56. self.driver = webdriver.Remote("http://localhost:4750/wd/hub", caps)
  57. except Exception as e:
  58. print(e)
  59. return
  60. self.driver.implicitly_wait(30)
  61. for i in range(10):
  62. try:
  63. if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
  64. print("启动微信成功")
  65. break
  66. elif self.driver.find_element(
  67. By.ID, "com.android.systemui:id/dismiss_view"
  68. ):
  69. print("发现并关闭系统下拉菜单")
  70. size = self.driver.get_window_size()
  71. self.driver.swipe(
  72. int(size["width"] * 0.5),
  73. int(size["height"] * 0.8),
  74. int(size["width"] * 0.5),
  75. int(size["height"] * 0.2),
  76. 200,
  77. )
  78. else:
  79. pass
  80. except Exception as e:
  81. print(f"打开微信异常:{e}")
  82. time.sleep(1)
  83. size = self.driver.get_window_size()
  84. self.driver.swipe(
  85. int(size["width"] * 0.5),
  86. int(size["height"] * 0.2),
  87. int(size["width"] * 0.5),
  88. int(size["height"] * 0.8),
  89. 200,
  90. )
  91. time.sleep(1)
  92. self.driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
  93. print("打开小程序小年糕+成功")
  94. time.sleep(5)
  95. self.get_videoList()
  96. time.sleep(1)
  97. self.driver.quit()
  98. def search_elements(self, xpath):
  99. time.sleep(1)
  100. windowHandles = self.driver.window_handles
  101. for handle in windowHandles:
  102. self.driver.switch_to.window(handle)
  103. time.sleep(1)
  104. try:
  105. elements = self.driver.find_elements(By.XPATH, xpath)
  106. if elements:
  107. return elements
  108. except NoSuchElementException:
  109. pass
  110. def check_to_applet(self, xpath):
  111. time.sleep(1)
  112. webViews = self.driver.contexts
  113. self.driver.switch_to.context(webViews[-1])
  114. windowHandles = self.driver.window_handles
  115. for handle in windowHandles:
  116. self.driver.switch_to.window(handle)
  117. time.sleep(1)
  118. try:
  119. self.driver.find_element(By.XPATH, xpath)
  120. print("切换到WebView成功\n")
  121. return
  122. except NoSuchElementException:
  123. time.sleep(1)
  124. def swipe_up(self):
  125. self.search_elements('//*[@class="list-list--list"]')
  126. size = self.driver.get_window_size()
  127. self.driver.swipe(
  128. int(size["width"] * 0.5),
  129. int(size["height"] * 0.8),
  130. int(size["width"] * 0.5),
  131. int(size["height"] * 0.442),
  132. 200,
  133. )
  134. self.swipe_count += 1
  135. def get_video_url(self, video_title_element):
  136. for i in range(3):
  137. self.search_elements('//*[@class="list-list--list"]')
  138. time.sleep(1)
  139. self.driver.execute_script(
  140. "arguments[0].scrollIntoView({block:'center',inline:'center'});",
  141. video_title_element[0],
  142. )
  143. time.sleep(3)
  144. video_title_element[0].click()
  145. self.check_to_applet(
  146. xpath=r'//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'
  147. )
  148. time.sleep(10)
  149. video_url_elements = self.search_elements(
  150. '//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'
  151. )
  152. return video_url_elements[0].get_attribute("src")
  153. def parse_detail(self, index):
  154. page_source = self.driver.page_source
  155. soup = BeautifulSoup(page_source, "html.parser")
  156. soup.prettify()
  157. video_list = soup.findAll(
  158. name="wx-view", attrs={"class": "expose--adapt-parent"}
  159. )
  160. index = index + 1
  161. element_list = [i for i in video_list][index:]
  162. return element_list[0]
  163. def get_video_info_2(self, video_element):
  164. if self.download_cnt >= int(
  165. self.rule_dict.get("videos_cnt", {}).get("min", 10)
  166. ):
  167. self.count = 0
  168. self.download_cnt = 0
  169. self.element_list = []
  170. return
  171. self.count += 1
  172. # 获取 trace_id, 并且把该 id 当做视频生命周期唯一索引
  173. trace_id = self.crawler + str(uuid.uuid1())
  174. print("扫描到一条视频")
  175. # 标题
  176. video_title = video_element.find("wx-view", class_="dynamic--title").text
  177. # 播放量字符串
  178. play_str = video_element.find("wx-view", class_="dynamic--views").text
  179. info_list = video_element.findAll(
  180. "wx-view", class_="dynamic--commerce-btn-text"
  181. )
  182. # 点赞数量
  183. like_str = info_list[1].text
  184. # 评论数量
  185. comment_str = info_list[2].text
  186. # 视频时长
  187. duration_str = video_element.find("wx-view", class_="dynamic--duration").text
  188. user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
  189. # 头像 URL
  190. avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
  191. # 封面 URL
  192. cover_url = video_element.find("wx-image", class_="dynamic--bg-image")["src"]
  193. play_cnt = int(play_str.replace("+", "").replace("次播放", ""))
  194. duration = int(duration_str.split(":")[0].strip()) * 60 + int(
  195. duration_str.split(":")[-1].strip()
  196. )
  197. if "点赞" in like_str:
  198. like_cnt = 0
  199. elif "万" in like_str:
  200. like_cnt = int(like_str.split("万")[0]) * 10000
  201. else:
  202. like_cnt = int(like_str)
  203. if "评论" in comment_str:
  204. comment_cnt = 0
  205. elif "万" in comment_str:
  206. comment_cnt = int(comment_str.split("万")[0]) * 10000
  207. else:
  208. comment_cnt = int(comment_str)
  209. out_video_id = md5(video_title.encode("utf8")).hexdigest()
  210. out_user_id = md5(user_name.encode("utf8")).hexdigest()
  211. video_dict = {
  212. "video_title": video_title,
  213. "video_id": out_video_id,
  214. "out_video_id": out_video_id,
  215. "duration_str": duration_str,
  216. "duration": duration,
  217. "play_str": play_str,
  218. "play_cnt": play_cnt,
  219. "like_str": like_str,
  220. "like_cnt": like_cnt,
  221. "comment_cnt": comment_cnt,
  222. "share_cnt": 0,
  223. "user_name": user_name,
  224. "user_id": out_user_id,
  225. "publish_time_stamp": int(time.time()),
  226. "publish_time_str": time.strftime(
  227. "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
  228. ),
  229. "update_time_stamp": int(time.time()),
  230. "avatar_url": avatar_url,
  231. "cover_url": cover_url,
  232. "session": f"xiaoniangao-{int(time.time())}",
  233. }
  234. pipeline = PiaoQuanPipelineTest(
  235. platform=self.crawler,
  236. mode=self.log_type,
  237. item=video_dict,
  238. rule_dict=self.rule_dict,
  239. env=self.env,
  240. trace_id=trace_id,
  241. )
  242. flag = pipeline.process_item()
  243. if flag:
  244. video_title_element = self.search_elements(
  245. f'//*[contains(text(), "{video_title}")]'
  246. )
  247. if video_title_element is None:
  248. return
  249. print("点击标题,进入视频详情页")
  250. video_url = self.get_video_url(video_title_element)
  251. print(video_url)
  252. video_url = get_redirect_url(video_url)
  253. print(video_url)
  254. if video_url is None:
  255. self.driver.press_keycode(AndroidKey.BACK)
  256. time.sleep(5)
  257. return
  258. video_dict["video_url"] = video_url
  259. video_dict["platform"] = self.crawler
  260. video_dict["strategy"] = self.log_type
  261. video_dict["out_video_id"] = video_dict["video_id"]
  262. video_dict["crawler_rule"] = json.dumps(self.rule_dict)
  263. video_dict["user_id"] = self.our_uid
  264. video_dict["publish_time"] = video_dict["publish_time_str"]
  265. print(json.dumps(video_dict, ensure_ascii=False, indent=4))
  266. self.download_cnt += 1
  267. self.driver.press_keycode(AndroidKey.BACK)
  268. time.sleep(5)
  269. def get_video_info(self, video_element):
  270. try:
  271. self.get_video_info_2(video_element)
  272. except Exception as e:
  273. self.driver.press_keycode(AndroidKey.BACK)
  274. print(f"抓取单条视频异常:{e}\n")
  275. def get_videoList(self):
  276. self.driver.implicitly_wait(20)
  277. # 切换到 web_view
  278. self.check_to_applet(xpath='//*[@class="tab-bar--tab tab-bar--tab-selected"]')
  279. print("切换到 webview 成功")
  280. time.sleep(1)
  281. page = 0
  282. if self.search_elements('//*[@class="list-list--list"]') is None:
  283. print("窗口已销毁")
  284. self.count = 0
  285. self.download_cnt = 0
  286. self.element_list = []
  287. return
  288. print("开始获取视频信息")
  289. for i in range(50):
  290. print("下滑{}次".format(i))
  291. element = self.parse_detail(i)
  292. self.get_video_info(element)
  293. self.swipe_up()
  294. time.sleep(1)
  295. if self.swipe_count > 100:
  296. return
  297. print("已抓取完一组,休眠 5 秒\n")
  298. time.sleep(5)