# -*- coding: utf-8 -*-
"""Offline crawler that discovers Xiaoniangao+ authors through the WeChat mini program.

The script drives WeChat on an Android device via Appium, walks the video feed
of the "小年糕+" mini program, and registers previously unseen authors in the
admin backend, a local uid file, MySQL and a Feishu sheet.
"""
import json
import os
import random
import subprocess
import sys
import time
import uuid
from datetime import datetime, timedelta

import requests
from appium import webdriver
from appium.webdriver.common.touch_action import TouchAction
from appium.webdriver.extensions.android.nativekey import AndroidKey
from bs4 import BeautifulSoup
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By

sys.path.append(os.getcwd())
from application.common import MysqlHelper, Feishu

# Machine-specific paths, kept at the values the script has always used.
CHROMEDRIVER_PATH = "/Users/a123456/Downloads/chromedriver-mac-x64/chromedriver"
UID_FILE_PATH = '/Users/tzld/Desktop/automatic_crawler/spider/crawler_offline/xng_zh.txt'

# NOTE(review): a session credential is committed to source. It can be
# overridden through the environment; the literal fallback only preserves the
# previous behaviour and should be removed once deployments set the variable.
ADMIN_SESSION_COOKIE = os.environ.get(
    "PQ_ADMIN_SESSION_COOKIE",
    'SESSION=YjM4YmE5NDgtMjJmNi00NjA1LTgyNDUtYTNlZGVlOGNmODMy',
)

# Seconds to wait for the admin backend; without a timeout a stalled
# connection would block the crawler forever.
REQUEST_TIMEOUT = 30

# Same character set that MySQL client libraries escape inside string literals.
_SQL_ESCAPES = str.maketrans({
    "\0": "\\0",
    "\n": "\\n",
    "\r": "\\r",
    "\x1a": "\\Z",
    '"': '\\"',
    "'": "\\'",
    "\\": "\\\\",
})


def _escape_sql(value):
    """Escape *value* for embedding inside a quoted MySQL string literal.

    NOTE(review): assumes the server does not run with NO_BACKSLASH_ESCAPES.
    Parameterised queries would be preferable if MysqlHelper supports them.
    """
    return str(value).translate(_SQL_ESCAPES)


def _xpath_literal(text):
    """Return *text* as an XPath 1.0 string literal, whatever quotes it contains."""
    if '"' not in text:
        return f'"{text}"'
    if "'" not in text:
        return f"'{text}'"
    # XPath 1.0 has no escape syntax, so mixed quotes need concat().
    pieces = [f'"{piece}"' for piece in text.split('"')]
    return "concat(" + ", '\"', ".join(pieces) + ")"


def _admin_headers():
    """Build the browser-like headers shared by the admin user/tag endpoints.

    The original 'content-length: 0' entry is intentionally omitted: it was
    wrong for requests carrying a JSON body, and requests sets the header
    from the actual body anyway.
    """
    return {
        'cookie': ADMIN_SESSION_COOKIE,
        'origin': 'https://admin.piaoquantv.com',
        'priority': 'u=1, i',
        'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"macOS"',
    }


