# Zhongmiaoyinxin WeChat mini-program crawler: backend integration
# @Author: luojunhui
# @Time: 2023/10/26
import json
import os
import sys
import time
from hashlib import md5

from appium import webdriver
from appium.webdriver.common.touch_action import TouchAction
from appium.webdriver.extensions.android.nativekey import AndroidKey
from appium.webdriver.webdriver import WebDriver
from bs4 import BeautifulSoup
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
import multiprocessing

sys.path.append(os.getcwd())
from common.mq import MQ
from common.aliyun_log import AliyunLogger
from common.pipeline import PiaoQuanPipeline


class ZMYXRecommend:
    """Drive WeChat through Appium and scrape the mini-program's video feed.

    Constructing an instance runs the whole crawl: it opens an Appium
    session, waits for WeChat, opens the mini-program, scrapes the video
    list and finally closes the session.
    """

    env = None
    driver = None
    log_type = None

    def __init__(self, log_type, crawler, env, rule_dict, our_uid):
        """Start an Appium session and run one full crawl.

        Args:
            log_type: Crawl mode, forwarded to the pipeline as ``mode``.
            crawler: Crawler name, forwarded to the pipeline as ``platform``.
            env: Environment name; selects the chromedriver path and is
                appended to the MQ topic name.
            rule_dict: Filtering rules forwarded to the pipeline.
            our_uid: Stored on the instance; not used by the visible code.
        """
        self.mq = None
        self.platform = "众妙音信"
        self.download_cnt = 0
        self.element_list = []
        self.count = 0
        self.swipe_count = 0
        self.log_type = log_type
        self.crawler = crawler
        self.env = env
        self.rule_dict = rule_dict
        self.our_uid = our_uid
        if self.env == "dev":
            chromedriverExecutable = "/Users/luojunhui/Downloads/chromedriver_V111/chromedriver"
        else:
            chromedriverExecutable = "/Users/luojunhui/Downloads/chromedriver_v111/chromedriver"
        # Appium capabilities for the WeChat session.
        caps = {
            "platformName": "Android",  # mobile OS: Android / iOS
            "deviceName": "a0a65126",  # connected device name; arbitrary on Android
            # "udid": "emulator-5554",  # which device from `adb devices` to use
            # NOTE(review): key is spelled "platforVersion" (missing "m"), so it
            # is probably ignored by Appium. Left unchanged because correcting
            # it could change device matching - confirm before fixing.
            "platforVersion": "11",  # OS version of the device
            "appPackage": "com.tencent.mm",  # package name of the app under test
            "appActivity": ".ui.LauncherUI",  # activity to launch
            # Let Appium grant app permissions automatically (Android only);
            # has no effect when noReset is True.
            "autoGrantPermissions": "true",
            "unicodekeyboard": True,  # use the Appium keyboard (needed for Chinese input)
            "resetkeyboard": True,  # restore the original keyboard afterwards
            "noReset": True,  # do not reset the app state
            "printPageSourceOnFailure": True,  # log the page source when an element is not found
            "newCommandTimeout": 6000,  # command timeout
            # Automation engine: Appium, UiAutomator2, Selendroid and Espresso
            # target Android; XCUITest targets iOS.
            "automationName": "UiAutomator2",
            "showChromedriverLog": True,
            'enableWebviewDetailsCollection': True,
            'setWebContentsDebuggingEnabled': True,
            'recreateChromeDriverSessions': True,
            'chromedriverExecutable': chromedriverExecutable,
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
            # "chromeOptions": {"androidProcess": "com.tencent.mm:tools"},
            'browserName': ''
        }
        self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
        # Always release the Appium session, even when the crawl fails midway.
        try:
            self.driver.implicitly_wait(30)

            # Poll until the WeChat main screen is visible (up to 120 tries).
            for _ in range(120):
                try:
                    if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                        print("启动微信成功")
                        break
                    # find_element raises NoSuchElementException when the
                    # notification shade is absent; the handler below then
                    # sleeps and the loop retries.
                    dismiss_view = self.driver.find_element(
                        By.ID, "com.android.systemui:id/dismiss_view"
                    )
                    print("发现并关闭系统下拉菜单")
                    # Click the element that was just located. The original
                    # looked it up again under a different id
                    # ("com.android.system:id/..."), which never matched.
                    dismiss_view.click()
                except NoSuchElementException:
                    time.sleep(1)

            # Swipe down on the main screen, then tap the mini-program entry.
            size = self.driver.get_window_size()
            self.driver.swipe(
                int(size['width'] * 0.5),
                int(size['height'] * 0.2),
                int(size['width'] * 0.5),
                int(size['height'] * 0.8),
                200,
            )
            time.sleep(1)
            # NOTE(review): the entry text differs from self.platform - confirm
            # this is the intended mini-program.
            self.driver.find_elements(By.XPATH, '//*[@text="西瓜悦"]')[-1].click()
            print("打开小程序成功")
            time.sleep(5)
            self.get_videoList()
            time.sleep(100)
        finally:
            self.driver.quit()

    def search_elements(self, xpath):
        """Return the elements matching ``xpath`` from the first window that has any.

        Switches through every window handle of the current context and
        leaves the driver on the window where the match was found.

        Args:
            xpath: XPath expression to look up.

        Returns:
            A non-empty list of elements, or ``None`` when no window
            contains a match.
        """
        time.sleep(1)
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                elements = self.driver.find_elements(By.XPATH, xpath)
            except NoSuchElementException:
                continue
            if elements:
                return elements
        return None

    def check_to_applet(self):
        """Switch to the mini-program webview and click its first navbar entry.

        Makes a single pass over the window handles; prints a failure
        message when none of them contains the navbar.
        """
        contexts = self.driver.contexts
        # Presumably index 1 is the WEBVIEW context (index 0 being the native
        # app); raises IndexError when only one context exists.
        self.driver.switch_to.context(contexts[1])
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                video_list = self.driver.find_element(
                    By.XPATH, '//*[@class="index--navbar-list"]/*[1]'
                )
                video_list.click()
                print("切换 webview 成功")
                return
            except NoSuchElementException:
                time.sleep(1)
        print("切换 webview 失败")

    def swipe_up(self):
        """Swipe the list upwards once and count the swipe."""
        # Called for its side effect: it switches to the window holding the list.
        self.search_elements('//*[@class="list-list--list"]')
        size = self.driver.get_window_size()
        self.driver.swipe(
            int(size["width"] * 0.5),
            int(size["height"] * 0.8),
            int(size["width"] * 0.5),
            int(size["height"] * 0.442),
            200,
        )
        self.swipe_count += 1

    def close_ad(self):
        """Tap near the top-centre of the screen to close the advertisement."""
        window_size = self.driver.get_window_size()
        TouchAction(self.driver).tap(
            x=int(window_size['width'] * 0.5),
            y=int(window_size['height'] * 0.1),
        ).perform()

    @staticmethod
    def _parse_play_cnt(text):
        """Convert a play-count label such as ``"1.2万"`` or ``"345"`` to an int.

        A value containing ``万`` is multiplied by 10000. Text that cannot
        be parsed is returned unchanged.
        """
        if "万" in text:
            try:
                # round() avoids float truncation, e.g. 0.57 * 10000 = 5699.99...
                return int(round(float(text.split("万")[0]) * 10000))
            except ValueError:
                return text
        stripped = text.strip()
        if stripped.isdigit():
            return int(stripped)
        return text

    def _extract_video(self, video_element, position):
        """Build the video record for the card at absolute ``position``.

        The XPath expressions start with ``//`` and therefore match across
        the whole document, which is why results are indexed by the
        absolute card position rather than taken relative to the card.
        """
        video_title = video_element.find_elements(
            By.XPATH, '//wx-view[@class="playImgs"]'
        )[position].text
        cover_url = video_element.find_elements(
            By.XPATH, '//wx-image[@class="coverImg"]'
        )[position].get_attribute('src')
        play_cnt = self._parse_play_cnt(
            video_element.find_elements(
                By.XPATH, '//wx-image[@class="coverImg"]/span/*[2]'
            )[position].text
        )
        # The feed exposes no video id, so one is derived from the title.
        out_video_id = md5(video_title.encode('utf8')).hexdigest()
        # Read the clock once so every timestamp field in the record agrees.
        now = int(time.time())
        return {
            'video_title': video_title,
            'video_id': out_video_id,
            'out_video_id': out_video_id,
            'play_cnt': play_cnt,
            'comment_cnt': 0,
            'like_cnt': 0,
            'share_cnt': 0,
            'publish_time_stamp': now,
            'publish_time_str': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)),
            'update_time_stamp': now,
            'user_name': "zhongmiaoyinxin",
            'user_id': "zhongmiaoyinxin",
            'avatar_url': cover_url,
            'cover_url': cover_url,
            'session': f"zhongmiaoyinxin-{now}"
        }

    def get_videoList(self):
        """Scrape every video card in the feed and run each through the pipeline.

        Cards are processed in batches. After each batch the start index
        advances, so the next pass only handles newly loaded cards, and the
        method returns once no new cards appear.
        """
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.driver.implicitly_wait(20)
        print("关闭广告")
        self.close_ad()
        print("切换到 webview")
        self.check_to_applet()

        index = 0
        while True:
            if self.search_elements('//*[@id="scrollContainer"]') is None:
                print("窗口已销毁")
                return

            print("获取视频列表")
            video_elements = self.search_elements('//wx-view[@class="cover"]')
            if video_elements is None:
                print("视频列表为空列表")
                return

            video_element_temp = video_elements[index:]
            if not video_element_temp:
                print("视频已经到底")
                return

            for i, video_element in enumerate(video_element_temp):
                if video_element is None:
                    return
                self.download_cnt += 1
                # Called for its side effect: it switches to the window
                # holding the list before the card is touched.
                self.search_elements('//wx-view[@class="cover"]')
                time.sleep(3)
                self.driver.execute_script(
                    "arguments[0].scrollIntoView({block:'center',inline:'center'})",
                    video_element,
                )
                video_dict = self._extract_video(video_element, index + i)
                pipeline = PiaoQuanPipeline(
                    platform=self.crawler,
                    mode=self.log_type,
                    item=video_dict,
                    rule_dict=self.rule_dict,
                    env=self.env
                )
                flag = pipeline.process_item()
                if flag:
                    print(video_dict)
                else:
                    print("被规则过滤")
                    # self.mq.send_msg(video_dict)
                    # print(video_dict)

            # Skip the cards already handled. Without this the loop would
            # re-scrape the same batch forever.
            index += len(video_element_temp)


def run():
    """Run one crawl in the dev environment with an empty rule set."""
    rule_dict1 = {}
    ZMYXRecommend("recommend", "zhongmiaoyinxin", "dev", rule_dict1, 6267141)


if __name__ == "__main__":
    run()
    # Disabled watchdog: restart the crawl in a child process whenever it dies.
    # process = multiprocessing.Process(
    #     target=run
    # )
    # process.start()
    # while True:
    #     if not process.is_alive():
    #         print("正在重启")
    #         process.terminate()
    #         time.sleep(60)
    #         os.system("adb forward --remove-all")
    #         process = multiprocessing.Process(target=run)
    #         process.start()
    #     time.sleep(60)