# kuaishou_collect.py
import json
import os
import random
import time

from appium import webdriver
from appium.webdriver.common.touch_action import TouchAction
from selenium.common.exceptions import (
    NoSuchElementException,
    TimeoutException,
    WebDriverException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
  10. class KuaiShouCollect:
  11. # 初始化appium
  12. def __init__(self, user_id, log_type, crawler, env):
  13. self.desired_caps = {
  14. "platformName": "Android",
  15. "deviceName": "AQQB9X3211W00486",
  16. "appPackage": "com.smile.gifmaker",
  17. "appActivity": "com.yxcorp.gifshow.HomeActivity",
  18. "noReset": True,
  19. "automationName": "UiAutomator2",
  20. }
  21. self.driver = webdriver.Remote(
  22. "http://localhost:4723/wd/hub", self.desired_caps
  23. )
  24. self.driver.implicitly_wait(10)
  25. self.action = TouchAction(self.driver)
  26. if os.path.exists("result.json"):
  27. with open("result.json", "r", encoding="utf-8") as f:
  28. self.name_info_dict = json.loads(f.read())
  29. else:
  30. self.name_info_dict = {}
  31. self.user_id = user_id
  32. self.loge_type = log_type
  33. self.crawler = crawler
  34. self.env = env
  35. def search_by_id(self):
  36. # 搜索 找到搜索按钮并且点击
  37. search_button = WebDriverWait(self.driver, 20).until(
  38. EC.element_to_be_clickable((By.ID, r'com.smile.gifmaker:id/nasa_featured_default_search_view'))
  39. )
  40. print("找到了搜索键")
  41. if search_button:
  42. # action = TouchAction(self.driver)
  43. self.action.tap(search_button).perform()
  44. else:
  45. print("can not find search button")
  46. return
  47. # 找到搜索栏,并且输入keywords
  48. search_bar = WebDriverWait(self.driver, 10).until(
  49. EC.presence_of_element_located((By.ID, r'com.smile.gifmaker:id/editor'))
  50. )
  51. if search_bar:
  52. search_bar.send_keys(self.user_id)
  53. # 点击搜索
  54. self.driver.find_element(By.ID, r"com.smile.gifmaker:id/right_tv").click()
  55. print("搜索完成")
  56. else:
  57. print("fails in input keywords")
  58. return
  59. def get_person_info(self):
  60. """
  61. 点击进入用户主页
  62. 查看是否存在收藏列表,若存在收藏列表,则采集该用户的收藏列表,若不存在,则直接跳过,退出,重新搜索下一个ID
  63. """
  64. # 找到头像,点击进入
  65. avatar = WebDriverWait(self.driver, 20).until(
  66. EC.presence_of_element_located((By.XPATH,
  67. r'//androidx.recyclerview.widget.RecyclerView[@resource-id="com.smile.gifmaker:id/recycler_view"]/android.view.ViewGroup[2]'))
  68. )
  69. # self.driver.find_element.click()
  70. self.action.tap(avatar).perform()
  71. print("进入详情页")
  72. # 找到个人详情下面的数据,看看是否存在收藏
  73. person_tab_list = self.driver.find_elements(
  74. By.ID, r"com.smile.gifmaker:id/tab_text"
  75. )
  76. time.sleep(10)
  77. for tab in person_tab_list:
  78. print(tab.text)
  79. if "收藏" in tab.text:
  80. # print(tab.text)
  81. self.action.tap(tab).perform()
  82. time.sleep(10)
  83. first_video = self.driver.find_element(By.XPATH, r'//android.widget.ImageView[@content-desc="作品"]')
  84. print("找到了第一条视频")
  85. self.action.tap(first_video).perform()
  86. self.get_single_video_info()
  87. print("开始刷视频")
  88. for i in range(50):
  89. try:
  90. print(i)
  91. self.scroll_down()
  92. self.get_single_video_info()
  93. except:
  94. pass
  95. else:
  96. continue
  97. def scroll_down(self):
  98. """
  99. 刷视频函数,使用该函数可以往下滑动进入下一个视频
  100. """
  101. time.sleep(1)
  102. width = self.driver.get_window_size()['width'] # 获取屏幕宽
  103. height = self.driver.get_window_size()['height'] # 获取屏幕高
  104. # print(width, height)
  105. self.action.press(x=int(0.5 * width), y=int(0.75 * height))
  106. self.action.wait(ms=random.randint(200, 400))
  107. self.action.move_to(x=int(0.5 * width), y=int(0.25 * height))
  108. self.action.release()
  109. self.action.perform()
  110. def get_single_video_info(self):
  111. try:
  112. author_name = self.driver.find_element(By.ID, r'com.smile.gifmaker:id/user_name_text_view').text
  113. except:
  114. author_name = ""
  115. try:
  116. title = self.driver.find_element(By.ID, r'com.smile.gifmaker:id/element_caption_label').text
  117. except:
  118. title = ""
  119. if title and author_name:
  120. self.name_info_dict[author_name] = title
  121. def close_spider(self):
  122. self.driver.quit()
  123. with open("result.json", "w", encoding="utf-8") as f:
  124. f.write(json.dumps(self.name_info_dict, ensure_ascii=False, indent=4))
  125. return self.name_info_dict
  126. if __name__ == "__main__":
  127. """
  128. 抓取的时候,如果遇到正在直播的视频,会很慢,这一点需要考虑优化;
  129. 现有的author_爬虫长期未维护,存在问题,一直是失效状态
  130. 2594305039, 2089610315,
  131. """
  132. id_list = [1396121077, 1811823755, "lxy20003246"]
  133. for id in id_list:
  134. ksc = KuaiShouCollect(id)
  135. ksc.search_by_id()
  136. ksc.get_person_info()
  137. ksc.close_spider()