# -*- coding: utf-8 -*-
import json
import os
import random
import subprocess
import sys
import time
import uuid
from datetime import datetime

import requests
from appium import webdriver
from appium.webdriver.extensions.android.nativekey import AndroidKey
from appium.webdriver.common.touch_action import TouchAction
from bs4 import BeautifulSoup
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By

from application.common import MysqlHelper, Feishu

sys.path.append(os.getcwd())


class XiaoNianGaoZH(object):
    """Offline Appium crawler for the "小年糕+" WeChat mini program.

    Instantiating the class runs the whole job: it registers a seed account
    with the admin backend, starts WeChat through a local Appium server,
    opens the mini program, walks the video feed and records newly seen
    authors in MySQL and in a Feishu sheet.
    """

    # Headers sent with every admin-backend request.
    # NOTE(review): the session cookie is hard-coded; it presumably expires
    # and should come from configuration -- confirm with the owner.
    _ADMIN_HEADERS = {
        'content-length': '0',
        'cookie': 'SESSION=MWM4YzVlMTctNzdkNC00NjE3LWIxZTctOGQwYzgzYmVmN2Qw',
        'origin': 'https://admin.piaoquantv.com',
        'priority': 'u=1, i',
        'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"macOS"'
    }

    # Seconds to wait for the admin backend before giving up on a request.
    _HTTP_TIMEOUT = 30

    # Characters that must be backslash-escaped inside a quoted MySQL string
    # literal (the set handled by mysql_real_escape_string).
    _SQL_ESCAPE_TABLE = str.maketrans({
        "\\": "\\\\",
        "'": "\\'",
        '"': '\\"',
        "\0": "\\0",
        "\n": "\\n",
        "\r": "\\r",
        "\x1a": "\\Z",
    })

    def __init__(self):
        """Register the seed account, start WeChat and run the crawl.

        If the Appium session cannot be created the error is printed and the
        constructor returns without crawling. Once a session exists it is
        always closed, even when the crawl raises.
        """
        mid = 1160417293
        date_int = int(datetime.now().strftime("%Y%m%d"))
        # Look up (or create) the tag named after today's date.
        tag_id = self.get_tag_id(date_int)
        print(tag_id)
        # Register the seed account; the returned uid is not needed here.
        self.insert_number(mid, tag_id)
        self.count = 0
        self.swipe_count = 0
        chromedriverExecutable = "/Users/tzld/Downloads/chromedriver-mac-x64/chromedriver"

        print("启动微信")
        # Appium capabilities for driving WeChat and its mini-program webview.
        caps = {
            "platformName": "Android",
            "devicesName": "Android",
            "appPackage": "com.tencent.mm",
            "appActivity": ".ui.LauncherUI",
            "autoGrantPermissions": True,
            "noReset": True,
            "resetkeyboard": True,
            "unicodekeyboard": True,
            "showChromedriverLog": True,
            "printPageSourceOnFailure": True,
            "recreateChromeDriverSessions": True,
            "enableWebviewDetailsCollection": True,
            "setWebContentsDebuggingEnabled": True,
            "newCommandTimeout": 6000,
            "automationName": "UiAutomator2",
            "chromedriverExecutable": chromedriverExecutable,
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
        }
        try:
            self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
        except Exception as e:
            print(e)
            return

        try:
            self._open_mini_program()
            self.get_videoList()
            time.sleep(1)
        finally:
            # Always release the Appium session, even if the crawl failed.
            self.driver.quit()

    def _open_mini_program(self):
        """Wait for WeChat's main screen, then tap the "小年糕+" entry."""
        self.driver.implicitly_wait(30)
        for _ in range(10):
            try:
                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                    print("启动微信成功")
                    break
                elif self.driver.find_element(
                    By.ID, "com.android.systemui:id/dismiss_view"
                ):
                    # The system notification shade is open: swipe up to close it.
                    print("发现并关闭系统下拉菜单")
                    size = self.driver.get_window_size()
                    self.driver.swipe(
                        int(size["width"] * 0.5),
                        int(size["height"] * 0.8),
                        int(size["width"] * 0.5),
                        int(size["height"] * 0.2),
                        200,
                    )
            except Exception as e:
                print(f"打开微信异常:{e}")
                time.sleep(1)

        # Swipe downwards on the main screen; presumably this reveals the
        # mini-program drawer -- TODO confirm.
        size = self.driver.get_window_size()
        self.driver.swipe(
            int(size["width"] * 0.5),
            int(size["height"] * 0.2),
            int(size["width"] * 0.5),
            int(size["height"] * 0.8),
            200,
        )
        time.sleep(1)

        # Best-effort status-bar service call through adb. A missing adb
        # binary is reported but must not abort the crawl.
        try:
            subprocess.run(
                ["adb", "shell", "service", "call", "statusbar", "2"],
                stdout=subprocess.PIPE,
                check=False,
            )
        except OSError as e:
            print(e)

        self.driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
        print("打开小程序小年糕+成功")
        time.sleep(5)

    @classmethod
    def _escape_sql(cls, value):
        """Return *value* escaped for use inside a quoted MySQL string literal.

        NOTE(review): assumes the server does not run with
        NO_BACKSLASH_ESCAPES; switch to parameterized queries if MysqlHelper
        supports them.
        """
        return str(value).translate(cls._SQL_ESCAPE_TABLE)

    def _admin_post(self, url, **kwargs):
        """POST to the admin backend and return the decoded JSON body."""
        response = requests.request(
            "POST",
            url,
            headers=self._ADMIN_HEADERS,
            timeout=self._HTTP_TIMEOUT,
            **kwargs,
        )
        return response.json()

    def search_elements(self, xpath):
        """Return the elements matching *xpath* from the first window that has any.

        Every window handle is tried in turn. Returns ``None`` when no window
        contains a match.
        """
        time.sleep(1)
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                elements = self.driver.find_elements(By.XPATH, xpath)
                if elements:
                    return elements
            except NoSuchElementException:
                pass
        return None

    def check_to_applet(self, xpath):
        """Switch to the last context and to the window that contains *xpath*.

        Returns silently, leaving the driver on the last window tried, if no
        window contains the element.
        """
        time.sleep(1)
        contexts = self.driver.contexts
        self.driver.switch_to.context(contexts[-1])
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                self.driver.find_element(By.XPATH, xpath)
                print("切换到WebView成功\n")
                return
            except NoSuchElementException:
                time.sleep(1)

    def swipe_up(self):
        """Scroll the feed with one slow upward drag and count the swipe."""
        self.search_elements('//*[@class="list-list--list"]')
        size = self.driver.get_window_size()
        action = TouchAction(self.driver)
        action.press(x=int(size["width"] * 0.5), y=int(size["height"] * 0.85))
        action.wait(ms=1300)  # hold time before dragging; tunable
        action.move_to(x=int(size["width"] * 0.5), y=int(size["height"] * 0.2))
        action.release()
        action.perform()
        self.swipe_count += 1

    def get_video_url(self, video_title_element):
        """Open the video behind *video_title_element* and return its ``src``.

        Makes up to three attempts and returns ``None`` if the player element
        never shows up.
        """
        video_xpath = '//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'
        for _ in range(3):
            self.search_elements('//*[@class="list-list--list"]')
            time.sleep(1)
            self.driver.execute_script(
                "arguments[0].scrollIntoView({block:'center',inline:'center'});",
                video_title_element[0],
            )
            time.sleep(3)
            video_title_element[0].click()
            self.check_to_applet(xpath=video_xpath)
            time.sleep(10)
            video_url_elements = self.search_elements(video_xpath)
            # search_elements yields None when nothing matched; retry then.
            if video_url_elements:
                return video_url_elements[0].get_attribute("src")
        return None

    def parse_detail(self, index):
        """Return the feed card at position ``index + 1`` of the current page.

        The page source is parsed with BeautifulSoup. Returns ``None`` when
        the page holds no card at that position.
        """
        soup = BeautifulSoup(self.driver.page_source, "html.parser")
        video_list = soup.findAll(
            name="wx-view", attrs={"class": "expose--adapt-parent"}
        )
        remaining = video_list[index + 1:]
        if not remaining:
            return None
        return remaining[0]

    def get_video_info_2(self, video_element):
        """Process one feed card and record its author if it is new.

        Cards whose avatar/nickname pair is already stored are skipped.
        Otherwise the video is opened to read the author's mid. A known mid
        only has its avatar and nickname refreshed; an unknown one is
        registered with the backend, stored in MySQL and written to Feishu.
        """
        self.count += 1
        video_title = video_element.find("wx-view", class_="dynamic--title").text
        # Avatar URL
        avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
        # Author nickname
        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text

        # select_name_url is True when this avatar/nickname pair is NOT stored.
        if not self.select_name_url(avatar_url, user_name):
            return

        video_title_element = self.search_elements(f'//*[contains(text(), "{video_title}")]')
        if video_title_element is None:
            return
        self.get_video_url(video_title_element)
        video_mid_elements = self.search_elements("//wx-view[@class='bar--navBar-content-capsule-wrap']")
        mid = int(video_mid_elements[0].get_attribute("data-mid"))
        self.driver.press_keycode(AndroidKey.BACK)
        time.sleep(5)

        if self.select_id(mid):
            # Known author: only refresh the stored avatar and nickname.
            self.update_name_url(mid, avatar_url, user_name)
            return

        time.sleep(1)
        # select_id_status is True when the mid is NOT in crawler_user_v3.
        if not self.select_id_status(mid):
            return

        date_int = int(datetime.now().strftime("%Y%m%d"))
        # Look up (or create) the tag named after today's date.
        tag_id = self.get_tag_id(date_int)
        time.sleep(5)
        print(tag_id)
        # Register the new account with the backend.
        pq_uid = self.insert_number(mid, tag_id)
        time.sleep(5)
        if not pq_uid:
            return

        self.insert_name_url(mid, avatar_url, user_name)
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        values = [[
            str(mid),
            user_name,
            avatar_url,
            str(pq_uid),
            formatted_time,
        ]]
        # Presumably inserts a blank row and then fills it -- confirm
        # against the Feishu helper.
        Feishu.insert_columns('xiaoniangao', 'xiaoniangao', "8zlceR", "ROWS", 1, 2)
        time.sleep(0.5)
        Feishu.update_values('xiaoniangao', 'xiaoniangao', "8zlceR", "A2:Z2", values)
        print("写入飞书表格成功")

    def get_video_info(self, video_element):
        """Process one feed card, printing instead of raising on any failure."""
        try:
            self.get_video_info_2(video_element)
        except Exception as e:
            print(f"抓取单条视频异常:{e}\n")

    def get_videoList(self):
        """Walk the feed: up to 50 rounds of parse, process and swipe.

        Returns early, after resetting the counters, when the feed list
        cannot be found in any window.
        """
        self.driver.implicitly_wait(20)
        # Switch to the mini program's webview.
        self.check_to_applet(xpath='//*[@class="tab-bar--tab tab-bar--tab-selected"]')
        print("切换到 webview 成功")
        time.sleep(1)
        if self.search_elements('//*[@class="list-list--list"]') is None:
            print("窗口已销毁")
            self.count = 0
            self.download_cnt = 0
            self.element_list = []
            return

        print("开始获取视频信息")
        for i in range(50):
            print("下滑{}次".format(i))
            element = self.parse_detail(i)
            if element is None:
                # No card at this index yet; keep scrolling so more can load.
                print(f"未找到第{i}条视频元素,跳过")
            else:
                self.get_video_info(element)
            self.swipe_up()
            time.sleep(random.randint(1, 5))

    def insert_number(self, mid, tag_id):
        """Register account *mid* with the backend and return its uid.

        Makes up to three attempts. An attempt succeeds when the save call
        reports code 0 and the first entry of the account list has
        ``link == str(mid)``. Returns ``None`` when every attempt fails.
        """
        save_url = "https://admin.piaoquantv.com/manager/crawler/v3/user/save"
        list_url = "https://admin.piaoquantv.com/manager/crawler/v3/user/list"
        save_payload = {
            "source": "xiaoniangao",
            "mode": "author",
            "modeValue": "",
            "modeBoard": "",
            "recomStatus": -6,
            "appRecomStatus": -6,
            "autoAuditStatus": 0,
            "tag": f"459,454,106,8240,{int(tag_id)}",
            "contentCategory": 0,
            "link": str(mid)
        }
        for _ in range(3):
            response = self._admin_post(save_url, json=save_payload)
            if response["code"] != 0:
                continue
            print("添加账号成功")
            time.sleep(1)
            response = self._admin_post(
                list_url, json={"pageNum": 1, "pageSize": 20}
            )
            users = response["content"]['list']
            # An empty list means the account is not visible yet; retry.
            if users and users[0]["link"] == str(mid):
                print("获取站内账号ID成功")
                return users[0]["uid"]
        return None

    def select_name_url(self, avatar_url, user_name):
        """Return ``True`` when no ``xng_uid`` row has this avatar and nickname."""
        sql = f""" select uid from xng_uid where avatar_url = "{self._escape_sql(avatar_url)}" and user_name="{self._escape_sql(user_name)}"; """
        db = MysqlHelper()
        repeat_video = db.select(sql=sql)
        if repeat_video:
            return False
        return True

    def get_tag_id(self, date_int):
        """Return the id of the tag named *date_int*, creating it if needed.

        Makes up to three search-then-create attempts and returns ``None``
        when all of them fail.
        """
        search_url = f"https://admin.piaoquantv.com/manager/user/up/searchUserTypeTag?keyword={date_int}&muid=7"
        create_url = f"https://admin.piaoquantv.com/manager/user/up/createUserTypeTag?tagName={date_int}&muid=7"
        for _ in range(3):
            content = self._admin_post(search_url, data={})["content"]
            if content:
                return content[0]['tagId']
            # No such tag yet: create it and use the id from the reply.
            content = self._admin_post(create_url, data={})["content"]
            if content:
                return content['tagId']
        return None

    def update_name_url(self, mid, avatar_url, user_name):
        """Update the stored avatar and nickname for *mid*; return the truthiness of the update result."""
        sql = f""" update xng_uid set avatar_url = "{self._escape_sql(avatar_url)}", user_name="{self._escape_sql(user_name)}" where uid = "{self._escape_sql(mid)}"; """
        db = MysqlHelper()
        repeat_video = db.update(sql=sql)
        if repeat_video:
            return True
        return False

    def insert_name_url(self, uid, avatar_url, user_name):
        """Insert a timestamped ``xng_uid`` row; return the truthiness of the insert result."""
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        insert_sql = f"""INSERT INTO xng_uid (uid, avatar_url, user_name, data_time) values ('{self._escape_sql(uid)}' ,'{self._escape_sql(avatar_url)}','{self._escape_sql(user_name)}', '{formatted_time}')"""
        db = MysqlHelper()
        repeat_video = db.update(sql=insert_sql)
        if repeat_video:
            return True
        return False

    def select_id(self, uid):
        """Return ``True`` when *uid* already has a row in ``xng_uid``."""
        sql = f""" select uid from xng_uid where uid = "{self._escape_sql(uid)}"; """
        db = MysqlHelper()
        repeat_video = db.select(sql=sql)
        if repeat_video:
            return True
        return False

    def select_id_status(self, uid):
        """Return ``True`` when *uid* is NOT yet a ``link`` in ``crawler_user_v3``."""
        sql = f""" select uid from crawler_user_v3 where link = "{self._escape_sql(uid)}"; """
        db = MysqlHelper()
        repeat_video = db.select(sql=sql)
        if repeat_video:
            return False
        return True


if __name__ == "__main__":
    XiaoNianGaoZH()