# xiaoniangao_zhanghao.py (15 KB)
# -*- coding: utf-8 -*-
import json
import os
import random
import subprocess
import sys
import time
import uuid
from datetime import datetime

import requests
from appium import webdriver
from appium.webdriver.common.touch_action import TouchAction
from appium.webdriver.extensions.android.nativekey import AndroidKey
from bs4 import BeautifulSoup
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By

# Put the current working directory on sys.path *before* importing the
# project-local package; appending it after that import had no effect on it.
sys.path.append(os.getcwd())

from application.common import MysqlHelper, Feishu  # noqa: E402
  19. class XiaoNianGaoZH(object):
  20. """
  21. 小年糕+线下爬虫
  22. """
  23. def __init__(self):
  24. mid = 1160417293
  25. current_time = datetime.now()
  26. formatted_time = current_time.strftime("%Y%m%d")
  27. date_int = int(formatted_time)
  28. # 获取时间标签
  29. tag_id = self.get_tag_id(date_int)
  30. print(tag_id)
  31. # 新增账号
  32. pq_uid = self.insert_number(mid, tag_id)
  33. self.count = 0
  34. self.swipe_count = 0
  35. chromedriverExecutable = "/Users/tzld/Downloads/chromedriver-mac-x64/chromedriver"
  36. print("启动微信")
  37. # 微信的配置文件
  38. caps = {
  39. "platformName": "Android",
  40. "devicesName": "Android",
  41. "appPackage": "com.tencent.mm",
  42. "appActivity": ".ui.LauncherUI",
  43. "autoGrantPermissions": True,
  44. "noReset": True,
  45. "resetkeyboard": True,
  46. "unicodekeyboard": True,
  47. "showChromedriverLog": True,
  48. "printPageSourceOnFailure": True,
  49. "recreateChromeDriverSessions": True,
  50. "enableWebviewDetailsCollection": True,
  51. "setWebContentsDebuggingEnabled": True,
  52. "newCommandTimeout": 6000,
  53. "automationName": "UiAutomator2",
  54. "chromedriverExecutable": chromedriverExecutable,
  55. "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
  56. }
  57. try:
  58. self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
  59. except Exception as e:
  60. print(e)
  61. return
  62. self.driver.implicitly_wait(30)
  63. for i in range(10):
  64. try:
  65. if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
  66. print("启动微信成功")
  67. break
  68. elif self.driver.find_element(
  69. By.ID, "com.android.systemui:id/dismiss_view"
  70. ):
  71. print("发现并关闭系统下拉菜单")
  72. size = self.driver.get_window_size()
  73. self.driver.swipe(
  74. int(size["width"] * 0.5),
  75. int(size["height"] * 0.8),
  76. int(size["width"] * 0.5),
  77. int(size["height"] * 0.2),
  78. 200,
  79. )
  80. else:
  81. pass
  82. except Exception as e:
  83. print(f"打开微信异常:{e}")
  84. time.sleep(1)
  85. size = self.driver.get_window_size()
  86. self.driver.swipe(
  87. int(size["width"] * 0.5),
  88. int(size["height"] * 0.2),
  89. int(size["width"] * 0.5),
  90. int(size["height"] * 0.8),
  91. 200,
  92. )
  93. time.sleep(1)
  94. command = 'adb shell service call statusbar 2'
  95. process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
  96. process.communicate()
  97. self.driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
  98. print("打开小程序小年糕+成功")
  99. time.sleep(5)
  100. self.get_videoList()
  101. time.sleep(1)
  102. self.driver.quit()
  103. def search_elements(self, xpath):
  104. time.sleep(1)
  105. windowHandles = self.driver.window_handles
  106. for handle in windowHandles:
  107. self.driver.switch_to.window(handle)
  108. time.sleep(1)
  109. try:
  110. elements = self.driver.find_elements(By.XPATH, xpath)
  111. if elements:
  112. return elements
  113. except NoSuchElementException:
  114. pass
  115. def check_to_applet(self, xpath):
  116. time.sleep(1)
  117. webViews = self.driver.contexts
  118. self.driver.switch_to.context(webViews[-1])
  119. windowHandles = self.driver.window_handles
  120. for handle in windowHandles:
  121. self.driver.switch_to.window(handle)
  122. time.sleep(1)
  123. try:
  124. self.driver.find_element(By.XPATH, xpath)
  125. print("切换到WebView成功\n")
  126. return
  127. except NoSuchElementException:
  128. time.sleep(1)
  129. def swipe_up(self):
  130. self.search_elements('//*[@class="list-list--list"]')
  131. size = self.driver.get_window_size()
  132. action = TouchAction(self.driver)
  133. action.press(x=int(size["width"] * 0.5), y=int(size["height"] * 0.85))
  134. action.wait(ms=1300) # 可以调整等待时间
  135. action.move_to(x=int(size["width"] * 0.5), y=int(size["height"] * 0.2))
  136. action.release()
  137. action.perform()
  138. self.swipe_count += 1
  139. def get_video_url(self, video_title_element):
  140. for i in range(3):
  141. self.search_elements('//*[@class="list-list--list"]')
  142. time.sleep(1)
  143. self.driver.execute_script(
  144. "arguments[0].scrollIntoView({block:'center',inline:'center'});",
  145. video_title_element[0],
  146. )
  147. time.sleep(3)
  148. video_title_element[0].click()
  149. self.check_to_applet(
  150. xpath=r'//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'
  151. )
  152. time.sleep(10)
  153. video_url_elements = self.search_elements(
  154. '//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'
  155. )
  156. return video_url_elements[0].get_attribute("src")
  157. def parse_detail(self, index):
  158. page_source = self.driver.page_source
  159. soup = BeautifulSoup(page_source, "html.parser")
  160. soup.prettify()
  161. video_list = soup.findAll(
  162. name="wx-view", attrs={"class": "expose--adapt-parent"}
  163. )
  164. index = index + 1
  165. element_list = [i for i in video_list][index:]
  166. return element_list[0]
  167. def get_video_info_2(self, video_element):
  168. self.count += 1
  169. video_title = video_element.find("wx-view", class_="dynamic--title").text
  170. # 头像 URL
  171. avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
  172. # 用户名称
  173. user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
  174. name_url = self.select_name_url(avatar_url, user_name)
  175. if name_url:
  176. video_title_element = self.search_elements(f'//*[contains(text(), "{video_title}")]')
  177. if video_title_element is None:
  178. return
  179. self.get_video_url(video_title_element)
  180. video_mid_elements = self.search_elements("//wx-view[@class='bar--navBar-content-capsule-wrap']")
  181. mid = int(video_mid_elements[0].get_attribute("data-mid"))
  182. self.driver.press_keycode(AndroidKey.BACK)
  183. time.sleep(5)
  184. uid = self.select_id(mid)
  185. if uid:
  186. self.update_name_url(mid, avatar_url, user_name)
  187. else:
  188. time.sleep(1)
  189. link = self.select_id_status(mid)
  190. if link:
  191. current_time = datetime.now()
  192. formatted_time = current_time.strftime("%Y%m%d")
  193. date_int = int(formatted_time)
  194. # 获取时间标签
  195. tag_id = self.get_tag_id(date_int)
  196. time.sleep(5)
  197. print(tag_id)
  198. # 新增账号
  199. pq_uid = self.insert_number(mid, tag_id)
  200. time.sleep(5)
  201. if pq_uid:
  202. self.insert_name_url(mid, avatar_url, user_name)
  203. # 获取当前时间
  204. current_time = datetime.now()
  205. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  206. values = [[
  207. str(mid),
  208. user_name,
  209. avatar_url,
  210. str(pq_uid),
  211. formatted_time,
  212. ]]
  213. Feishu.insert_columns('xiaoniangao', 'xiaoniangao', "8zlceR", "ROWS", 1, 2)
  214. time.sleep(0.5)
  215. Feishu.update_values('xiaoniangao', 'xiaoniangao', "8zlceR", "A2:Z2", values)
  216. print("写入飞书表格成功")
  217. def get_video_info(self, video_element):
  218. try:
  219. self.get_video_info_2(video_element)
  220. except Exception as e:
  221. print(f"抓取单条视频异常:{e}\n")
  222. def get_videoList(self):
  223. """
  224. 获取视频列表
  225. :return:
  226. """
  227. # while True:
  228. self.driver.implicitly_wait(20)
  229. # 切换到 web_view
  230. self.check_to_applet(xpath='//*[@class="tab-bar--tab tab-bar--tab-selected"]')
  231. print("切换到 webview 成功")
  232. time.sleep(1)
  233. if self.search_elements('//*[@class="list-list--list"]') is None:
  234. print("窗口已销毁")
  235. self.count = 0
  236. self.download_cnt = 0
  237. self.element_list = []
  238. return
  239. print("开始获取视频信息")
  240. for i in range(50):
  241. print("下滑{}次".format(i))
  242. element = self.parse_detail(i)
  243. self.get_video_info(element)
  244. self.swipe_up()
  245. time.sleep(random.randint(1, 5))
  246. def insert_number(self, mid, tag_id):
  247. for i in range(3):
  248. url = "https://admin.piaoquantv.com/manager/crawler/v3/user/save"
  249. payload = {
  250. "source": "xiaoniangao",
  251. "mode": "author",
  252. "modeValue": "",
  253. "modeBoard": "",
  254. "recomStatus": -6,
  255. "appRecomStatus": -6,
  256. "autoAuditStatus": 0,
  257. "tag": f"459,454,106,8240,{int(tag_id)}",
  258. "contentCategory": 0,
  259. "link": str(mid)
  260. }
  261. headers = {
  262. 'content-length': '0',
  263. 'cookie': 'SESSION=MWM4YzVlMTctNzdkNC00NjE3LWIxZTctOGQwYzgzYmVmN2Qw',
  264. 'origin': 'https://admin.piaoquantv.com',
  265. 'priority': 'u=1, i',
  266. 'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
  267. 'sec-ch-ua-mobile': '?0',
  268. 'sec-ch-ua-platform': '"macOS"'
  269. }
  270. response = requests.request("POST", url, headers=headers, json=payload)
  271. response = response.json()
  272. code = response["code"]
  273. if code == 0:
  274. print("添加账号成功")
  275. time.sleep(1)
  276. url = "https://admin.piaoquantv.com/manager/crawler/v3/user/list"
  277. payload = {
  278. "pageNum": 1,
  279. "pageSize": 20
  280. }
  281. response = requests.request("POST", url, headers=headers, json=payload)
  282. response = response.json()
  283. list = response["content"]['list']
  284. link = list[0]["link"]
  285. if link == str(mid):
  286. print("获取站内账号ID成功")
  287. return list[0]["uid"]
  288. """
  289. 查询用户名+头像是否存在
  290. """
  291. def select_name_url(self, avatar_url, user_name):
  292. sql = f""" select uid from xng_uid where avatar_url = "{avatar_url}" and user_name="{user_name}"; """
  293. db = MysqlHelper()
  294. repeat_video = db.select(sql=sql)
  295. if repeat_video:
  296. return False
  297. return True
  298. def get_tag_id(self, date_int):
  299. for i in range(3):
  300. url = f"https://admin.piaoquantv.com/manager/user/up/searchUserTypeTag?keyword={date_int}&muid=7"
  301. payload = {}
  302. headers = {
  303. 'content-length': '0',
  304. 'cookie': 'SESSION=MWM4YzVlMTctNzdkNC00NjE3LWIxZTctOGQwYzgzYmVmN2Qw',
  305. 'origin': 'https://admin.piaoquantv.com',
  306. 'priority': 'u=1, i',
  307. 'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
  308. 'sec-ch-ua-mobile': '?0',
  309. 'sec-ch-ua-platform': '"macOS"'
  310. }
  311. response = requests.request("POST", url, headers=headers, data=payload)
  312. response = response.json()
  313. content = response["content"]
  314. if content:
  315. tagId = content[0]['tagId']
  316. return tagId
  317. else:
  318. url = f"https://admin.piaoquantv.com/manager/user/up/createUserTypeTag?tagName={date_int}&muid=7"
  319. response = requests.request("POST", url, headers=headers, data=payload)
  320. response = response.json()
  321. content = response["content"]
  322. if content:
  323. tagId = content['tagId']
  324. return tagId
  325. """
  326. 修改用户名+头像
  327. """
  328. def update_name_url(self, mid, avatar_url, user_name):
  329. sql = f""" update xng_uid set avatar_url = "{avatar_url}", user_name="{user_name}" where uid = "{mid}"; """
  330. db = MysqlHelper()
  331. repeat_video = db.update(sql=sql)
  332. if repeat_video:
  333. return True
  334. return False
  335. """
  336. 插入 用户名 头像 用户id
  337. """
  338. def insert_name_url(self, uid, avatar_url, user_name):
  339. current_time = datetime.now()
  340. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  341. insert_sql = f"""INSERT INTO xng_uid (uid, avatar_url, user_name, data_time) values ('{uid}' ,'{avatar_url}','{user_name}', '{formatted_time}')"""
  342. db = MysqlHelper()
  343. repeat_video = db.update(sql=insert_sql)
  344. if repeat_video:
  345. return True
  346. return False
  347. """
  348. 查询用户id是否存在
  349. """
  350. def select_id(self, uid):
  351. sql = f""" select uid from xng_uid where uid = "{uid}"; """
  352. db = MysqlHelper()
  353. repeat_video = db.select(sql=sql)
  354. if repeat_video:
  355. return True
  356. return False
  357. """
  358. 查询用户id是否之前已添加过
  359. """
  360. def select_id_status(self, uid):
  361. sql = f""" select uid from crawler_user_v3 where link = "{uid}"; """
  362. db = MysqlHelper()
  363. repeat_video = db.select(sql=sql)
  364. if repeat_video:
  365. return False
  366. return True
  367. if __name__ == "__main__":
  368. XiaoNianGaoZH()