shipinshuashua_id.py 9.6 KB


  1. # -*- coding: utf-8 -*-
  2. # @Time: 2024/01/04
  3. import re
  4. import json
  5. import os
  6. import sys
  7. import time
  8. import requests
  9. sys.path.append(os.getcwd())
  10. from datetime import date, timedelta
  11. from application.functions import clean_title
  12. from appium import webdriver
  13. from appium.webdriver.extensions.android.nativekey import AndroidKey
  14. from bs4 import BeautifulSoup
  15. from selenium.common.exceptions import NoSuchElementException
  16. from selenium.webdriver.common.by import By
  17. from application.common.mysql import MysqlHelper
  18. class SPSSIdRecommend:
  19. def __init__(self, env):
  20. self.env = env
  21. chromedriverExecutable = "/Users/tzld/Downloads/chromedriver_v111/chromedriver"
  22. # 微信的配置文件
  23. caps = {
  24. "platformName": "Android",
  25. "devicesName": "Android",
  26. "appPackage": "com.tencent.mm",
  27. "appActivity": ".ui.LauncherUI",
  28. "autoGrantPermissions": "true",
  29. "noReset": True,
  30. "resetkeyboard": True,
  31. "unicodekeyboard": True,
  32. "showChromedriverLog": True,
  33. "printPageSourceOnFailure": True,
  34. "recreateChromeDriverSessions": True,
  35. "enableWebviewDetailsCollection": True,
  36. "setWebContentsDebuggingEnabled": True,
  37. "newCommandTimeout": 6000,
  38. "automationName": "UiAutomator2",
  39. "chromedriverExecutable": chromedriverExecutable,
  40. "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
  41. }
  42. try:
  43. self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
  44. except Exception as e:
  45. print(e)
  46. return
  47. self.driver.implicitly_wait(30)
  48. for i in range(120):
  49. try:
  50. if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
  51. break
  52. elif self.driver.find_element(By.ID, "com.android.systemui:id/dismiss_view"):
  53. size = self.driver.get_window_size()
  54. self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.8),
  55. int(size['width'] * 0.5), int(size['height'] * 0.2), 200)
  56. else:
  57. pass
  58. except NoSuchElementException:
  59. time.sleep(1)
  60. size = self.driver.get_window_size()
  61. self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.2),
  62. int(size['width'] * 0.5), int(size['height'] * 0.8), 200)
  63. time.sleep(1)
  64. self.driver.find_elements(By.XPATH, '//*[@text="视频刷刷"]')[-1].click()
  65. time.sleep(5)
  66. print("打开小程序")
  67. self.get_videoList()
  68. time.sleep(1)
  69. self.driver.quit()
  70. def search_elements(self, xpath):
  71. time.sleep(1)
  72. windowHandles = self.driver.window_handles
  73. for handle in windowHandles:
  74. self.driver.switch_to.window(handle)
  75. time.sleep(1)
  76. try:
  77. elements = self.driver.find_elements(By.XPATH, xpath)
  78. if elements:
  79. return elements
  80. except NoSuchElementException:
  81. pass
  82. def check_to_applet(self, xpath):
  83. time.sleep(1)
  84. webViews = self.driver.contexts
  85. self.driver.switch_to.context(webViews[-1])
  86. windowHandles = self.driver.window_handles
  87. for handle in windowHandles:
  88. self.driver.switch_to.window(handle)
  89. time.sleep(1)
  90. try:
  91. self.driver.find_element(By.XPATH, xpath)
  92. return
  93. except NoSuchElementException:
  94. time.sleep(1)
  95. def swipe_up(self):
  96. self.search_elements('//*[@class="dynamic--nick-top"]')
  97. size = self.driver.get_window_size()
  98. self.driver.swipe(int(size["width"] * 0.5), int(size["height"] * 0.8),
  99. int(size["width"] * 0.5), int(size["height"] * 0.442), 200)
  100. def get_video_id(self, video_name_element):
  101. for i in range(3):
  102. self.search_elements('//*[@class="dynamic--nick-top"]')
  103. time.sleep(1)
  104. self.driver.execute_script("arguments[0].scrollIntoView({block:'center',inline:'center'});",
  105. video_name_element[0])
  106. time.sleep(3)
  107. video_name_element[0].click()
  108. self.check_to_applet(xpath=r'//wx-view[@class="small--mid"]')
  109. time.sleep(10)
  110. video_id_elements = self.search_elements(
  111. '//wx-view[@class="small--mid"]')
  112. if video_id_elements:
  113. return video_id_elements[0].text
  114. def parse_detail(self, index):
  115. page_source = self.driver.page_source
  116. soup = BeautifulSoup(page_source, 'html.parser')
  117. soup.prettify()
  118. video_list = soup.findAll(name="wx-view", attrs={"class": "expose--adapt-parent"})
  119. element_list = [i for i in video_list][index:]
  120. return element_list[0]
  121. def get_video_info_2(self, video_element):
  122. # 用户名
  123. username = video_element.find("wx-view", class_="dynamic--nick-top").text
  124. username_element = self.search_elements(f'//*[contains(text(), "{username}")]')
  125. if username_element is None:
  126. return
  127. video_id = self.get_video_id(username_element)
  128. if video_id is None:
  129. self.driver.press_keycode(AndroidKey.BACK)
  130. time.sleep(5)
  131. return
  132. video_id = re.sub(r'\D', '', video_id)
  133. video_id = int(video_id)
  134. repeat_video_id = self.repeat_video_id(video_id)
  135. if repeat_video_id != 0:
  136. self.driver.press_keycode(AndroidKey.BACK)
  137. return
  138. data_list = self.get_user_list(video_id)
  139. if len(data_list) == 0:
  140. self.driver.press_keycode(AndroidKey.BACK)
  141. return
  142. else:
  143. status = 1
  144. localtime = time.localtime(time.time())
  145. formatted_time = time.strftime("%Y-%m-%d", localtime)
  146. print(formatted_time)
  147. self.insert_user(video_id, username, data_list, status, formatted_time)
  148. self.driver.press_keycode(AndroidKey.BACK)
  149. time.sleep(2)
  150. def insert_user(self, mid, user_name, data_list, status, formatted_time):
  151. insert_sql = f"""insert into crawler_xng_userid( user_id , user_name , user_title_text , status, time) values ({mid},"{user_name}", "{data_list}",{status}, "{formatted_time}")"""
  152. print(insert_sql)
  153. MysqlHelper(self.env).update(insert_sql)
  154. def repeat_video_id(self, mid):
  155. sql = f"SELECT `link` FROM `crawler_user_v3` WHERE `source` = 'xiaoniangao' and `link` = {mid}"
  156. repeat_video_id = MysqlHelper(self.env).select(sql)
  157. return len(repeat_video_id)
  158. def get_user_list(self, mid):
  159. next_t = -1
  160. url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
  161. headers = {
  162. 'Host': 'kapi-xng-app.xiaoniangao.cn',
  163. 'content-type': 'application/json; charset=utf-8',
  164. 'accept': '*/*',
  165. 'authorization': 'hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=',
  166. 'verb': 'POST',
  167. 'content-md5': 'c7b7f8663984e8800e3bcd9b44465083',
  168. 'x-b3-traceid': '2f9da41f960ae077',
  169. 'accept-language': 'zh-cn',
  170. 'date': 'Mon, 19 Jun 2023 06:41:17 GMT',
  171. 'x-token-id': '',
  172. 'x-signaturemethod': 'hmac-sha1',
  173. 'user-agent': 'xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0'
  174. }
  175. payload = {
  176. "token": "",
  177. "limit": 20,
  178. "start_t": next_t,
  179. "visited_mid": mid,
  180. "share_width": 300,
  181. "share_height": 240,
  182. }
  183. response = requests.request(
  184. "POST",
  185. url,
  186. headers=headers,
  187. data=json.dumps(payload),
  188. )
  189. data_list = []
  190. if "data" not in response.text or response.status_code != 200:
  191. return data_list
  192. elif "list" not in response.json()["data"]:
  193. return data_list
  194. elif len(response.json()["data"]["list"]) == 0:
  195. return data_list
  196. list = response.json()["data"]["list"]
  197. for video_obj in list:
  198. video_title = clean_title(video_obj.get("title", ""))
  199. # 发布时间
  200. publish_time_stamp = int(int(video_obj.get("t", 0)) / 1000)
  201. publish_time_str = time.strftime(
  202. "%Y-%m-%d", time.localtime(publish_time_stamp)
  203. )
  204. date_three_days_ago_string = (date.today() + timedelta(days=-7)).strftime("%Y-%m-%d")
  205. rule = publish_time_str >= date_three_days_ago_string
  206. if rule == False:
  207. return ""
  208. v_url = video_obj.get("v_url")
  209. data_list.append(video_title + ":" + v_url)
  210. return data_list
  211. def get_video_info(self, video_element):
  212. try:
  213. self.get_video_info_2(video_element)
  214. except Exception as e:
  215. print(e)
  216. self.driver.press_keycode(AndroidKey.BACK)
  217. def get_videoList(self):
  218. self.driver.implicitly_wait(20)
  219. # 切换到 web_view
  220. self.check_to_applet(xpath='//*[@class="dynamic--nick-top"]')
  221. print("切换到 webview 成功")
  222. time.sleep(1)
  223. if self.search_elements('//*[@class="dynamic--nick-top"]') is None:
  224. return
  225. print("开始获取视频信息")
  226. for i in range(50):
  227. print("下滑{}次".format(i))
  228. element = self.parse_detail(i)
  229. self.get_video_info(element)
  230. self.swipe_up()
  231. time.sleep(1)
  232. print("下滑完成")
  233. time.sleep(5)
  234. if __name__ == '__main__':
  235. SPSSIdRecommend(
  236. env="prod"
  237. )