class XiaoNianGaoZH(object):
    """Offline crawler for the Xiaoniangao+ ("小年糕+") WeChat mini program.

    Instantiating the class runs the whole crawl: it connects to the local
    Appium server, opens WeChat and the mini program, processes the feed and
    finally closes the driver session.
    """

    def __init__(self):
        """Start the Appium session and run one complete crawl.

        If the Appium session cannot be created the error is printed and the
        constructor returns without crawling.
        """
        self.count = 0
        self.swipe_count = 0
        print("启动微信")
        # WeChat capabilities.
        # NOTE(review): "devicesName", "resetkeyboard" and "unicodekeyboard"
        # look like misspellings of Appium's "deviceName", "resetKeyboard" and
        # "unicodeKeyboard"; left untouched because correcting them would
        # change device behaviour - confirm before fixing.
        caps = {
            "platformName": "Android",
            "devicesName": "Android",
            "appPackage": "com.tencent.mm",
            "appActivity": ".ui.LauncherUI",
            "autoGrantPermissions": True,
            "noReset": True,
            "resetkeyboard": True,
            "unicodekeyboard": True,
            "showChromedriverLog": True,
            "printPageSourceOnFailure": True,
            "recreateChromeDriverSessions": True,
            "enableWebviewDetailsCollection": True,
            "setWebContentsDebuggingEnabled": True,
            "newCommandTimeout": 6000,
            "automationName": "UiAutomator2",
            "chromedriverExecutable": CHROMEDRIVER_PATH,
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
        }
        try:
            self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
        except Exception as e:
            print(e)
            return
        try:
            self.driver.implicitly_wait(30)
            self._wait_for_wechat()
            self._open_mini_program()
            self.get_videoList()
            time.sleep(1)
        finally:
            # Always release the Appium session, even when the crawl fails.
            self.driver.quit()

    def _wait_for_wechat(self):
        """Poll up to 10 times until the WeChat main screen is visible."""
        for _ in range(10):
            try:
                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                    print("启动微信成功")
                    break
                if self.driver.find_element(
                    By.ID, "com.android.systemui:id/dismiss_view"
                ):
                    print("发现并关闭系统下拉菜单")
                    size = self.driver.get_window_size()
                    self.driver.swipe(
                        int(size["width"] * 0.5),
                        int(size["height"] * 0.8),
                        int(size["width"] * 0.5),
                        int(size["height"] * 0.2),
                        200,
                    )
            except Exception as e:
                # Best effort: any driver error just triggers another poll.
                print(f"打开微信异常:{e}")
                time.sleep(1)

    def _open_mini_program(self):
        """Pull down the WeChat home screen and tap the "小年糕+" entry."""
        size = self.driver.get_window_size()
        self.driver.swipe(
            int(size["width"] * 0.5),
            int(size["height"] * 0.2),
            int(size["width"] * 0.5),
            int(size["height"] * 0.8),
            200,
        )
        time.sleep(1)
        try:
            # Argument list instead of a shell string: no shell is involved.
            subprocess.run(
                ["adb", "shell", "service", "call", "statusbar", "2"],
                stdout=subprocess.PIPE,
                check=False,
            )
        except OSError as e:
            # Keep this step best-effort, as it was with the shell invocation.
            print(f"执行 adb 命令异常:{e}")
        self.driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
        print("打开小程序小年糕+成功")
        time.sleep(5)

    def save_pq_uid(self, uid):
        """Append *uid* to the local uid file and push the full list to the crawl task.

        :param uid: in-site account id returned by the admin backend.
        """
        start_time = datetime.now() + timedelta(minutes=5)
        timestamp_milliseconds = int(start_time.timestamp() * 1000)

        try:
            with open(UID_FILE_PATH, 'r') as file:
                content = file.read()
        except FileNotFoundError:
            # First run: the append below creates the file.
            content = ""

        # Always write text; the backend may hand the uid over as an int.
        entry = str(uid)
        if content and content[-1] != ',':
            entry = ',' + entry
        with open(UID_FILE_PATH, 'a') as file:
            file.write(entry)
        content += entry

        url = "https://admin.piaoquantv.com/manager/crawler/v3/task/save?muid=999"
        payload = {
            "taskName": "小年糕账号",
            "source": "xiaoniangao",
            "mode": "author",
            "modeValue": "0",
            "modeBoard": "0",
            "spiderName": "run_xng_author",
            "startTime": timestamp_milliseconds,
            "interval": 4800,
            "uid": str(content),
            "machine": "aliyun",
            "rule": [
                {"period": {"min": 15, "max": 3}},
                {"duration": {"min": 50, "max": 0}},
                {"share_cnt": {"min": 2, "max": 0}},
                {"videos_cnt": {"min": 300, "max": 0}},
            ],
            "id": 21,
        }
        headers = {
            'accept': 'application/json',
            'content-type': 'application/json;',
            'cookie': ADMIN_SESSION_COOKIE,
            'origin': 'https://admin.piaoquantv.com',
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
        }
        requests.request(
            "POST", url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT
        )

    def search_elements(self, xpath):
        """Search every window handle for *xpath*.

        :return: the first non-empty list of matching elements, or None when
            no window contains a match.
        """
        time.sleep(1)
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                elements = self.driver.find_elements(By.XPATH, xpath)
                if elements:
                    return elements
            except NoSuchElementException:
                pass
        return None

    def check_to_applet(self, xpath):
        """Switch to the last driver context and to the window containing *xpath*."""
        time.sleep(1)
        contexts = self.driver.contexts
        self.driver.switch_to.context(contexts[-1])
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                self.driver.find_element(By.XPATH, xpath)
                print("切换到WebView成功\n")
                return
            except NoSuchElementException:
                time.sleep(1)

    def swipe_up(self):
        """Scroll the feed with a slow upward drag and count the swipe."""
        self.search_elements('//*[@class="list-list--list"]')
        size = self.driver.get_window_size()
        action = TouchAction(self.driver)
        action.press(x=int(size["width"] * 0.5), y=int(size["height"] * 0.85))
        action.wait(ms=1300)  # hold time before the drag; tunable
        action.move_to(x=int(size["width"] * 0.5), y=int(size["height"] * 0.2))
        action.release()
        action.perform()
        self.swipe_count += 1

    def get_video_url(self, video_title_element):
        """Open the video behind *video_title_element* and read its source URL.

        Up to three attempts are made; a new attempt is started only when the
        video element could not be found.

        :param video_title_element: list of elements whose first item is the
            title to tap.
        :return: the ``src`` attribute of the video element, or None when it
            was not found after three attempts.
        """
        video_xpath = '//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'
        for _ in range(3):
            self.search_elements('//*[@class="list-list--list"]')
            time.sleep(1)
            self.driver.execute_script(
                "arguments[0].scrollIntoView({block:'center',inline:'center'});",
                video_title_element[0],
            )
            time.sleep(3)
            video_title_element[0].click()
            self.check_to_applet(xpath=video_xpath)
            time.sleep(10)
            video_url_elements = self.search_elements(video_xpath)
            if video_url_elements:
                return video_url_elements[0].get_attribute("src")
        return None

    def parse_detail(self, index):
        """Return the feed card at position ``index + 1`` of the current page source.

        :raises IndexError: when the page holds no card at that position.
        """
        soup = BeautifulSoup(self.driver.page_source, "html.parser")
        video_list = soup.findAll(
            name="wx-view", attrs={"class": "expose--adapt-parent"}
        )
        # The first card is skipped by design of the original code
        # (presumably a non-video header card - TODO confirm).
        return video_list[index + 1]

    def get_video_info_2(self, video_element):
        """Process one feed card and register its author when the author is new.

        :param video_element: BeautifulSoup node of the feed card.
        """
        self.count += 1
        video_title = video_element.find("wx-view", class_="dynamic--title").text
        # Avatar URL
        avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
        # Author display name
        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text

        # Skip authors whose name + avatar pair is already stored.
        if not self.select_name_url(avatar_url, user_name):
            return

        video_title_element = self.search_elements(
            f'//*[contains(text(), {_xpath_literal(video_title)})]'
        )
        if video_title_element is None:
            return
        self.get_video_url(video_title_element)
        video_mid_elements = self.search_elements(
            "//wx-view[@class='bar--navBar-content-capsule-wrap']"
        )
        mid = int(video_mid_elements[0].get_attribute("data-mid"))
        self.driver.press_keycode(AndroidKey.BACK)
        time.sleep(5)

        if self.select_id(mid):
            # Known author id: only refresh the stored name and avatar.
            self.update_name_url(mid, avatar_url, user_name)
            return
        time.sleep(1)
        if self.select_id_status(mid):
            self._register_account(mid, avatar_url, user_name)

    def _register_account(self, mid, avatar_url, user_name):
        """Create the author in the admin backend and record it everywhere."""
        date_int = int(datetime.now().strftime("%Y%m%d"))
        # Tag named after today's date.
        tag_id = self.get_tag_id(date_int)
        time.sleep(5)
        print(tag_id)
        if tag_id is None:
            # Without a tag the account cannot be created.
            print("获取时间标签失败")
            return
        # Create the account.
        pq_uid = self.insert_number(mid, tag_id)
        time.sleep(5)
        if not pq_uid:
            return
        self.insert_name_url(mid, avatar_url, user_name)
        time.sleep(2)
        self.save_pq_uid(pq_uid)
        self._log_to_feishu(mid, user_name, avatar_url, pq_uid)

    def _log_to_feishu(self, mid, user_name, avatar_url, pq_uid):
        """Insert a new row at the top of the Feishu sheet describing the author."""
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        values = [[
            str(mid),
            user_name,
            avatar_url,
            str(pq_uid),
            formatted_time,
        ]]
        Feishu.insert_columns('xiaoniangao', 'xiaoniangao', "8zlceR", "ROWS", 1, 2)
        time.sleep(0.5)
        Feishu.update_values('xiaoniangao', 'xiaoniangao', "8zlceR", "A2:Z2", values)
        print("写入飞书表格成功")

    def get_video_info(self, video_element):
        """Process one feed card, printing instead of propagating any failure."""
        try:
            self.get_video_info_2(video_element)
        except Exception as e:
            print(f"抓取单条视频异常:{e}\n")

    def get_videoList(self):
        """Walk the feed: process one card and swipe up, 50 times.

        :return: None
        """
        self.driver.implicitly_wait(20)
        # Switch to the web view.
        self.check_to_applet(xpath='//*[@class="tab-bar--tab tab-bar--tab-selected"]')
        print("切换到 webview 成功")
        time.sleep(1)
        if self.search_elements('//*[@class="list-list--list"]') is None:
            print("窗口已销毁")
            self.count = 0
            self.download_cnt = 0
            self.element_list = []
            return

        print("开始获取视频信息")
        for i in range(50):
            print("下滑{}次".format(i))
            element = self.parse_detail(i)
            self.get_video_info(element)
            self.swipe_up()
            time.sleep(random.randint(1, 5))

    def insert_number(self, mid, tag_id):
        """Create the author account in the admin backend.

        Up to three attempts are made.

        :param mid: author id read from the mini program.
        :param tag_id: id of the date tag to attach to the account.
        :return: the in-site uid of the new account, or None when it could
            not be created or confirmed.
        """
        headers = _admin_headers()
        save_url = "https://admin.piaoquantv.com/manager/crawler/v3/user/save?muid=999"
        list_url = "https://admin.piaoquantv.com/manager/crawler/v3/user/list?muid=999"
        save_payload = {
            "source": "xiaoniangao",
            "mode": "author",
            "modeValue": "",
            "modeBoard": "",
            "recomStatus": -6,
            "appRecomStatus": -6,
            "autoAuditStatus": 0,
            "tag": f"459,454,106,8240,{int(tag_id)}",
            "contentCategory": 0,
            "link": str(mid),
        }
        list_payload = {
            "pageNum": 1,
            "pageSize": 20,
        }
        for _ in range(3):
            response = requests.request(
                "POST", save_url, headers=headers, json=save_payload,
                timeout=REQUEST_TIMEOUT,
            ).json()
            if response["code"] != 0:
                continue
            print("添加账号成功")
            time.sleep(1)
            response = requests.request(
                "POST", list_url, headers=headers, json=list_payload,
                timeout=REQUEST_TIMEOUT,
            ).json()
            users = response["content"]['list']
            # The account just created is expected at the top of the list.
            if users and users[0]["link"] == str(mid):
                print("获取站内账号ID成功")
                return users[0]["uid"]
        return None

    def select_name_url(self, avatar_url, user_name):
        """Check whether the name + avatar pair is still unknown.

        :return: True when no xng_uid row has this pair, False otherwise.
        """
        sql = f""" select uid from xng_uid where avatar_url = "{_escape_sql(avatar_url)}" and user_name="{_escape_sql(user_name)}"; """
        db = MysqlHelper()
        existing = db.select(sql=sql)
        if existing:
            return False
        return True

    def get_tag_id(self, date_int):
        """Look up the user tag named *date_int*, creating it when missing.

        Up to three attempts are made.

        :param date_int: today's date as an integer in YYYYMMDD form.
        :return: the tag id, or None when it could be neither found nor created.
        """
        headers = _admin_headers()
        payload = {}
        # NOTE(review): both query strings end in "&muid=7?muid=999", which
        # looks malformed; left byte-identical because the value the backend
        # expects cannot be confirmed from this file.
        search_url = f"https://admin.piaoquantv.com/manager/user/up/searchUserTypeTag?keyword={date_int}&muid=7?muid=999"
        create_url = f"https://admin.piaoquantv.com/manager/user/up/createUserTypeTag?tagName={date_int}&muid=7?muid=999"
        for _ in range(3):
            response = requests.request(
                "POST", search_url, headers=headers, data=payload,
                timeout=REQUEST_TIMEOUT,
            ).json()
            content = response["content"]
            if content:
                return content[0]['tagId']

            response = requests.request(
                "POST", create_url, headers=headers, data=payload,
                timeout=REQUEST_TIMEOUT,
            ).json()
            content = response["content"]
            if content:
                return content['tagId']
        return None

    def update_name_url(self, mid, avatar_url, user_name):
        """Update the stored name and avatar of author *mid*.

        :return: True when the helper reports a truthy result, else False.
        """
        sql = f""" update xng_uid set avatar_url = "{_escape_sql(avatar_url)}", user_name="{_escape_sql(user_name)}" where uid = "{_escape_sql(mid)}"; """
        db = MysqlHelper()
        result = db.update(sql=sql)
        if result:
            return True
        return False

    def insert_name_url(self, uid, avatar_url, user_name):
        """Insert the author's id, avatar and name into xng_uid.

        :return: True when the helper reports a truthy result, else False.
        """
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        insert_sql = f"""INSERT INTO xng_uid (uid, avatar_url, user_name, data_time) values ('{_escape_sql(uid)}' ,'{_escape_sql(avatar_url)}','{_escape_sql(user_name)}', '{formatted_time}')"""
        db = MysqlHelper()
        result = db.update(sql=insert_sql)
        if result:
            return True
        return False

    def select_id(self, uid):
        """Check whether author id *uid* already exists in xng_uid.

        :return: True when a row exists, False otherwise.
        """
        sql = f""" select uid from xng_uid where uid = "{_escape_sql(uid)}"; """
        db = MysqlHelper()
        existing = db.select(sql=sql)
        if existing:
            return True
        return False

    def select_id_status(self, uid):
        """Check whether author id *uid* has never been added to crawler_user_v3.

        :return: True when no row has this link, False otherwise.
        """
        sql = f""" select uid from crawler_user_v3 where link = "{_escape_sql(uid)}"; """
        db = MysqlHelper()
        existing = db.select(sql=sql)
        if existing:
            return False
        return True


if __name__ == "__main__":
    XiaoNianGaoZH()