# -*- coding: utf-8 -*-
import json
import multiprocessing
import os
import random
import subprocess
import sys
import time
import uuid
from hashlib import md5

from appium import webdriver
from appium.webdriver.extensions.android.nativekey import AndroidKey
from appium.webdriver.common.touch_action import TouchAction
from bs4 import BeautifulSoup
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By

# Project packages are resolved relative to the working directory, so this
# path tweak must run before the `application.*` imports below.
sys.path.append(os.getcwd())

from application.functions import get_redirect_url
from application.pipeline import PiaoQuanPipelineTest, PiaoQuanPipeline
from application.common.log import AliyunLogger, Local
from application.common import MysqlHelper, Feishu


class ZFHYBFRecommend(object):
    """
    Beijing Yinshiguang Technology Co., Ltd. - offline crawler for the
    "Zhufu Haoyun Baofu" WeChat mini-program.

    Constructing an instance runs the whole crawl: it opens an Appium session
    against WeChat, opens the mini-program, walks the video list and finally
    closes the session.
    """

    def __init__(self, log_type, crawler, env, rule_dict, our_uid):
        """
        Start WeChat through Appium and crawl the mini-program's video list.

        :param log_type: crawl mode; passed to the loggers and the pipeline,
            and stored in each item as ``strategy``
        :param crawler: crawler/platform name; passed to the loggers and the
            pipeline, and used as the trace-id prefix
        :param env: environment name passed to the logger and the pipeline
        :param rule_dict: filtering rules handed to ``PiaoQuanPipeline``
        :param our_uid: value stored as ``user_id`` on every accepted item

        If the Appium session cannot be created, the error is logged and the
        constructor returns early without ``self.driver`` being set.
        """
        # No message queue is wired up here; sending is skipped while this
        # stays None (see get_video_info_2).
        self.mq = None
        self.platform = "zhufuhaoyunbaofu"
        self.download_cnt = 0
        self.element_list = []
        self.count = 0
        self.swipe_count = 0
        self.log_type = log_type
        self.crawler = crawler
        self.env = env
        self.rule_dict = rule_dict
        self.our_uid = our_uid
        chromedriverExecutable = "/Users/tzld/Downloads/chromedriver_mac64/chromedriver"
        self.aliyun_log = AliyunLogger(platform=crawler, mode=log_type, env=env)
        print("启动微信")
        # Appium capabilities for the WeChat session.
        # NOTE(review): "devicesName", "resetkeyboard" and "unicodekeyboard"
        # do not match the usual Appium spellings (deviceName, resetKeyboard,
        # unicodeKeyboard) - confirm whether they are intended before changing.
        caps = {
            "platformName": "Android",
            "devicesName": "Android",
            "appPackage": "com.tencent.mm",
            "appActivity": ".ui.LauncherUI",
            "autoGrantPermissions": True,
            "noReset": True,
            "resetkeyboard": True,
            "unicodekeyboard": True,
            "showChromedriverLog": True,
            "printPageSourceOnFailure": True,
            "recreateChromeDriverSessions": True,
            "enableWebviewDetailsCollection": True,
            "setWebContentsDebuggingEnabled": True,
            "newCommandTimeout": 6000,
            "automationName": "UiAutomator2",
            "chromedriverExecutable": chromedriverExecutable,
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
        }
        try:
            self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
        except Exception as e:
            print(e)
            self.aliyun_log.logging(
                code="3002",
                message=f'appium 启动异常: {e}'
            )
            return

        # Always release the Appium session, even if the crawl raises.
        try:
            self._open_applet()
            self.get_videoList()
            time.sleep(1)
        finally:
            self.driver.quit()

    def _open_applet(self):
        """Wait for WeChat's main screen, then open the mini-program."""
        self.driver.implicitly_wait(30)
        for _ in range(10):
            try:
                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                    Local.logger(self.log_type, self.crawler).info("微信启动成功")
                    self.aliyun_log.logging(
                        code="1000",
                        message="启动微信成功"
                    )
                    break
                elif self.driver.find_element(
                    By.ID, "com.android.systemui:id/dismiss_view"
                ):
                    # The system pull-down panel is covering the screen:
                    # swipe upwards to dismiss it.
                    Local.logger(self.log_type, self.crawler).info("发现并关闭系统下拉菜单")
                    self.aliyun_log.logging(
                        code="1000",
                        message="发现并关闭系统下拉菜单"
                    )
                    size = self.driver.get_window_size()
                    self.driver.swipe(
                        int(size["width"] * 0.5),
                        int(size["height"] * 0.8),
                        int(size["width"] * 0.5),
                        int(size["height"] * 0.2),
                        200,
                    )
            except Exception:
                # Best effort: log and retry on the next iteration.
                self.aliyun_log.logging(
                    code="3001",
                    message="打开微信异常"
                )
                time.sleep(1)

        Local.logger(self.log_type, self.crawler).info("下滑,展示小程序选择面板")
        # Swipe downwards on the chat list to reveal the mini-program panel.
        size = self.driver.get_window_size()
        self.driver.swipe(
            int(size["width"] * 0.5),
            int(size["height"] * 0.2),
            int(size["width"] * 0.5),
            int(size["height"] * 0.8),
            200,
        )
        # Constant command (no external input), run through the shell as before.
        command = 'adb shell service call statusbar 2'
        process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
        process.communicate()
        time.sleep(1)
        # Click the last element whose text is the mini-program's name.
        self.driver.find_elements(By.XPATH, '//*[@text="祝福好运暴富"]')[-1].click()
        self.aliyun_log.logging(
            code="1000",
            message="打开小程序 祝福好运暴富 成功"
        )
        time.sleep(5)

    @staticmethod
    def _xpath_literal(text):
        """Return *text* quoted as an XPath 1.0 string literal."""
        if '"' not in text:
            return f'"{text}"'
        if "'" not in text:
            return f"'{text}'"
        # Both quote kinds present: stitch the pieces together with concat().
        pieces = (f'"{piece}"' for piece in text.split('"'))
        return "concat(" + ", '\"', ".join(pieces) + ")"

    def search_elements(self, xpath):
        """
        Look for *xpath* in every window handle of the current context.

        :param xpath: XPath expression to search for
        :return: the non-empty list of elements from the first window that
            has a match (the driver stays switched to that window), or
            ``None`` when no window matches
        """
        time.sleep(1)
        windowHandles = self.driver.window_handles
        for handle in windowHandles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                elements = self.driver.find_elements(By.XPATH, xpath)
                if elements:
                    return elements
            except NoSuchElementException:
                pass
        return None

    def check_to_applet(self, xpath):
        """
        Switch to the last context reported by the driver and then to the
        first window in which *xpath* can be found.

        :param xpath: XPath expression identifying the wanted page
        :return: ``None``; if no window matches, the driver is left on the
            last window handle that was tried
        """
        time.sleep(1)
        webViews = self.driver.contexts
        self.driver.switch_to.context(webViews[-1])
        windowHandles = self.driver.window_handles
        for handle in windowHandles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                self.driver.find_element(By.XPATH, xpath)
                print("切换到WebView成功\n")
                return
            except NoSuchElementException:
                time.sleep(1)

    def swipe_up(self):
        """Swipe the list upwards once and bump ``self.swipe_count``."""
        self.search_elements('//*[@class="dynamic--album"]')
        size = self.driver.get_window_size()
        x = int(size["width"] * 0.2)
        action = TouchAction(self.driver)
        action.press(x=x, y=int(size["height"] * 0.8))
        action.wait(ms=200)  # the wait time can be tuned
        # The end point must differ from the press point; previously both
        # were at 0.8 * height, so the gesture never moved the list.
        action.move_to(x=x, y=int(size["height"] * 0.2))
        action.release()
        action.perform()
        self.swipe_count += 1

    def get_video_url(self, video_title_element):
        """
        Open a video's detail page and read the player's ``src`` attribute.

        :param video_title_element: list of elements matching the video
            title; only the first one is scrolled to and clicked
        :return: the ``src`` attribute of the first ``wx-video`` element, or
            ``None`` when no such element is found on the detail page
        """
        for _ in range(3):
            self.search_elements('//*[@class="dynamic--title"]')
            time.sleep(1)
            self.driver.execute_script(
                "arguments[0].scrollIntoView({block:'center',inline:'center'});",
                video_title_element[0],
            )
        time.sleep(3)
        video_title_element[0].click()
        self.check_to_applet(
            xpath=r'//wx-video[@class="index--video-item index--video"]'
        )
        time.sleep(10)
        video_url_elements = self.search_elements(
            '//wx-video[@class="index--video-item index--video"]'
        )
        if not video_url_elements:
            # search_elements returns None when nothing matched.
            return None
        return video_url_elements[0].get_attribute("src")

    def parse_detail(self, index):
        """
        Parse the current page source and return the video card at *index*.

        :param index: position of the wanted ``wx-view`` card with class
            ``expose--adapt-parent``
        :return: the matching BeautifulSoup tag
        :raises IndexError: when fewer than ``index + 1`` cards are present
        """
        self.check_to_applet(xpath='//*[@class="expose--adapt-parent"]')
        page_source = self.driver.page_source
        soup = BeautifulSoup(page_source, "html.parser")
        video_list = soup.find_all(
            name="wx-view", attrs={"class": "expose--adapt-parent"}
        )
        return video_list[index:][0]

    def get_video_info_2(self, video_element):
        """
        Extract one video card, filter it, and publish it if accepted.

        Cards with a play count below 10000 are dropped. Remaining cards go
        through ``PiaoQuanPipeline``; for accepted ones the detail page is
        opened to obtain the video URL, a row is written to Feishu, and the
        item is sent to ``self.mq`` when a queue is configured.

        :param video_element: BeautifulSoup tag of one video card
        """
        self.count += 1
        # The trace id serves as the unique key for this video's lifecycle.
        trace_id = self.crawler + str(uuid.uuid1())
        self.aliyun_log.logging(
            code="1001",
            trace_id=trace_id,
            message="扫描到一条视频",
        )
        # Title
        video_title = video_element.find("wx-view", class_="dynamic--title").text
        # Play-count string
        play_str = video_element.find("wx-view", class_="dynamic--views").text
        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
        # Avatar URL
        avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
        # Cover URL
        cover_url = video_element.find("wx-image", class_="dynamic--bg-image")["src"]
        play_cnt = int(play_str.replace("\n", ""))
        if play_cnt < 10000:
            return
        # The page exposes no ids here; title/user-name hashes are used instead.
        out_video_id = md5(video_title.encode("utf8")).hexdigest()
        out_user_id = md5(user_name.encode("utf8")).hexdigest()
        video_dict = {
            "video_title": video_title,
            "video_id": out_video_id,
            "out_video_id": out_video_id,
            "duration_str": 0,
            "duration": 0,
            "play_str": play_str,
            "play_cnt": play_cnt,
            "like_str": 0,
            "like_cnt": 0,
            "comment_cnt": 0,
            "share_cnt": 0,
            "user_name": user_name,
            "user_id": out_user_id,
            "publish_time_stamp": int(time.time()),
            "publish_time_str": time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
            ),
            "update_time_stamp": int(time.time()),
            "avatar_url": avatar_url,
            "cover_url": cover_url,
            "session": f"zhufuhaoyunbaofu-{int(time.time())}",
        }
        pipeline = PiaoQuanPipeline(
            platform=self.crawler,
            mode=self.log_type,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id,
        )
        flag = pipeline.process_item()
        if not flag:
            return

        # Quote the scraped title so a '"' inside it cannot break the XPath.
        video_title_element = self.search_elements(
            f"//*[contains(text(), {self._xpath_literal(video_title)})]"
        )
        if video_title_element is None:
            return
        Local.logger(self.log_type, self.crawler).info("点击标题,进入视频详情页")
        self.aliyun_log.logging(
            code="1000",
            message="点击标题,进入视频详情页",
        )
        video_url = self.get_video_url(video_title_element)
        if video_url is not None:
            video_url = get_redirect_url(video_url)
        if video_url is None:
            # No usable URL: leave the detail page and skip this video.
            self.driver.press_keycode(AndroidKey.BACK)
            time.sleep(5)
            return
        video_dict["video_url"] = video_url
        video_dict["platform"] = self.crawler
        video_dict["strategy"] = self.log_type
        video_dict["out_video_id"] = video_dict["video_id"]
        video_dict["crawler_rule"] = json.dumps(self.rule_dict)
        video_dict["user_id"] = self.our_uid
        video_dict["publish_time"] = video_dict["publish_time_str"]
        values = [[
            video_dict["video_id"],
            time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
            ),
            video_title,
            cover_url,
            video_url,
            play_cnt
        ]]
        # NOTE(review): the first two arguments are 'xiaoniangao' although
        # this crawler is a different platform - confirm this is intended.
        Feishu.insert_columns('xiaoniangao', 'xiaoniangao', "FPJe7M", "ROWS", 1, 2)
        time.sleep(0.5)
        Feishu.update_values('xiaoniangao', 'xiaoniangao', "FPJe7M", "A2:Z2", values)
        # self.mq is None unless a queue was attached from outside; calling
        # send_msg on None used to raise and mark every video as failed.
        if self.mq is not None:
            self.mq.send_msg(video_dict)
            self.aliyun_log.logging(
                code="1002",
                message="成功发送至ETL",
                data=video_dict
            )
        # Count each video once (it was previously incremented twice).
        self.download_cnt += 1
        self.driver.press_keycode(AndroidKey.BACK)
        time.sleep(5)

    def get_video_info(self, video_element):
        """
        Run ``get_video_info_2`` and contain any failure to this one video.

        :param video_element: BeautifulSoup tag of one video card
        """
        try:
            self.get_video_info_2(video_element)
        except Exception as e:
            # NOTE(review): BACK is pressed even when the failure happened
            # before the detail page was opened - confirm that is harmless.
            self.driver.press_keycode(AndroidKey.BACK)
            Local.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
            self.aliyun_log.logging(
                code="3001",
                message=f"抓取单条视频异常:{e}\n"
            )

    def get_videoList(self):
        """
        Walk the video list and process up to 50 cards.

        :return: ``None``
        """
        self.driver.press_keycode(AndroidKey.BACK)
        time.sleep(40)
        # Close the advertisement by tapping a fixed screen coordinate.
        x = 993
        y = 72
        self.driver.tap([(x, y)])
        self.driver.implicitly_wait(20)
        # Switch to the web view.
        self.check_to_applet(xpath='//*[@class="expose--adapt-parent"]')
        print("切换到 webview 成功")
        time.sleep(1)
        if self.search_elements('//*[@class="expose--adapt-parent"]') is None:
            self.aliyun_log.logging(
                code="3000",
                message="窗口已销毁"
            )
            self.count = 0
            self.download_cnt = 0
            self.element_list = []
            return

        print("开始获取视频信息")
        for i in range(50):
            print("下滑{}次".format(i))
            try:
                element = self.parse_detail(i)
            except IndexError:
                # Fewer than i + 1 cards are loaded: nothing left to process.
                break
            self.get_video_info(element)
            # Swipe after the 4th card and then after every 4th card.
            if i >= 3 and (i - 3) % 4 == 0:
                self.swipe_up()
            time.sleep(random.randint(1, 5))
        Local.logger(self.log_type, self.crawler).info("已抓取完一组,休眠 5 秒\n")
        self.aliyun_log.logging(
            code="1000",
            message="已抓取完一组,休眠 5 秒\n",
        )
        time.sleep(5)


if __name__ == "__main__":
    rule_dict1 = {
        "period": {"min": 0, "max": 365},
        "duration": {"min": 0, "max": 1800},
        "favorite_cnt": {"min": 0, "max": 0},
        "videos_cnt": {"min": 0, "max": 0},
        "share_cnt": {"min": 0, "max": 0},
    }
    ZFHYBFRecommend(
        "recommend",
        "zhufuhaoyunbaofu",
        "prod",
        rule_dict1,
        [64120158, 64120157, 63676778],
    )