123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- import json
- import os
- import random
- import time
- from appium import webdriver
- from appium.webdriver.common.touch_action import TouchAction
- from selenium.webdriver.support.ui import WebDriverWait
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.common.by import By
class KuaiShouCollect:
    """Collect the videos shown under a Kuaishou user's favorites tab.

    The class drives the Android app (package ``com.smile.gifmaker``) through
    an Appium server at ``http://localhost:4723/wd/hub``. Collected data is a
    mapping of author name -> video caption, loaded from and saved to
    ``RESULT_PATH`` so that results accumulate across runs.

    Typical use: ``search_by_id()`` -> ``get_person_info()`` ->
    ``close_spider()``.
    """

    # File the collected {author name: caption} mapping is persisted to.
    RESULT_PATH = "result.json"

    # Swipes performed after opening the first favorited video.
    MAX_SWIPES = 50

    def __init__(self, user_id, log_type=None, crawler=None, env=None):
        """Load previous results and open the Appium session.

        Args:
            user_id: Kuaishou user ID (number or string) typed into the
                in-app search box.
            log_type: Stored on the instance; not used by this class.
            crawler: Stored on the instance; not used by this class.
            env: Stored on the instance; not used by this class.

        ``log_type``, ``crawler`` and ``env`` default to ``None`` so the class
        can be constructed with only a user ID.
        """
        self.user_id = user_id
        self.log_type = log_type
        # Misspelled name used by the original code; kept as an alias so
        # existing readers of ``loge_type`` keep working.
        self.loge_type = log_type
        self.crawler = crawler
        self.env = env

        # Read earlier results before opening the device session, so that an
        # unreadable result file does not leave a session behind.
        if os.path.exists(self.RESULT_PATH):
            with open(self.RESULT_PATH, "r", encoding="utf-8") as f:
                self.name_info_dict = json.load(f)
        else:
            self.name_info_dict = {}

        self.desired_caps = {
            "platformName": "Android",
            "deviceName": "AQQB9X3211W00486",
            "appPackage": "com.smile.gifmaker",
            "appActivity": "com.yxcorp.gifshow.HomeActivity",
            "noReset": True,
            "automationName": "UiAutomator2",
        }
        self.driver = webdriver.Remote(
            "http://localhost:4723/wd/hub", self.desired_caps
        )
        self.driver.implicitly_wait(10)
        self.action = TouchAction(self.driver)

    def search_by_id(self):
        """Open the in-app search, type ``self.user_id`` and submit.

        Raises:
            selenium.common.exceptions.TimeoutException: if the search button
                (20 s) or the search input (10 s) does not appear in time.
                ``WebDriverWait.until`` never returns a falsy value, so a
                missing element always surfaces as this exception.
        """
        # Find the search button and tap it.
        search_button = WebDriverWait(self.driver, 20).until(
            EC.element_to_be_clickable((By.ID, r'com.smile.gifmaker:id/nasa_featured_default_search_view'))
        )
        print("找到了搜索键")
        self.action.tap(search_button).perform()

        # Find the search bar and type the user ID.
        search_bar = WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.ID, r'com.smile.gifmaker:id/editor'))
        )
        search_bar.send_keys(str(self.user_id))
        # Tap the "search" button next to the input.
        self.driver.find_element(By.ID, r"com.smile.gifmaker:id/right_tv").click()
        print("搜索完成")

    def get_person_info(self):
        """Open the user's profile and collect the favorites tab, if present.

        Taps the second entry of the search result list to enter the profile
        page, then looks for a tab whose text contains "收藏" (favorites). If
        one exists, the first video is opened and recorded, followed by up to
        ``MAX_SWIPES`` swipes with one record attempt after each. If no such
        tab exists the method returns without collecting anything.
        """
        # Find the avatar entry and tap it to enter the profile.
        avatar = WebDriverWait(self.driver, 20).until(
            EC.presence_of_element_located((By.XPATH,
                                            r'//androidx.recyclerview.widget.RecyclerView[@resource-id="com.smile.gifmaker:id/recycler_view"]/android.view.ViewGroup[2]'))
        )
        self.action.tap(avatar).perform()
        print("进入详情页")

        # Let the profile page settle BEFORE looking up its tabs; looking them
        # up first risks holding handles from the previous screen.
        time.sleep(10)
        person_tab_list = self.driver.find_elements(
            By.ID, r"com.smile.gifmaker:id/tab_text"
        )
        for tab in person_tab_list:
            tab_text = tab.text
            print(tab_text)
            if "收藏" not in tab_text:
                continue

            self.action.tap(tab).perform()
            time.sleep(10)
            first_video = self.driver.find_element(By.XPATH, r'//android.widget.ImageView[@content-desc="作品"]')
            print("找到了第一条视频")
            self.action.tap(first_video).perform()
            self.get_single_video_info()
            print("开始刷视频")
            for i in range(self.MAX_SWIPES):
                try:
                    print(i)
                    self.scroll_down()
                    self.get_single_video_info()
                except Exception as exc:
                    # Best effort: one failed swipe must not end the run, but
                    # the failure should be visible rather than swallowed.
                    print(f"swipe {i} failed: {exc!r}")
            # The profile page has been left, so the remaining tab handles no
            # longer refer to anything on screen.
            break

    def scroll_down(self):
        """Swipe up from 75% to 25% of the screen height to reach the next video."""
        time.sleep(1)
        size = self.driver.get_window_size()
        width = size['width']    # screen width
        height = size['height']  # screen height
        x = int(0.5 * width)
        self.action.press(x=x, y=int(0.75 * height))
        # Randomised hold time between press and move.
        self.action.wait(ms=random.randint(200, 400))
        self.action.move_to(x=x, y=int(0.25 * height))
        self.action.release()
        self.action.perform()

    def _text_or_empty(self, element_id):
        """Return the text of the element with ``element_id``, or "" on any failure."""
        try:
            return self.driver.find_element(By.ID, element_id).text
        except Exception:
            return ""

    def get_single_video_info(self):
        """Record the author name and caption of the video currently on screen.

        Nothing is recorded unless both values are non-empty. Entries are
        keyed by author name, so a later video by the same author replaces
        the earlier caption.
        """
        author_name = self._text_or_empty(r'com.smile.gifmaker:id/user_name_text_view')
        title = self._text_or_empty(r'com.smile.gifmaker:id/element_caption_label')
        if title and author_name:
            self.name_info_dict[author_name] = title

    def close_spider(self):
        """Quit the driver, write the results to ``RESULT_PATH`` and return them.

        The results are written even if quitting the driver raises; the
        exception from ``quit()`` then propagates after the file is saved.

        Returns:
            dict: the collected author name -> caption mapping.
        """
        try:
            self.driver.quit()
        finally:
            # Serialise before opening so a serialisation error cannot
            # truncate an existing result file.
            payload = json.dumps(self.name_info_dict, ensure_ascii=False, indent=4)
            with open(self.RESULT_PATH, "w", encoding="utf-8") as f:
                f.write(payload)
        return self.name_info_dict
if __name__ == "__main__":
    # Known issues:
    # - Crawling becomes very slow when a video that is currently
    #   live-streaming is encountered; this needs optimising.
    # - The existing author_ crawler has gone unmaintained for a long time,
    #   has problems and has been in a broken state throughout.
    # IDs noted alongside the list but not crawled: 2594305039, 2089610315
    id_list = [1396121077, 1811823755, "lxy20003246"]
    for user_id in id_list:
        # The constructor takes log_type, crawler and env in addition to the
        # user ID; this script has no values for them, so pass None explicitly.
        ksc = KuaiShouCollect(user_id, log_type=None, crawler=None, env=None)
        try:
            ksc.search_by_id()
            ksc.get_person_info()
        finally:
            # Always quit the driver and save results, even when a step fails.
            ksc.close_spider()
|