# Zhongmiaoyinxin (众妙音信) crawler: back-end integration
# @Author: luojunhui
# @Time: 2023/10/26
import json
import os
import sys
import time
import uuid
from hashlib import md5

from appium import webdriver
from appium.webdriver.common.touch_action import TouchAction
from appium.webdriver.extensions.android.nativekey import AndroidKey
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By

# Project-local packages are resolved relative to the working directory,
# so the path tweak has to run before the imports below.
sys.path.append(os.getcwd())
from common.mq import MQ
from common.aliyun_log import AliyunLogger
from common.pipeline import PiaoQuanPipeline


class ZMYXRecommend:
    """Crawl the recommend feed of a WeChat mini program through Appium.

    Constructing an instance runs the whole job: it opens an Appium session
    against WeChat, opens the mini program, scans the video list, runs every
    item through ``PiaoQuanPipeline`` and sends accepted items to MQ. The
    Appium session is always closed before ``__init__`` returns.
    """

    def __init__(self, log_type, crawler, env, rule_dict, our_uid):
        """Start the Appium session and run the crawl.

        :param log_type: crawl mode; passed to the logger/pipeline as ``mode``
            and stored on each item as ``strategy``.
        :param crawler: crawler name; used as trace-id prefix and as the
            ``platform`` of the pipeline and of the outgoing item.
        :param env: environment name; used for logging and as the MQ topic
            suffix.
        :param rule_dict: rule dictionary handed to the pipeline and
            serialised into the outgoing item.
        :param our_uid: user id written into the outgoing item as ``user_id``.
        """
        self.mq = None
        self.platform = "zhongmiaoyinxin"
        self.download_cnt = 0
        self.element_list = []
        self.count = 0
        self.swipe_count = 0
        self.log_type = log_type
        self.crawler = crawler
        self.env = env
        self.rule_dict = rule_dict
        self.our_uid = our_uid

        # The dev and non-dev branches used the very same binary, so a single
        # assignment is enough (Mac crawler machine).
        chromedriverExecutable = "/Users/luojunhui/Downloads/chromedriver_V111/chromedriver"

        # Capabilities for the WeChat session
        caps = {
            "platformName": "Android",  # mobile OS: Android / iOS
            # connected device name (emulator or real device); arbitrary on Android
            "deviceName": "a0a65126",
            # "udid": "emulator-5554",  # which device from `adb devices` to use
            # OS version of the phone.
            # NOTE(review): the key is misspelled ("platforVersion" instead of
            # "platformVersion"), so it is presumably ignored by Appium. It is
            # left untouched because correcting it changes device selection;
            # confirm the device version before fixing.
            "platforVersion": "11",
            "appPackage": "com.tencent.mm",  # package name of the app being driven
            "appActivity": ".ui.LauncherUI",  # activity to launch
            # let Appium grant permissions automatically; has no effect when
            # noReset is True (Android only), value is True or False
            "autoGrantPermissions": "true",
            "unicodekeyboard": True,  # use the built-in IME; set True for Chinese input
            "resetkeyboard": True,  # restore the original IME when the run ends
            "noReset": True,  # do not reset the app
            # dump the full page source into the Appium log when an element is not found
            "printPageSourceOnFailure": True,
            "newCommandTimeout": 6000,  # original note: initial wait time
            # automation engine, default is Appium; Appium, UiAutomator2,
            # Selendroid and Espresso are for Android, XCUITest is for iOS
            "automationName": "UiAutomator2",
            "showChromedriverLog": True,
            'enableWebviewDetailsCollection': True,
            'setWebContentsDebuggingEnabled': True,
            'recreateChromeDriverSessions': True,
            'chromedriverExecutable': chromedriverExecutable,
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
            # "chromeOptions": {"androidProcess": "com.tencent.mm:tools"},
            'browserName': ''
        }
        try:
            self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
        except Exception:
            # Narrowed from a bare ``except`` so Ctrl-C / SystemExit still propagate.
            AliyunLogger.logging(
                code="3002",
                platform=self.platform,
                mode=self.log_type,
                env=self.env,
                message="appium 启动异常"
            )
            return

        # From here on a session exists; make sure it is released on every
        # path, including the failure branches and unexpected exceptions.
        try:
            self.driver.implicitly_wait(30)
            if not self.check_wechat():
                AliyunLogger.logging(
                    code="3001",
                    platform=self.platform,
                    mode=self.log_type,
                    env=self.env,
                    message="打开微信异常"
                )
                return
            # Swipe down on the WeChat main screen (presumably to reveal the
            # mini-program panel), then tap the last matching entry.
            size = self.driver.get_window_size()
            self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.2),
                              int(size['width'] * 0.5), int(size['height'] * 0.8), 200)
            time.sleep(1)
            self.driver.find_elements(By.XPATH, '//*[@text="西瓜悦"]')[-1].click()
            AliyunLogger.logging(
                code="1000",
                platform=self.platform,
                env=self.env,
                mode=self.log_type,
                message="打开小程序西瓜悦成功"
            )
            time.sleep(5)
            self.get_videoList()
            time.sleep(100)
        finally:
            self.driver.quit()

    def search_elements(self, xpath):
        """Look for ``xpath`` in every window handle of the current context.

        The driver stays switched to the first window in which a match is
        found.

        :param xpath: XPath expression to search for.
        :return: the non-empty list of matching elements, or ``None`` when no
            window contains a match.
        """
        time.sleep(1)
        windowHandles = self.driver.window_handles
        for handle in windowHandles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                elements = self.driver.find_elements(By.XPATH, xpath)
                if elements:
                    return elements
            except NoSuchElementException:
                pass
        return None

    # Check whether WeChat has been opened
    def check_wechat(self):
        """Wait for the WeChat main screen, dismissing the notification shade.

        Makes up to 10 attempts. When neither the WeChat marker element nor
        the system dismiss view is present, the attempt ends with a 10 second
        sleep.

        :return: ``True`` once the WeChat marker element is found, ``False``
            after all attempts fail.
        """
        for i in range(10):
            try:
                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                    AliyunLogger.logging(
                        code="1000",
                        platform=self.platform,
                        mode=self.log_type,
                        env=self.env,
                        message="启动微信成功"
                    )
                    return True
                # Raises NoSuchElementException when the shade is not open.
                dismiss_view = self.driver.find_element(By.ID, "com.android.systemui:id/dismiss_view")
                print("发现并关闭系统下拉菜单")
                AliyunLogger.logging(
                    code="1000",
                    platform=self.platform,
                    mode=self.log_type,
                    env=self.env,
                    message="第{}次错误打开了通知栏".format(i + 1)
                )
                # Click the element that was just located. The previous code
                # looked it up again under a mistyped id
                # ("com.android.system:...") and therefore never clicked it.
                dismiss_view.click()
            except NoSuchElementException:
                time.sleep(10)
        return False

    def check_to_applet(self):
        """Switch into the mini program's webview and click the first navbar tab.

        Switches to the second entry of ``driver.contexts`` and tries each
        window handle once. Returns as soon as the navbar element has been
        clicked; prints a failure message when no window contains it.
        """
        # The former ``while True`` wrapper ended in an unconditional
        # ``break``, i.e. it only ever ran once, so it has been removed.
        webview = self.driver.contexts
        self.driver.switch_to.context(webview[1])
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                video_list = self.driver.find_element(By.XPATH, '//*[@class="index--navbar-list"]/*[1]')
                video_list.click()
                print("切换 webview 成功")
                return
            except NoSuchElementException:
                time.sleep(1)
        print("切换 webview 失败")

    def swipe_up(self):
        """Swipe the list upwards once and count the swipe."""
        # Called for its side effect: it leaves the driver on the window that
        # contains the list.
        self.search_elements('//*[@class="list-list--list"]')
        size = self.driver.get_window_size()
        self.driver.swipe(int(size["width"] * 0.5), int(size["height"] * 0.8),
                          int(size["width"] * 0.5), int(size["height"] * 0.442), 200)
        self.swipe_count += 1

    def close_ad(self):
        """Tap the top-centre of the screen (used to close an ad overlay)."""
        window_size = self.driver.get_window_size()
        TouchAction(self.driver).tap(x=int(window_size['width'] * 0.5),
                                     y=int(window_size['height'] * 0.1)).perform()

    def get_video_url(self, video_element):
        """Open a video and read the ``src`` of its player element.

        :param video_element: list element to click in order to open the video.
        :return: the ``src`` attribute of the ``wx-video`` element, or ``None``
            when no window contains one.
        """
        video_element.click()
        time.sleep(5)
        windowHandles = self.driver.window_handles
        for handle in windowHandles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                video_url_element = self.driver.find_element(By.XPATH, '//wx-video[@class="videoh"]')
                video_url = video_url_element.get_attribute("src")
                self.close_ad()
                return video_url
            except NoSuchElementException:
                time.sleep(1)
        return None

    @staticmethod
    def _parse_play_cnt(text):
        """Convert a displayed play count to an int when possible.

        Handles the "万" (x10,000) suffix including decimals such as "1.2万".
        Text that cannot be parsed is returned unchanged.
        """
        cleaned = text.strip()
        try:
            if "万" in cleaned:
                # round() avoids float artefacts such as 22999.999999999996
                return round(float(cleaned.split("万")[0]) * 10000)
            return int(cleaned)
        except ValueError:
            return text

    def get_videoList(self):
        """Scan the mini program's video list and forward accepted videos to MQ.

        Each batch of not-yet-seen cover elements is processed in order. The
        method returns when the scroll container disappears, when no cover
        elements are found, or when a rescan yields no new elements.
        """
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.driver.implicitly_wait(20)
        self.close_ad()
        AliyunLogger.logging(
            code="1000",
            platform=self.platform,
            mode=self.log_type,
            env=self.env,
            message="已经关闭广告"
        )
        self.check_to_applet()
        AliyunLogger.logging(
            code="1000",
            platform=self.platform,
            mode=self.log_type,
            env=self.env,
            message="成功切换到 webview"
        )
        index = 0
        while True:
            if self.search_elements('//*[@id="scrollContainer"]') is None:
                AliyunLogger.logging(
                    code="3000",
                    platform=self.platform,
                    mode=self.log_type,
                    env=self.env,
                    message="窗口已销毁"
                )
                return
            video_elements = self.search_elements('//wx-view[@class="cover"]')
            if video_elements is None:
                AliyunLogger.logging(
                    code="2000",
                    platform=self.platform,
                    mode=self.log_type,
                    env=self.env,
                    message="视频列表为空列表"
                )
                return
            # Only the elements that were not handled in an earlier pass.
            video_element_temp = video_elements[index:]
            if len(video_element_temp) == 0:
                AliyunLogger.logging(
                    code="2000",
                    platform=self.platform,
                    mode=self.log_type,
                    env=self.env,
                    message="视频已经到底"
                )
                return
            for i, video_element in enumerate(video_element_temp):
                if video_element is None:
                    return
                # Build a trace_id and use it as the unique key for this
                # video's whole life cycle.
                trace_id = self.crawler + str(uuid.uuid1())
                AliyunLogger.logging(
                    code="1001",
                    platform=self.platform,
                    mode=self.log_type,
                    env=self.env,
                    trace_id=trace_id,
                    message="扫描到一条视频",
                )
                self.download_cnt += 1
                # Called for its side effect: switch back to the list window.
                self.search_elements('//wx-view[@class="cover"]')
                time.sleep(3)
                self.driver.execute_script(
                    "arguments[0].scrollIntoView({block:'center',inline:'center'})",
                    video_element)
                # The XPaths start with "//", so they match the whole document;
                # the absolute position (index + i) picks this video's entry.
                position = index + i
                video_title = video_element.find_elements(
                    By.XPATH, '//wx-view[@class="playImgs"]')[position].text
                cover_url = video_element.find_elements(
                    By.XPATH, '//wx-image[@class="coverImg"]')[position].get_attribute('src')
                play_cnt = self._parse_play_cnt(video_element.find_elements(
                    By.XPATH, '//wx-image[@class="coverImg"]/span/*[2]')[position].text)
                out_video_id = md5(video_title.encode('utf8')).hexdigest()
                now = int(time.time())
                video_dict = {
                    'video_title': video_title,
                    'video_id': out_video_id,
                    'out_video_id': out_video_id,
                    'play_cnt': play_cnt,
                    'comment_cnt': 0,
                    'like_cnt': 0,
                    'share_cnt': 0,
                    'publish_time_stamp': now,
                    'publish_time_str': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)),
                    'update_time_stamp': now,
                    'user_name': "zhongmiaoyinxin",
                    'user_id': "zhongmiaoyinxin",
                    'avatar_url': cover_url,
                    'cover_url': cover_url,
                    'session': f"zhongmiaoyinxin-{now}"
                }
                pipeline = PiaoQuanPipeline(
                    platform=self.crawler,
                    mode=self.log_type,
                    item=video_dict,
                    rule_dict=self.rule_dict,
                    env=self.env,
                    trace_id=trace_id
                )
                flag = pipeline.process_item()
                if flag:
                    print(video_dict)
                    video_url = self.get_video_url(video_element)
                    if video_url is None:
                        self.driver.press_keycode(AndroidKey.BACK)
                    else:
                        video_dict["video_url"] = video_url
                        video_dict['strategy'] = self.log_type
                        video_dict["out_user_id"] = ""
                        video_dict["platform"] = self.crawler
                        video_dict["crawler_rule"] = json.dumps(self.rule_dict)
                        video_dict["user_id"] = self.our_uid
                        video_dict["publish_time"] = video_dict["publish_time_str"]
                        self.mq.send_msg(video_dict)
                        AliyunLogger.logging(
                            code="1002",
                            platform=self.platform,
                            mode=self.log_type,
                            env=self.env,
                            data=video_dict,
                            trace_id=trace_id,
                            message="成功发送 MQ 至 ETL",
                        )
                        self.driver.press_keycode(AndroidKey.BACK)
            # Advance past the batch just handled. Without this the outer loop
            # rescanned the same elements forever and the "end of list" branch
            # above could never be reached.
            index += len(video_element_temp)


def run():
    """Run one production crawl of the recommend feed with an empty rule set."""
    rule_dict1 = {}
    ZMYXRecommend("recommend", "zhongmiaoyinxin", "prod", rule_dict1, 6267141)


if __name__ == "__main__":
    run()