"""Collect the videos in a Kuaishou user's favourites tab through Appium."""

import json
import os
import random
import time

from appium import webdriver
from appium.webdriver.common.touch_action import TouchAction
from selenium.common.exceptions import TimeoutException, WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait


class KuaiShouCollect:
    """Drive the Kuaishou Android app and record author/caption pairs.

    Collected data lives in ``name_info_dict`` (author name -> caption) and
    is written to ``RESULT_FILE`` by :meth:`close_spider`.  Previously saved
    results are loaded on construction so successive runs accumulate.
    """

    # JSON file (relative to the working directory) holding the results.
    RESULT_FILE = "result.json"
    # Number of swipes performed after the first favourite video is opened.
    SCROLL_COUNT = 50

    def __init__(self, user_id, log_type=None, crawler=None, env=None):
        """Load earlier results and open the Appium session.

        Args:
            user_id: Kuaishou ID typed into the in-app search box.
            log_type: Stored on the instance; not used by this class itself.
            crawler: Stored on the instance; not used by this class itself.
            env: Stored on the instance; not used by this class itself.

        ``log_type``, ``crawler`` and ``env`` default to ``None`` so the
        class can be built from a user id alone, as the script entry point
        at the bottom of this file does.
        """
        self.user_id = user_id
        self.log_type = log_type
        # Legacy misspelled attribute name, kept for existing callers.
        self.loge_type = log_type
        self.crawler = crawler
        self.env = env

        # Read the results file before opening the device session so that a
        # corrupt file cannot leave an orphaned Appium session behind.
        if os.path.exists(self.RESULT_FILE):
            with open(self.RESULT_FILE, "r", encoding="utf-8") as f:
                self.name_info_dict = json.load(f)
        else:
            self.name_info_dict = {}

        # Initialise Appium.
        self.desired_caps = {
            "platformName": "Android",
            "deviceName": "AQQB9X3211W00486",
            "appPackage": "com.smile.gifmaker",
            "appActivity": "com.yxcorp.gifshow.HomeActivity",
            "noReset": True,
            "automationName": "UiAutomator2",
        }
        self.driver = webdriver.Remote(
            "http://localhost:4723/wd/hub", self.desired_caps
        )
        self.driver.implicitly_wait(10)
        self.action = TouchAction(self.driver)

    def search_by_id(self):
        """Open the search page and search for ``self.user_id``.

        Returns:
            True when the search was submitted, False when the search button
            or the search input did not appear within its timeout.
        """
        # WebDriverWait.until raises TimeoutException instead of returning a
        # falsy value, so the failure messages must be emitted from except
        # clauses to be reachable at all.
        try:
            search_button = WebDriverWait(self.driver, 20).until(
                EC.element_to_be_clickable(
                    (By.ID, r'com.smile.gifmaker:id/nasa_featured_default_search_view')
                )
            )
        except TimeoutException:
            print("can not find search button")
            return False
        print("找到了搜索键")
        self.action.tap(search_button).perform()

        # Locate the search input and type the user id.
        try:
            search_bar = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located(
                    (By.ID, r'com.smile.gifmaker:id/editor')
                )
            )
        except TimeoutException:
            print("fails in input keywords")
            return False
        # IDs may be given as ints; send_keys is only fed text here.
        search_bar.send_keys(str(self.user_id))
        # Tap the "search" button next to the input.
        self.driver.find_element(By.ID, r"com.smile.gifmaker:id/right_tv").click()
        print("搜索完成")
        return True

    def get_person_info(self):
        """Open the user's profile and crawl the favourites tab if present.

        Taps the avatar in the search results, then looks for a profile tab
        whose label contains "收藏" (favourites).  When found, the tab is
        opened and its videos are collected; otherwise nothing is collected.

        Raises:
            TimeoutException: the avatar did not appear within 20 seconds.
        """
        avatar = WebDriverWait(self.driver, 20).until(
            EC.presence_of_element_located((
                By.XPATH,
                r'//androidx.recyclerview.widget.RecyclerView[@resource-id="com.smile.gifmaker:id/recycler_view"]/android.view.ViewGroup[2]',
            ))
        )
        self.action.tap(avatar).perform()
        print("进入详情页")

        # Wait before (not after) looking the tabs up, so the lookup happens
        # after the profile page has had time to load.
        time.sleep(10)
        person_tab_list = self.driver.find_elements(
            By.ID, r"com.smile.gifmaker:id/tab_text"
        )
        for tab in person_tab_list:
            tab_text = tab.text
            print(tab_text)
            if "收藏" not in tab_text:
                continue
            self.action.tap(tab).perform()
            time.sleep(10)
            self._collect_favourites()
            # The app is now inside the video feed; stop here instead of
            # reading the remaining profile tab elements.
            return

    def _collect_favourites(self):
        """Open the first favourite video and swipe through the following ones."""
        first_video = self.driver.find_element(
            By.XPATH, r'//android.widget.ImageView[@content-desc="作品"]'
        )
        print("找到了第一条视频")
        self.action.tap(first_video).perform()
        self.get_single_video_info()
        print("开始刷视频")
        for i in range(self.SCROLL_COUNT):
            print(i)
            try:
                self.scroll_down()
                self.get_single_video_info()
            except WebDriverException as exc:
                # Best effort: one failing video must not abort the crawl,
                # but report it rather than hiding it.
                print(f"skip video {i}: {exc}")

    def scroll_down(self):
        """Swipe up once to move to the next video.

        The swipe runs from 75% to 25% of the screen height along the
        vertical centre line, holding for a random 200-400 ms first.
        """
        time.sleep(1)
        size = self.driver.get_window_size()
        width, height = size['width'], size['height']
        center_x = int(0.5 * width)
        self.action.press(x=center_x, y=int(0.75 * height))
        self.action.wait(ms=random.randint(200, 400))
        self.action.move_to(x=center_x, y=int(0.25 * height))
        self.action.release()
        self.action.perform()

    def _element_text(self, resource_id):
        """Return the text of the element with ``resource_id``, or "" on failure."""
        try:
            return self.driver.find_element(By.ID, resource_id).text
        except WebDriverException:
            return ""

    def get_single_video_info(self):
        """Record the author name and caption of the video currently shown.

        Nothing is stored unless both values are non-empty.
        NOTE(review): entries are keyed by author name, so a later video by
        the same author overwrites the earlier caption - confirm intended.
        """
        author_name = self._element_text(r'com.smile.gifmaker:id/user_name_text_view')
        title = self._element_text(r'com.smile.gifmaker:id/element_caption_label')
        if title and author_name:
            self.name_info_dict[author_name] = title

    def close_spider(self):
        """Quit the driver, persist the results and return them.

        The results are written even when quitting the driver raises, so a
        broken session cannot discard what was already collected.
        """
        try:
            self.driver.quit()
        finally:
            with open(self.RESULT_FILE, "w", encoding="utf-8") as f:
                json.dump(self.name_info_dict, f, ensure_ascii=False, indent=4)
        return self.name_info_dict


def main():
    """Crawl the favourites of every user id in the hard-coded list."""
    # Crawling is very slow when a live-streaming video is encountered;
    # this needs optimising.
    # The existing author_ crawler has not been maintained for a long time,
    # has problems, and has been broken throughout.
    # 2594305039, 2089610315,
    id_list = [1396121077, 1811823755, "lxy20003246"]
    for user_id in id_list:
        ksc = KuaiShouCollect(user_id)
        try:
            if ksc.search_by_id():
                ksc.get_person_info()
        finally:
            # Always release the device session and save what was collected.
            ksc.close_spider()


if __name__ == "__main__":
    main()