# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2022/10/12
"""Crawl the recommend feed of the WeChat mini program "知青天天看".

The crawler drives WeChat on an Android device through Appium, walks the
recommend list item by item, downloads videos that pass the filters, publishes
them through ``Publish`` and records the result in Feishu sheets.
"""
import os
import re
import shutil
import sys
import time

import ffmpeg
from appium import webdriver
from appium.webdriver.common.touch_action import TouchAction
from appium.webdriver.webdriver import WebDriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By

sys.path.append(os.getcwd())
from main.common import Common
from main.publish import Publish
from main.feishu_lib import Feishu


class Recommend:
    """Appium-driven crawler for the mini program's recommend list."""

    # 1-based position of the feed item currently being inspected.
    # Class-level state: it is shared by every (possibly nested) crawl session.
    i = 0

    @classmethod
    def get_video_info_from_local(cls, video_path):
        """Probe a local video file with ffprobe.

        :param video_path: path of the downloaded video file
        :return: ``(width, height, duration)`` of the first video stream, or
            ``None`` when the file contains no video stream
        """
        probe = ffmpeg.probe(video_path)
        video_stream = next(
            (stream for stream in probe['streams'] if stream['codec_type'] == 'video'),
            None)
        if video_stream is None:
            print('No video stream found!')
            return None
        width = int(video_stream['width'])
        height = int(video_stream['height'])
        # Some containers only report the duration on the container level,
        # so fall back to it when the stream itself carries none.
        raw_duration = video_stream.get('duration')
        if raw_duration is None:
            raw_duration = probe['format']['duration']
        duration = float(raw_duration)
        return width, height, duration

    @classmethod
    def filter_words(cls, log_type):
        """Read the filter-word sheet from Feishu.

        :param log_type: logger name passed through to ``Common.logger``
        :return: flat list of all non-empty cells of the sheet, or ``None``
            when the sheet could not be read (the error is logged)
        """
        try:
            filter_words_sheet = Feishu.get_values_batch(log_type, 'zhiqingzongqun', 'i2kcSD')
            return [cell for row in filter_words_sheet for cell in row if cell is not None]
        except Exception as e:
            Common.logger(log_type).error('filter_words异常:{}', e)
            return None

    @classmethod
    def start_wechat(cls, log_type, env):
        """Start WeChat, open the mini program, crawl the feed, then quit.

        Every exception is logged, never raised. If the session was created
        but the crawl setup failed, the session is closed before returning.

        :param log_type: logger name passed through to ``Common.logger``
        :param env: publish environment, forwarded to ``get_recommend``
        """
        driver = None
        try:
            Common.logger(log_type).info('启动微信')
            caps = {
                "platformName": "Android",  # mobile OS: Android / iOS
                "deviceName": "Android",  # device name; arbitrary on Android
                # NOTE(review): key looks misspelled ("platformVersion");
                # kept as-is because fixing it changes device selection.
                "platforVersion": "11",
                "appPackage": "com.tencent.mm",  # package of the app under test
                "appActivity": ".ui.LauncherUI",  # activity to launch
                # Let Appium grant permissions automatically (Android only);
                # has no effect when noReset is True.
                "autoGrantPermissions": "true",
                # NOTE(review): Appium documents these two capabilities as
                # "unicodeKeyboard" / "resetKeyboard"; kept as-is, confirm.
                "unicodekeyboard": True,  # use the built-in IME for Chinese input
                "resetkeyboard": True,  # restore the original IME afterwards
                "noReset": True,  # do not reset the app state
                # Dump the page source to the Appium log when a lookup fails.
                "printPageSourceOnFailure": True,
                "newCommandTimeout": 6000,
                "automationName": "UiAutomator2",
                "showChromedriverLog": True,
                'enableWebviewDetailsCollection': True,
                'setWebContentsDebuggingEnabled': True,
                'recreateChromeDriverSessions': True,
                'chromedriverExecutable': '/Users/piaoquan/Downloads/chromedriver',
                "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
                'browserName': ''
            }
            driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
            driver.implicitly_wait(20)

            # Swipe down on the chat list to reveal the mini-program panel.
            time.sleep(20)
            Common.logger(log_type).info('下滑,展示小程序选择面板')
            size = driver.get_window_size()
            center_x = int(size['width'] * 0.5)
            driver.swipe(center_x, int(size['height'] * 0.2),
                         center_x, int(size['height'] * 0.8), 200)

            # Open the mini program.
            time.sleep(5)
            Common.logger(log_type).info('打开小程序"知青天天看"')
            driver.find_elements(By.XPATH, '//*[@text="知青天天看"]')[-1].click()

            # Crawl the recommend feed.
            time.sleep(1)
            cls.get_recommend(log_type, driver, env)

            # Quit WeChat. get_recommend may already have closed the session
            # on one of its restart paths; quit() tolerates that.
            time.sleep(3)
            Common.logger(log_type).info('退出微信')
            cls.quit(log_type, driver)
        except Exception as e:
            Common.logger(log_type).error('start_wechat异常:{}\n', e)
            # Do not leak the Appium session when the setup steps fail.
            if driver is not None:
                cls.quit(log_type, driver)

    @classmethod
    def quit(cls, log_type, driver: WebDriver):
        """Close the Appium session.

        A failure to quit (for example because the session is already closed)
        is logged as a warning instead of being raised, so the restart paths
        can call this safely more than once for the same driver.
        """
        try:
            driver.quit()
        except Exception as e:
            Common.logger(log_type).warning('退出 APP 异常:{}\n', e)
            return
        Common.logger(log_type).info('退出 APP 成功\n')

    @classmethod
    def switch_to_handle(cls, log_type, driver, env, text) -> bool:
        """Switch to the first window handle whose page source contains ``text``.

        If no handle contains ``text`` the driver stays on the last handle.
        On an exception the session is closed and a fresh crawl is run via
        ``start_wechat``.

        :return: ``True`` when ``driver`` is still usable, ``False`` when the
            session was closed and restarted (``driver`` must not be used
            any more)
        """
        try:
            window_handles = driver.window_handles
            Common.logger(log_type).info('windowHandles:{}', window_handles)
            # Mini-program pages live in separate handles: walk all of them
            # and stop at the one that shows the wanted text.
            for handle in window_handles:
                Common.logger(log_type).info('切换到对应的windowHandle:{}', handle)
                driver.switch_to.window(handle)
                time.sleep(3)
                if driver.page_source.find(text) != -1:
                    break
            return True
        except Exception as e:
            Common.logger(log_type).warning('切换到小程序handle失败,重启APP:{}\n', e)
            cls.quit(log_type, driver)
            cls.start_wechat(log_type, env)
            return False

    @classmethod
    def _dismiss_ad(cls, log_type, driver):
        """Tap the top centre of the screen to close a possible ad overlay."""
        Common.logger(log_type).info('关闭广告')
        size = driver.get_window_size()
        TouchAction(driver).tap(x=int(size['width'] * 0.5),
                                y=int(size['height'] * 0.1)).perform()

    @classmethod
    def _item_xpath(cls, css_class):
        """Build the XPath of a child element of the ``cls.i``-th feed item."""
        return ('//*[@class="videolistbox videolist--videolistbox"]'
                '/*[' + str(cls.i) + ']//*[@class="' + css_class + '"]')

    @staticmethod
    def _parse_play_cnt(play_cnt):
        """Convert the displayed play-count text to an integer.

        The last space-separated token is parsed. A number directly followed
        by "万" is multiplied by 10000; a plain number is taken as-is.

        :return: the play count, or ``0`` when no number is present (this
            includes the ``0`` placeholder used when the element is missing)
        """
        token = str(play_cnt).split(' ')[-1]
        number = re.search(r'\d+(?:\.\d+)?', token)
        if number is None:
            return 0
        value = float(number.group())
        if token[number.end():].startswith('万'):
            value = value * 10000
        return int(value)

    @classmethod
    def _sheet_contains(cls, log_type, sheet_id, video_title):
        """Return whether any cell of the given Feishu sheet equals the title."""
        sheet = Feishu.get_values_batch(log_type, 'zhiqingzongqun', sheet_id)
        return video_title in [cell for row in sheet for cell in row]

    @classmethod
    def _should_skip(cls, log_type, video_title, cover_url):
        """Decide whether the current feed item must not be downloaded.

        The reason for skipping is logged.

        :return: ``True`` to skip the item, ``False`` to download it
        """
        if not video_title or not cover_url:
            Common.logger(log_type).info('无效视频\n')
            return True
        if '精美图文' in video_title:
            Common.logger(log_type).info('精美图文\n')
            return True
        words = cls.filter_words(log_type)
        if words is None:
            # Without the filter list the video cannot be vetted: skip it
            # explicitly rather than failing with a hidden TypeError.
            Common.logger(log_type).warning('过滤词获取失败,跳过视频:{}\n', video_title)
            return True
        if any(str(word) in video_title for word in words if word):
            Common.logger(log_type).info('视频已中过滤词:{}\n', video_title)
            return True
        if cls._sheet_contains(log_type, 'Z48hlq', video_title):
            Common.logger(log_type).info('视频已存在\n')
            return True
        if cls._sheet_contains(log_type, '1a88b3', video_title):
            Common.logger(log_type).info('视频已下载\n')
            return True
        return False

    @classmethod
    def _download_and_publish(cls, log_type, driver, env, title, video_title,
                              play_cnt, cover_url):
        """Download one video, publish it and record it in Feishu.

        :param title: title element of the feed item (clicked to open details)
        :param play_cnt: displayed play-count text, or ``0`` when not found
        :return: ``True`` when the video was shorter than 60s and got deleted
            (the caller ends the crawl in that case), otherwise ``False``
        """
        video_url = cls.get_url(log_type, driver, video_title, title)
        Common.logger(log_type).info('play_cnt:{}', play_cnt)
        Common.logger(log_type).info('video_url:{}', video_url)

        # Download the video and read its dimensions / duration.
        Common.download_method(log_type, 'video', video_title, video_url)
        video_dir = "./videos/" + video_title + "/"
        video_info = cls.get_video_info_from_local(video_dir + "video.mp4")
        download_width = str(video_info[0])
        download_height = str(video_info[1])
        download_duration = video_info[2]
        resolution = download_width + '*' + download_height

        # Videos shorter than 60s are dropped right away.
        if int(download_duration) < 60:
            shutil.rmtree(video_dir)
            Common.logger(log_type).info("时长:{}<60秒,删除成功\n", int(download_duration))
            return True

        # Download the cover.
        Common.download_method(log_type, 'cover', video_title, cover_url)

        # Save the metadata to "./videos/{video_title}/info.txt".
        now = str(int(time.time()))
        info_lines = [
            now,
            str(video_title),
            str(int(download_duration)),
            str(cls._parse_play_cnt(play_cnt)),
            '0',
            '0',
            '0',
            resolution,
            now,
            '知青天天看',
            str(cover_url),
            str(video_url),
            str(cover_url),
            "zhiqingzongqun" + now,
        ]
        with open(video_dir + "info.txt", "a", encoding="UTF-8") as f_a:
            f_a.write("\n".join(info_lines))
        Common.logger(log_type).info("==========视频信息已保存至info.txt==========")

        # Upload. Portrait videos are published as "height" in dev/prod;
        # any other env always publishes as "width".
        Common.logger(log_type).info("开始上传视频:{}".format(video_title))
        is_portrait = int(download_width) < int(download_height)
        orientation = "height" if env in ('dev', 'prod') and is_portrait else "width"
        if env == 'dev':
            admin_host = "https://testadmin.piaoquantv.com"
        else:
            admin_host = "https://admin.piaoquantv.com"
        our_video_id = Publish.upload_and_publish(log_type, env, orientation)
        our_video_link = admin_host + "/cms/post-detail/" + str(our_video_id) + "/info"
        Common.logger(log_type).info("视频上传完成:{}", video_title)

        # Record the video in the "downloaded" sheet: insert a new first
        # data row, then fill it.
        Common.logger(log_type).info("保存视频至已下载表:{}", video_title)
        Feishu.insert_columns(log_type, "zhiqingzongqun", "1a88b3", "ROWS", 1, 2)
        upload_time = int(time.time())
        values = [[time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(upload_time)),
                   "推荐榜",
                   video_title,
                   our_video_link,
                   play_cnt,
                   int(download_duration),
                   resolution,
                   cover_url,
                   video_url]]
        time.sleep(1)
        Feishu.update_values(log_type, "zhiqingzongqun", "1a88b3", "F2:V2", values)
        Common.logger(log_type).info("视频:{},下载/上传成功\n", video_title)
        # Android "back" key: leave the video detail page.
        driver.press_keycode(4)
        return False

    @classmethod
    def get_recommend(cls, log_type, driver: WebDriver, env):
        """Walk the recommend list and download/publish every eligible video.

        Items are addressed by the class-level counter ``cls.i``; the walk
        stops after item 2000 or after the first video shorter than 60s.
        On any unexpected error the session is closed and a fresh crawl is
        started via ``start_wechat``; ``driver`` is not used after that.

        :param log_type: logger name passed through to ``Common.logger``
        :param driver: live Appium session with the mini program open
        :param env: publish environment ('dev', 'prod', ...)
        """
        try:
            driver.implicitly_wait(5)
            time.sleep(10)
            cls._dismiss_ad(log_type, driver)

            # Switch from the native context into the mini-program webview.
            try:
                contexts = driver.contexts
                Common.logger(log_type).info('切换到小程序\n')
                driver.switch_to.context(contexts[1])
                time.sleep(5)
                driver_alive = cls.switch_to_handle(log_type, driver, env, '知青天天看')
            except Exception as e:
                Common.logger(log_type).warning('切换到小程序失败,重启APP:{}\n', e)
                cls.quit(log_type, driver)
                cls.i = 0
                cls.start_wechat(log_type, env)
                # This session is closed and the nested crawl has finished.
                return
            if not driver_alive:
                return

            time.sleep(5)
            Common.logger(log_type).info('获取推荐列表视频信息')
            while True:
                cls.i += 1
                for recommend_handle in driver.window_handles:
                    try:
                        driver.switch_to.window(recommend_handle)

                        # Ad banner inside the current item, if any.
                        try:
                            ad = driver.find_element(
                                By.XPATH, cls._item_xpath("ad-_banner-_-full"))
                        except NoSuchElementException:
                            ad = None

                        # Title element and text; scroll it into view.
                        try:
                            title = driver.find_element(
                                By.XPATH, cls._item_xpath("video_title videolist--video_title"))
                            driver.execute_script(
                                "arguments[0].scrollIntoView({block:'center',inline:'center'})",
                                title)
                            video_title = title.get_attribute('innerHTML')
                        except NoSuchElementException:
                            title = None
                            video_title = None

                        # Displayed play count; 0 is the "not found" placeholder.
                        try:
                            play_cnt = driver.find_element(
                                By.XPATH, cls._item_xpath("clickbox videolist--clickbox")
                            ).get_attribute('innerHTML')
                        except NoSuchElementException:
                            play_cnt = 0

                        # Cover image URL.
                        try:
                            cover_url = driver.find_element(
                                By.XPATH, cls._item_xpath("itemimage videolist--itemimage")
                            ).get_attribute('src')
                        except Exception:
                            cover_url = None

                        if ad is not None:
                            Common.logger(log_type).info('正在获取第{}条:广告\n', cls.i)
                            break
                        if video_title:
                            Common.logger(log_type).info('正在获取第{}条:{}', cls.i, video_title)

                        if not cls._should_skip(log_type, video_title, cover_url):
                            if cls._download_and_publish(log_type, driver, env, title,
                                                         video_title, play_cnt, cover_url):
                                # NOTE(review): a too-short video ends the
                                # whole crawl; confirm this is intended.
                                return
                        break
                    except Exception:
                        # Deliberate best effort: a handle that cannot be
                        # processed is skipped and the next one is tried.
                        pass
                if cls.i == 2000:
                    cls.i = 0
                    break
        except Exception as e:
            Common.logger(log_type).error('get_recommend异常:{},重启 APP\n', e)
            cls.quit(log_type, driver)
            cls.i = 0
            cls.start_wechat(log_type, env)

    @classmethod
    def get_url(cls, log_type, driver: WebDriver, video_title, title):
        """Open the detail page of a feed item and read its playback URL.

        :param video_title: title text, used for logging only
        :param title: title element of the feed item; clicking it opens the
            detail page
        :return: the ``src`` attribute of the video element, or ``None`` when
            no handle contains it or an error occurred (the error is logged)
        """
        try:
            Common.logger(log_type).info('进入视频详情:{}', video_title)
            title.click()
            time.sleep(5)
            cls._dismiss_ad(log_type, driver)
            time.sleep(10)
            for info_handle in driver.window_handles:
                try:
                    driver.switch_to.window(info_handle)
                    url = driver.find_element(
                        By.XPATH,
                        '//*[@class="wx-swiper-slide-frame"]'
                        '/*[2]//*[@class="video_item videoswiper--video_item"]')
                    return url.get_attribute('src')
                except NoSuchElementException:
                    continue
        except Exception as e:
            Common.logger(log_type).error('get_url异常:{}\n', e)
        return None


if __name__ == '__main__':
    # Recommend.start_wechat('recommend', 'prod')
    print(Recommend.filter_words('recommend'))