# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/7/6
import os
import random
import sys
import time
import cv2
import requests
import urllib3
from selenium.webdriver import DesiredCapabilities, ActionChains
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
sys.path.append(os.getcwd())
from common.common import Common


class XGRecommend:
    """Drive Chrome through the ixigua.com password login and its slider captcha."""

    # XPath of the draggable slider button of the captcha.
    _SLIDER_XPATH = '//*[@class="sc-kkGfuU bujTgx"]'
    # XPath of the puzzle-piece image (the template matched against the background).
    _SLIDE_IMG_XPATH = '//*[@class="captcha_verify_img_slide react-draggable sc-VigVT ggNWOG"]'
    # XPath of the captcha background image.
    _BG_IMG_XPATH = '//*[@id="captcha-verify-image"]'
    # Seconds to wait for a captcha image download before giving up.
    _DOWNLOAD_TIMEOUT = 30

    def __init__(self, log_type, crawler, env):
        """Start a Chrome session and open the ixigua.com home page.

        The original note described launching Chrome manually with a remote
        debugging port:
            open -a "Google Chrome" --args --remote-debugging-port=12306
        The current code does not attach to such a session; it starts its own
        browser through chromedriver.

        :param log_type: log category passed to ``Common.logger``
        :param crawler: crawler name passed to ``Common.logger``
        :param env: ``"dev"`` selects the developer's local chromedriver path;
            any other value selects ``/usr/bin/chromedriver``
        """
        if env == "dev":
            chromedriver = "/Users/wangkun/Downloads/chromedriver/chromedriver_v114/chromedriver"
        else:
            chromedriver = "/usr/bin/chromedriver"

        # Browser options (the attribute keeps its historical name ``browser``).
        self.browser = webdriver.ChromeOptions()
        # Enable performance logging. This is set on the options object rather
        # than by mutating the shared DesiredCapabilities.CHROME dict and
        # passing ``desired_capabilities=``, which newer Selenium 4 releases
        # no longer accept.
        self.browser.set_capability("goog:loggingPrefs", {"performance": "ALL"})
        # Set the user-agent.
        self.browser.add_argument(
            'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
        # Hide the "Chrome is being controlled by automated test software" bar.
        self.browser.add_argument('--disable-infobars')
        # Disable GPU acceleration.
        self.browser.add_argument('--disable-gpu')
        # Turn off the automation extension.
        self.browser.add_experimental_option("useAutomationExtension", False)
        # Drop the enable-automation switch.
        self.browser.add_experimental_option('excludeSwitches', ['enable-automation'])
        # Disable the Blink feature that exposes automation to the page.
        self.browser.add_argument('--disable-blink-features=AutomationControlled')
        # To run without a visible window, add "--headless"; on Linux the
        # headless mode additionally needs "--no-sandbox".
        # Set the browser window size.
        self.browser.add_argument("--window-size=1920,1080")

        # Initialise the driver.
        self.driver = webdriver.Chrome(options=self.browser, service=Service(chromedriver))
        self.driver.implicitly_wait(10)
        Common.logger(log_type, crawler).info("打开西瓜推荐页")
        self.driver.get("https://www.ixigua.com/")
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration or environment variables.
        self.username = "19831265541"
        self.password = "Test111111"
        time.sleep(2)

    def quit(self, log_type, crawler):
        """Log the shutdown and close the browser session."""
        Common.logger(log_type, crawler).info("退出浏览器")
        self.driver.quit()

    @staticmethod
    def generate_tracks(distance):
        """Build per-step offsets for an overshoot-and-return drag.

        The forward pass accelerates (a=+2) until 3/5 of the padded distance
        and decelerates (a=-3) afterwards, sampling every 0.2 time units.

        :param distance: gap distance in pixels
        :return: ``(forward_tracks, back_tracks)``; ``forward_tracks`` are the
            rounded step lengths covering ``distance + 20``, ``back_tracks`` is
            a fixed list of negative steps summing to -20
        """
        # Overshoot by 20 px; the back tracks bring the slider back to the gap.
        distance += 20
        v = 0
        t = 0.2
        forward_tracks = []
        current = 0
        mid = distance * 3 / 5  # deceleration threshold
        while current < distance:
            a = 2 if current < mid else -3
            s = v * t + 0.5 * a * (t ** 2)
            v = v + a * t
            current += s
            forward_tracks.append(round(s))

        back_tracks = [-3, -3, -2, -2, -2, -2, -2, -1, -1, -1, -1]
        return forward_tracks, back_tracks

    @staticmethod
    def get_tracks(distance):
        """Build per-step offsets imitating a human drag.

        Uniform acceleration followed by uniform deceleration, using
        ``v = v0 + a*t`` and ``s = v0*t + a*t**2 / 2``.

        :param distance: total distance to cover in pixels
        :return: list of rounded step lengths; because each step is rounded
            separately their sum may differ slightly from ``distance``. Empty
            when ``distance`` is not positive.
        """
        # Initial velocity.
        v = 0
        # Time slice per step.
        t = 0.3
        # Step lengths collected so far.
        tracks = []
        # Distance covered so far.
        current = 0
        # Switch from acceleration to deceleration at 4/5 of the distance.
        mid = distance * 4 / 5
        while current < distance:
            a = 2 if current < mid else -3
            v0 = v
            # Displacement during this slice.
            s = v0 * t + 0.5 * a * t ** 2
            # Velocity at the end of this slice.
            v = v0 + a * t
            current += s
            tracks.append(round(s))
        return tracks

    @staticmethod
    def FindPic(log_type, crawler, target, template):
        """Locate the best match of the template image inside the target image.

        :param log_type: log category passed to ``Common.logger``
        :param crawler: crawler name passed to ``Common.logger``
        :param target: path of the background image
        :param template: path of the image to search for
        :return: X coordinate (int) of the best match's top-left corner. The
            "distance to slide" (X minus template width) is only logged.
        :raises FileNotFoundError: if either image cannot be read
        """
        target_rgb = cv2.imread(target)
        if target_rgb is None:
            # cv2.imread signals failure by returning None, not by raising.
            raise FileNotFoundError(f"cannot read target image: {target}")
        template_gray = cv2.imread(template, 0)
        template_color = cv2.imread(template)
        if template_gray is None or template_color is None:
            raise FileNotFoundError(f"cannot read template image: {template}")

        target_gray = cv2.cvtColor(target_rgb, cv2.COLOR_BGR2GRAY)
        res = cv2.matchTemplate(target_gray, template_gray, cv2.TM_CCOEFF_NORMED)
        # (min_val, max_val, min_loc, max_loc)
        value = cv2.minMaxLoc(res)
        Common.logger(log_type, crawler).info(value)

        # X coordinate of the best match (max_loc).
        x_val = int(value[3][0])
        Common.logger(log_type, crawler).info(f"缺口的 X 轴距离:{x_val}")

        # Height, width and channel count of the template image.
        template_height, template_width, template_c = template_color.shape
        Common.logger(log_type, crawler).info(f"模板高:{template_height}")
        Common.logger(log_type, crawler).info(f"模板宽:{template_width}")
        Common.logger(log_type, crawler).info(f"图片的通道数:{template_c}")

        # Distance to slide; logged for diagnostics only.
        move_val = x_val - template_width
        Common.logger(log_type, crawler).info(f"需要滑动的距离:{move_val}")

        return x_val

    def _download_image(self, url, path):
        """Download ``url`` to ``path``, creating the parent directory if needed."""
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        # NOTE(review): certificate verification is disabled, as in the
        # original code; the warning it triggers is silenced here.
        urllib3.disable_warnings()
        response = requests.get(url, verify=False, timeout=self._DOWNLOAD_TIMEOUT)
        # Fail loudly instead of saving an HTTP error page as an image.
        response.raise_for_status()
        with open(path, "wb") as file:
            file.write(response.content)

    def _drag_slider(self, log_type, crawler, move_btn, offset):
        """Hold the slider, drag it according to ``offset`` and release it."""
        # Press and hold the slider.
        Common.logger(log_type, crawler).info("在滑块上暂停")
        ActionChains(self.driver).click_and_hold(on_element=move_btn).perform()
        # First coarse move: half of the matched offset.
        Common.logger(log_type, crawler).info("拖动滑块0.5*距离")
        ActionChains(self.driver).move_to_element_with_offset(
            to_element=move_btn, xoffset=int(0.5 * offset), yoffset=0).perform()
        # Remaining pixels, moved in small human-like steps.
        Common.logger(log_type, crawler).info("拖动剩余像素")
        for track in self.get_tracks(int(0.15 * offset)):
            ActionChains(self.driver).move_by_offset(xoffset=track, yoffset=0).perform()
        # Pause for one second before releasing.
        Common.logger(log_type, crawler).info("休息1s")
        time.sleep(1)
        # Release the slider.
        Common.logger(log_type, crawler).info("释放滑块")
        ActionChains(self.driver).release().perform()

    def login(self, log_type, crawler, env):
        """Log in with the stored credentials and solve the slider captcha.

        If no slider appears after submitting the form, the browser is
        restarted and the whole login is retried. Otherwise the captcha is
        attempted repeatedly until the slider is no longer on the page, after
        which the browser is closed.

        :param log_type: log category passed to ``Common.logger``
        :param crawler: crawler name; also the directory under which captcha
            images are stored (``./{crawler}/photo/``)
        :param env: environment name forwarded to ``__init__`` on retry
        """
        Common.logger(log_type, crawler).info("点击登录")
        self.driver.find_element(
            By.XPATH, '//*[@class="xg-button xg-button-primary xg-button-middle loginButton"]').click()
        time.sleep(random.randint(1, 2))
        Common.logger(log_type, crawler).info("点击密码登录")
        self.driver.find_element(By.XPATH, '//*[@class="web-login-link-list__item__text"]').click()
        time.sleep(random.randint(1, 2))
        Common.logger(log_type, crawler).info("输入手机号")
        self.driver.find_element(
            By.XPATH, '//*[@class="web-login-normal-input__input"]').send_keys(self.username)
        time.sleep(random.randint(1, 2))
        Common.logger(log_type, crawler).info("输入密码")
        self.driver.find_element(
            By.XPATH, '//*[@class="web-login-button-input__input"]').send_keys(self.password)
        time.sleep(random.randint(1, 2))
        Common.logger(log_type, crawler).info("点击登录")
        self.driver.find_element(
            By.XPATH, '//*[@class="web-login-account-password__button-wrapper"]/*[1]').click()
        time.sleep(random.randint(1, 2))

        # Look for the captcha slider.
        Common.logger(log_type, crawler).info("获取滑块")
        move_btns = self.driver.find_elements(By.XPATH, self._SLIDER_XPATH)
        if not move_btns:
            Common.logger(log_type, crawler).info("未发现滑块,3-5 秒后重试")
            self.quit(log_type, crawler)
            time.sleep(random.randint(3, 5))
            self.__init__(log_type, crawler, env)
            self.login(log_type, crawler, env)
            # The retried login ran the whole flow (including quitting the
            # browser); continuing here would index the empty list.
            return
        move_btn = move_btns[0]

        slide_dir = f"./{crawler}/photo/img_slide.png"
        bg_image_dir = f"./{crawler}/photo/img_bg.png"
        while True:
            # Download the puzzle piece.
            slide_url = self.driver.find_element(
                By.XPATH, self._SLIDE_IMG_XPATH).get_attribute("src")
            self._download_image(slide_url, slide_dir)

            # Download the background image.
            bg_image_url = self.driver.find_element(
                By.XPATH, self._BG_IMG_XPATH).get_attribute("src")
            self._download_image(bg_image_url, bg_image_dir)

            offset = self.FindPic(log_type, crawler, bg_image_dir, slide_dir)
            Common.logger(log_type, crawler).info(f"offset:{offset}")

            self._drag_slider(log_type, crawler, move_btn, offset)

            # Re-query the page: the slider disappears once the captcha is
            # accepted. Checking the list captured before the loop would never
            # change and the loop could not terminate.
            time.sleep(1)
            move_btns = self.driver.find_elements(By.XPATH, self._SLIDER_XPATH)
            if not move_btns:
                break
            move_btn = move_btns[0]

        time.sleep(5)
        Common.logger(log_type, crawler).info("退出浏览器")
        self.quit(log_type, crawler)


if __name__ == "__main__":
    recommend = XGRecommend("search", "dev", "dev")
    recommend.login("search", "dev", "dev")