123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- # -*- coding: utf-8 -*-
- # @Author: wangkun
- # @Time: 2022/10/25
- import os
- import sys
- import time
- from selenium.common import NoSuchElementException
- from selenium.webdriver import DesiredCapabilities, ActionChains
- from selenium.webdriver.chrome.service import Service
- from selenium.webdriver.common.by import By
- from selenium import webdriver
- sys.path.append(os.getcwd())
- from main.common import Common
- from main.feishu_lib import Feishu
- from xinshi.xinshi_app import XinshiAPP
class XinshiPC:
    """Crawl the Xinshi (xs.newrank.cn) web lists with headless Chrome.

    ``login`` is the entry point: it opens the "recent hot" page, signs in
    with the configured account, then scrapes the hot list and the
    "100k recommended" list. Every new title is written to a Feishu sheet
    and ``XinshiAPP.start_wechat`` is invoked for it.
    """

    # Local chromedriver binary used to launch Chrome.
    _CHROMEDRIVER_PATH = '/Users/lieyunye/Downloads/chromedriver_v107/chromedriver'
    _USER_AGENT_ARG = ('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                       '(KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
    _START_URL = 'https://xs.newrank.cn/Material/faddish/recentHot'

    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from the environment or a config file instead.
    _LOGIN_PHONE = '13426262515'
    _LOGIN_PASSWORD = 'test111111'

    # First argument passed to every Feishu call in this class.
    _FEISHU_DOC = 'shipinhao'
    # Sheets whose titles are reported as "already downloaded".
    _DOWNLOADED_SHEETS = ('c77cf9', 'WAG7Dq', '0i4jmV')
    # Sheets this crawler writes to (hot list / recommended list).
    _HOT_SHEET = 'gO4Sn4'
    _RECOMMEND_SHEET = 'aOjaIU'

    # Ordered (old, new) replacements applied to a stripped title. The order
    # is significant: "&NBSP" is removed after "/", "#", "." and "\\" have
    # been handled but before ":", "*", "?", "<", ">", "|" and spaces are.
    # NOTE(review): the original chain also contained a second
    # replace('“', '…'), a second replace('?', '') and a second
    # replace('"', ''). None of them could ever match because the same
    # character had already been removed, so they are omitted here.
    # Presumably different characters (e.g. '”' or a full-width '?') were
    # intended -- confirm with the author.
    _TITLE_REPLACEMENTS = (
        ('"', ''),
        ('“', ''),
        ('\n', ''),
        ('/', ''),
        ('\r', ''),
        ('#', ''),
        ('.', '。'),
        ('\\', ''),
        ('&NBSP', ''),
        (':', ''),
        ('*', ''),
        ('?', ''),
        ('<', ''),
        ('>', ''),
        ('|', ''),
        (' ', ''),
    )

    @classmethod
    def login(cls, log_type, env):
        """Sign in and crawl both lists, retrying until a login succeeds.

        Each attempt uses a fresh browser which is always quit afterwards,
        including when an exception escapes. A failed login starts a new
        attempt; after a successful crawl the method returns.

        :param log_type: name handed to ``Common.logger`` and the Feishu helpers.
        :param env: environment name forwarded to ``XinshiAPP.start_wechat``.
        :return: None
        """
        while True:
            driver = cls._new_driver()
            try:
                if cls._sign_in(log_type, driver):
                    # Hot list.
                    cls.get_recenhot(log_type, driver, env)
                    Common.logger(log_type).info('新视-热门内容抓取完毕\n')
                    # "100k recommended" list.
                    cls.get_hundredthousand(log_type, driver, env)
                    Common.logger(log_type).info('新视-十万推荐内容抓取完毕\n')
                    time.sleep(5)
                    Common.logger(log_type).info('退出浏览器\n')
                    return
            finally:
                # Never leave a Chrome process behind, whatever happened above.
                driver.quit()

    @classmethod
    def _new_driver(cls):
        """Create a headless Chrome driver with performance logging enabled."""
        # Copy first: DesiredCapabilities.CHROME is a shared class-level dict.
        capabilities = DesiredCapabilities.CHROME.copy()
        capabilities["goog:loggingPrefs"] = {"performance": "ALL"}

        # Run without opening a browser window.
        options = webdriver.ChromeOptions()
        options.add_argument("headless")
        options.add_argument(cls._USER_AGENT_ARG)
        options.add_argument("--no-sandbox")

        # NOTE(review): the ``desired_capabilities`` keyword was removed in
        # Selenium 4.10; confirm the pinned Selenium version before upgrading.
        driver = webdriver.Chrome(
            desired_capabilities=capabilities,
            options=options,
            service=Service(cls._CHROMEDRIVER_PATH),
        )
        driver.implicitly_wait(10)
        return driver

    @classmethod
    def _sign_in(cls, log_type, driver):
        """Drive the login form; return True when the avatar image is found."""
        Common.logger(log_type).info('打开网页"新视-热门内容"')
        driver.get(cls._START_URL)
        driver.maximize_window()
        time.sleep(1)

        Common.logger(log_type).info('点击"登录/按钮"')
        driver.find_element(By.XPATH, '//button[@class="ant-btn ant-btn-primary"]').click()
        time.sleep(1)

        Common.logger(log_type).info('点击"其他登录方式"')
        driver.find_element(By.XPATH, '//span[@class ="_2XRFN1F6"]').click()
        time.sleep(1)

        Common.logger(log_type).info('输入手机号')
        driver.find_element(By.XPATH, '//input[@class="_2DyE0cvF"]').send_keys(cls._LOGIN_PHONE)
        Common.logger(log_type).info('输入密码')
        driver.find_element(By.XPATH, '//input[@placeholder="输入密码"]').send_keys(cls._LOGIN_PASSWORD)
        time.sleep(1)

        Common.logger(log_type).info('勾选"保持登录状态"')
        driver.find_element(By.XPATH, '//input[@class="nrd-login-checkbox-input"]').click()
        time.sleep(1)

        Common.logger(log_type).info('点击"登录"')
        driver.find_element(By.XPATH, '//button[@class="_3RtjFeM- _CH1sF8Xz _38DPDVRd"]').click()

        # Slider captcha (only shown sometimes).
        try:
            slider = driver.find_element(By.XPATH, '//span[@class="nc_iconfont btn_slide"]')
            slider_track = driver.find_element(By.XPATH, '//div[@class="scale_text slidetounlock"]')
            Common.logger(log_type).info('拖动滑块')
            time.sleep(1)
            # Per the original author's note: without overriding
            # navigator.webdriver the slider reports an error after dragging
            # and blocks the login, because the site detects Selenium.
            driver.execute_script(
                'Object.defineProperty(navigator, "webdriver", {get: () => false,});')
            ActionChains(driver).drag_and_drop_by_offset(
                slider, slider_track.size['width'], -slider.size['height']).perform()
        except NoSuchElementException:
            Common.logger(log_type).info('没有滑块')

        # The avatar image is only present after a successful login.
        time.sleep(3)
        try:
            driver.find_element(By.XPATH, '//img[@class="_J1BGEmMJ"]')
        except NoSuchElementException:
            Common.logger(log_type).info('登录失败,重新登录\n')
            return False
        Common.logger(log_type).info('登录成功\n')
        return True

    @classmethod
    def _clean_title(cls, raw_title):
        """Normalise a raw ``title`` attribute value.

        Strips surrounding whitespace, then applies ``_TITLE_REPLACEMENTS``
        in order. ``None`` (attribute missing) is treated as an empty title.
        """
        title = (raw_title or '').strip()
        for old, new in cls._TITLE_REPLACEMENTS:
            title = title.replace(old, new)
        return title

    @classmethod
    def _read_entry(cls, driver, list_class, position, title_class, author_class):
        """Return ``(video_title, user_name)`` for the entry at 1-based ``position``."""
        entry_xpath = f'//div[@class="{list_class}"]/*[{position}]'
        raw_title = driver.find_element(
            By.XPATH, f'{entry_xpath}//div[@class="{title_class}"]'
        ).get_attribute('title')
        raw_author = driver.find_element(
            By.XPATH, f'{entry_xpath}//div[@class="{author_class}"]'
        ).get_attribute('title')
        # get_attribute returns None when the attribute is absent.
        return cls._clean_title(raw_title), (raw_author or '').replace('\n', '')

    @classmethod
    def _title_in_sheet(cls, log_type, sheet_id, video_title):
        """Return True if any cell of the given Feishu sheet equals ``video_title``."""
        rows = Feishu.get_values_batch(log_type, cls._FEISHU_DOC, sheet_id)
        return any(cell == video_title for row in rows for cell in row)

    @classmethod
    def _record_video(cls, log_type, env, video_title, user_name, sheet_id, source_label):
        """Log one entry, skip it if empty or already known, otherwise store it.

        Sheets are fetched lazily and in a fixed order (the three
        "downloaded" sheets, then ``sheet_id``), stopping at the first hit.
        """
        Common.logger(log_type).info(video_title)
        Common.logger(log_type).info(user_name)

        if video_title == '':
            Common.logger(log_type).info('无标题\n')
            return
        for downloaded_sheet in cls._DOWNLOADED_SHEETS:
            if cls._title_in_sheet(log_type, downloaded_sheet, video_title):
                Common.logger(log_type).info('视频已下载\n')
                return
        if cls._title_in_sheet(log_type, sheet_id, video_title):
            Common.logger(log_type).info('视频已存在\n')
            return

        Feishu.insert_columns(log_type, cls._FEISHU_DOC, sheet_id, 'ROWS', 1, 2)
        # Write the record into the worksheet of the cloud document.
        values = [[time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(int(time.time()))),
                   source_label,
                   video_title,
                   user_name]]
        time.sleep(1)
        Feishu.update_values(log_type, cls._FEISHU_DOC, sheet_id, 'A2:Z2', values)
        Common.logger(log_type).info('视频信息写入飞书成功\n')
        XinshiAPP.start_wechat(log_type, env)

    @classmethod
    def get_recenhot(cls, log_type, driver, env):
        """Scrape entries 1-20 of the hot list on the currently open page.

        :param log_type: name handed to ``Common.logger`` and the Feishu helpers.
        :param driver: logged-in Selenium driver showing the hot list.
        :param env: environment name forwarded to ``XinshiAPP.start_wechat``.
        :raises NoSuchElementException: if an expected entry is not on the page.
        """
        time.sleep(3)
        for position in range(1, 21):
            video_title, user_name = cls._read_entry(
                driver, '_6gxA3h-x', position, '_hsgIoLGN', '_zoylmQ8m')
            cls._record_video(
                log_type, env, video_title, user_name, cls._HOT_SHEET, '新视-热门榜')

    @classmethod
    def get_hundredthousand(cls, log_type, driver, env):
        """Open the second tab, scroll down, and scrape entries 1-50.

        :param log_type: name handed to ``Common.logger`` and the Feishu helpers.
        :param driver: logged-in Selenium driver on the Xinshi page.
        :param env: environment name forwarded to ``XinshiAPP.start_wechat``.
        :raises NoSuchElementException: if an expected entry is not on the page.
        """
        time.sleep(3)
        Common.logger(log_type).info('点击"十万推荐"')
        driver.find_element(By.XPATH, '//div[@class="ant-tabs-nav-list"]/*[2]').click()
        time.sleep(3)

        # Scroll several times before reading; presumably this makes the
        # page load more entries -- confirm against the live site.
        Common.logger(log_type).info('滚动到页面底部')
        for _ in range(5):
            Common.logger(log_type).info('向上滑动页面')
            driver.execute_script("window.scrollBy(0, 3000)")
            time.sleep(1)
        time.sleep(5)

        for position in range(1, 51):
            Common.logger(log_type).info('开始抓取第{}条', position)
            video_title, user_name = cls._read_entry(
                driver, '_tCg-GF3J', position, '_EmoRHgxz', '_gD23uy8R')
            cls._record_video(
                log_type, env, video_title, user_name, cls._RECOMMEND_SHEET, '新视-推荐榜')
if __name__ == '__main__':
    # Script entry point: run one full login + crawl using the 'xinshi'
    # log type against the 'dev' environment.
    XinshiPC.login(log_type='xinshi', env='dev')
|