123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/7/6
- import os
- import random
- import sys
- import time
- import cv2
- import requests
- import urllib3
- from selenium.webdriver import DesiredCapabilities, ActionChains
- from selenium import webdriver
- from selenium.webdriver.chrome.service import Service
- from selenium.webdriver.common.by import By
- sys.path.append(os.getcwd())
- from common.common import Common
class XGRecommend(object):
    """Drive a local Chrome through the ixigua.com password login and its slider captcha.

    The constructor starts Chrome via chromedriver and opens the home page;
    ``login`` fills in the credentials, solves the slider captcha by template
    matching the downloaded captcha images, and finally closes the browser.
    """

    # Locators of the captcha widgets (class names are copied from the page as-is).
    _SLIDER_BUTTON_XPATH = '//*[@class="sc-kkGfuU bujTgx"]'
    _SLIDER_IMAGE_XPATH = '//*[@class="captcha_verify_img_slide react-draggable sc-VigVT ggNWOG"]'
    _BACKGROUND_IMAGE_XPATH = '//*[@id="captcha-verify-image"]'
    # Seconds to wait for a captcha image download before giving up.
    _DOWNLOAD_TIMEOUT = 30

    def __init__(self, log_type, crawler, env):
        """Start Chrome and open the ixigua home page.

        To attach to a locally started Chrome instead, it can be launched with a
        fixed debugging port, e.g.
        ``open -a "Google Chrome" --args --remote-debugging-port=12306``.

        :param log_type: log category forwarded to ``Common.logger``
        :param crawler: crawler name forwarded to ``Common.logger``
        :param env: ``"dev"`` selects the developer's local chromedriver path,
            anything else uses ``/usr/bin/chromedriver``
        """
        if env == "dev":
            chromedriver = "/Users/wangkun/Downloads/chromedriver/chromedriver_v114/chromedriver"
        else:
            chromedriver = "/usr/bin/chromedriver"

        # Browser options (kept under the historical attribute name ``browser``).
        self.browser = webdriver.ChromeOptions()
        # Enable performance logging through the options object: the
        # ``desired_capabilities`` keyword is gone in recent Selenium 4 releases,
        # and writing into DesiredCapabilities.CHROME mutates a shared dict.
        self.browser.set_capability("goog:loggingPrefs", {"performance": "ALL"})
        # Pretend to be a regular desktop Chrome.
        self.browser.add_argument(
            'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
        # Hide the "Chrome is being controlled by automated test software" bar.
        self.browser.add_argument('--disable-infobars')
        # Disable GPU acceleration.
        self.browser.add_argument('--disable-gpu')
        # Turn off the automation extension and the enable-automation switch.
        self.browser.add_experimental_option("useAutomationExtension", False)
        self.browser.add_experimental_option('excludeSwitches', ['enable-automation'])
        # Disable the Blink feature that exposes automation to the page.
        self.browser.add_argument('--disable-blink-features=AutomationControlled')
        # Headless mode is intentionally off; on Linux it would additionally
        # need "--headless" and "--no-sandbox".
        self.browser.add_argument("--window-size=1920,1080")

        self.driver = webdriver.Chrome(options=self.browser, service=Service(chromedriver))
        self.driver.implicitly_wait(10)
        Common.logger(log_type, crawler).info("打开西瓜推荐页")
        self.driver.get("https://www.ixigua.com/")
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration or environment variables.
        self.username = "19831265541"
        self.password = "Test111111"
        time.sleep(2)

    def quit(self, log_type, crawler):
        """Log the shutdown and close the browser session."""
        Common.logger(log_type, crawler).info("退出浏览器")
        self.driver.quit()

    @staticmethod
    def _simulate_tracks(distance, time_step, decel_start):
        """Return rounded per-step displacements of an accelerate-then-brake motion.

        Uses ``v = v0 + a*t`` and ``s = v0*t + a*t^2/2`` with ``a = 2`` while the
        travelled distance is below ``decel_start`` and ``a = -3`` afterwards.
        """
        velocity = 0
        travelled = 0
        tracks = []
        while travelled < distance:
            acceleration = 2 if travelled < decel_start else -3
            step = velocity * time_step + 0.5 * acceleration * time_step ** 2
            velocity = velocity + acceleration * time_step
            travelled += step
            tracks.append(round(step))
            # Guard: if braking stops the slider before the target is reached,
            # end here instead of sliding backwards and oscillating.
            if acceleration < 0 and velocity <= 0:
                break
        return tracks

    @staticmethod
    def generate_tracks(distance):
        """Return ``(forward_tracks, back_tracks)`` for an overshoot-and-return drag.

        20 px are added to ``distance`` so the slider passes the gap; the fixed
        ``back_tracks`` (summing to -20) then bring it back.

        :param distance: gap distance in pixels
        :return: tuple of the forward step list and the backward step list
        """
        distance += 20
        # Braking starts at 3/5 of the (extended) distance; time step is 0.2.
        forward_tracks = XGRecommend._simulate_tracks(distance, 0.2, distance * 3 / 5)
        back_tracks = [-3, -3, -2, -2, -2, -2, -2, -1, -1, -1, -1]
        return forward_tracks, back_tracks

    @staticmethod
    def get_tracks(distance):
        """Return per-step displacements imitating a human drag over ``distance``.

        The motion accelerates uniformly up to 4/5 of the distance and then
        decelerates (time step 0.3). Each step is rounded independently, so the
        sum of the returned steps only approximates ``distance``.

        :param distance: distance to cover in pixels
        :return: list of integer step sizes; empty when ``distance <= 0``
        """
        return XGRecommend._simulate_tracks(distance, 0.3, distance * 4 / 5)

    @staticmethod
    def FindPic(log_type, crawler, target, template):
        """Locate the best match of the slider piece inside the background image.

        :param log_type: log category
        :param crawler: crawler name
        :param target: path of the background image
        :param template: path of the slider-piece image to look for
        :return: x coordinate (int) of the top-left corner of the best match
        :raises ValueError: if either image file cannot be read by OpenCV
        """
        target_rgb = cv2.imread(target)
        if target_rgb is None:
            raise ValueError(f"cannot read background image: {target}")
        template_gray = cv2.imread(template, 0)
        if template_gray is None:
            raise ValueError(f"cannot read template image: {template}")

        target_gray = cv2.cvtColor(target_rgb, cv2.COLOR_BGR2GRAY)
        res = cv2.matchTemplate(target_gray, template_gray, cv2.TM_CCOEFF_NORMED)
        # minMaxLoc returns (min_val, max_val, min_loc, max_loc).
        value = cv2.minMaxLoc(res)
        Common.logger(log_type, crawler).info(value)
        # x of the best-match location, i.e. where the gap starts.
        x_val = int(value[3][0])
        Common.logger(log_type, crawler).info(f"缺口的 X 轴距离:{x_val}")
        # Dimensions of the template as read in colour, logged for diagnostics.
        template_height, template_width, template_c = cv2.imread(template).shape
        Common.logger(log_type, crawler).info(f"模板高:{template_height}")
        Common.logger(log_type, crawler).info(f"模板宽:{template_width}")
        Common.logger(log_type, crawler).info(f"图片的通道数:{template_c}")
        # Logged only; the caller receives x_val, not this value.
        move_val = x_val - template_width
        Common.logger(log_type, crawler).info(f"需要滑动的距离:{move_val}")
        return x_val

    def _submit_credentials(self, log_type, crawler):
        """Open the password-login form, fill it in and submit it."""
        Common.logger(log_type, crawler).info("点击登录")
        self.driver.find_element(By.XPATH, '//*[@class="xg-button xg-button-primary xg-button-middle loginButton"]').click()
        time.sleep(random.randint(1, 2))
        Common.logger(log_type, crawler).info("点击密码登录")
        self.driver.find_element(By.XPATH, '//*[@class="web-login-link-list__item__text"]').click()
        time.sleep(random.randint(1, 2))
        Common.logger(log_type, crawler).info("输入手机号")
        self.driver.find_element(By.XPATH, '//*[@class="web-login-normal-input__input"]').send_keys(self.username)
        time.sleep(random.randint(1, 2))
        Common.logger(log_type, crawler).info("输入密码")
        self.driver.find_element(By.XPATH, '//*[@class="web-login-button-input__input"]').send_keys(self.password)
        time.sleep(random.randint(1, 2))
        Common.logger(log_type, crawler).info("点击登录")
        self.driver.find_element(By.XPATH, '//*[@class="web-login-account-password__button-wrapper"]/*[1]').click()
        time.sleep(random.randint(1, 2))

    def _download_image(self, url, path):
        """Download ``url`` to ``path``, creating the parent directory if missing."""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        # NOTE(review): TLS verification is deliberately disabled here, as in
        # the original code; the warning is silenced to keep logs readable.
        urllib3.disable_warnings()
        response = requests.get(url, verify=False, timeout=self._DOWNLOAD_TIMEOUT)
        # Fail early instead of saving an error page as a ".png".
        response.raise_for_status()
        with open(path, "wb") as file:
            file.write(response.content)

    def _drag_slider(self, log_type, crawler, move_btn, offset):
        """Press the slider button, drag it towards ``offset`` and release it."""
        Common.logger(log_type, crawler).info("在滑块上暂停")
        ActionChains(self.driver).click_and_hold(on_element=move_btn).perform()
        # NOTE(review): the log message says 0.7 but the code moves 0.5 * offset.
        Common.logger(log_type, crawler).info("拖动滑块0.7*距离")
        ActionChains(self.driver).move_to_element_with_offset(to_element=move_btn, xoffset=int(0.5*offset), yoffset=0).perform()
        # Cover a further 0.15 * offset in small, human-like steps.
        Common.logger(log_type, crawler).info("拖动剩余像素")
        for track in self.get_tracks(int(0.15*offset)):
            ActionChains(self.driver).move_by_offset(xoffset=track, yoffset=0).perform()
        # Pause briefly before letting go.
        Common.logger(log_type, crawler).info("休息1s")
        time.sleep(1)
        Common.logger(log_type, crawler).info("释放滑块")
        ActionChains(self.driver).release().perform()

    def _solve_captcha(self, log_type, crawler, move_btns):
        """Repeat the slider drag until the slider button is no longer on the page."""
        while move_btns:
            move_btn = move_btns[0]
            # Download the slider piece and the background of the current captcha.
            slide_url = self.driver.find_element(By.XPATH, self._SLIDER_IMAGE_XPATH).get_attribute("src")
            slide_dir = f"./{crawler}/photo/img_slide.png"
            self._download_image(slide_url, slide_dir)
            bg_image_url = self.driver.find_element(By.XPATH, self._BACKGROUND_IMAGE_XPATH).get_attribute("src")
            bg_image_dir = f"./{crawler}/photo/img_bg.png"
            self._download_image(bg_image_url, bg_image_dir)

            offset = self.FindPic(log_type, crawler, bg_image_dir, slide_dir)
            Common.logger(log_type, crawler).info(f"offset:{offset}")
            self._drag_slider(log_type, crawler, move_btn, offset)

            time.sleep(1)
            # Re-query the slider: an empty result means the captcha is gone,
            # a non-empty one yields a fresh element for the next attempt.
            move_btns = self.driver.find_elements(By.XPATH, self._SLIDER_BUTTON_XPATH)

    def login(self, log_type, crawler, env):
        """Log in with the stored credentials, solve the slider captcha, then quit.

        When no slider shows up after submitting the form, the browser is
        restarted and the whole login is retried (without limit, as before).
        The browser is closed when the method finishes, including on errors.

        :param log_type: log category
        :param crawler: crawler name; also the directory prefix for captcha images
        :param env: environment passed to ``__init__`` when the browser is restarted
        """
        try:
            while True:
                self._submit_credentials(log_type, crawler)
                Common.logger(log_type, crawler).info("获取滑块")
                move_btns = self.driver.find_elements(By.XPATH, self._SLIDER_BUTTON_XPATH)
                if move_btns:
                    break
                Common.logger(log_type, crawler).info("未发现滑块,3-5 秒后重试")
                self.quit(log_type, crawler)
                time.sleep(random.randint(3, 5))
                self.__init__(log_type, crawler, env)

            self._solve_captcha(log_type, crawler, move_btns)
            time.sleep(5)
            Common.logger(log_type, crawler).info("退出浏览器")
        finally:
            self.quit(log_type, crawler)
if __name__ == "__main__":
    # Manual run: the same (log_type, crawler, env) triple is used both to
    # start the browser and to perform the login.
    run_args = ("search", "dev", "dev")
    recommender = XGRecommend(*run_args)
    recommender.login(*run_args)
|