# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2022/10/26
import os
import sys
import time
from appium import webdriver
from appium.webdriver.extensions.android.nativekey import AndroidKey
from appium.webdriver.webdriver import WebDriver
from selenium.common import NoSuchElementException
from selenium.webdriver.common.by import By
from xinshi.xinshi_publish import Publish
sys.path.append(os.getcwd())
from main.common import Common
from main.feishu_lib import Feishu


class XinshiAPP:
    """Drive WeChat through Appium to look up, scrape and publish videos.

    The videos to process are listed in the Feishu spreadsheet 'shipinhao'.
    For every listed row the video is searched inside WeChat, its statistics
    are written back to the sheet, the video is forwarded to a chat and,
    once its URL shows up in the sheet, downloaded and published.

    Sheet column layout as used by this class (0-based index / letter):
    2/C title, 3/D user, 4/E duration, 5/F likes, 6/G shares, 7/H favorites,
    8/I comments, 9/J avatar URL, 10/K cover URL, 11/L video URL.
    """

    # 1-based position of the search result currently being inspected.
    i = 0

    # Source sheet id -> (publish strategy, record sheet id, ranking label).
    _RANKINGS = {
        'gO4Sn4': ("xinshi_hot", '0i4jmV', "新视-热门榜"),
        'aOjaIU': ("xinshi_recommend", 'WAG7Dq', "新视-推荐榜"),
    }
    # Used when env/sheetid is not one of the known combinations.
    _FALLBACK_RANKING = ("xinshi_hot", 'WAG7Dq', "新视-热门榜")
    # Sheets holding already downloaded videos, checked in this order.
    _DOWNLOADED_SHEETS = ('c77cf9', 'WAG7Dq', '0i4jmV')
    # Ordered replacements applied to a title before it is used as a
    # directory name; the order matches the original replace() chain.
    _TITLE_REPLACEMENTS = (
        ('"', ''), ('“', ''), ("\n", ""), ("/", ""), ("\r", ""), ("#", ""),
        (".", "。"), ("\\", ""), ("&NBSP", ""), (":", ""), ("*", ""),
        ("?", ""), ("<", ""), (">", ""), ("|", ""), (" ", ""),
    )

    @classmethod
    def start_wechat(cls, log_type, env):
        """Open an Appium session on WeChat and process both ranking sheets.

        :param log_type: name handed to ``Common.logger`` for all log output.
        :param env: environment name forwarded to the publish step.
        """
        Common.logger(log_type).info('启动微信')
        caps = {
            "platformName": "Android",  # mobile OS: Android / iOS
            "deviceName": "Android",  # connected device name; arbitrary on Android
            # NOTE(review): the standard capability is "platformVersion"; this
            # misspelled key is presumably ignored. Left unchanged so session
            # creation behaves as before -- confirm before renaming.
            "platforVersion": "11",  # OS version of the phone (Android 11)
            "appPackage": "com.tencent.mm",  # package name of the app under test
            "appActivity": ".ui.LauncherUI",  # activity to launch
            # Let appium grant base permissions automatically; has no effect
            # when noReset is True (Android only).
            "autoGrantPermissions": True,
            # NOTE(review): the documented spellings are "unicodeKeyboard" and
            # "resetKeyboard"; kept as-is -- confirm before renaming.
            "unicodekeyboard": True,  # use the built-in IME; needed for Chinese input
            "resetkeyboard": True,  # restore the original IME afterwards
            "noReset": True,  # do not reset the app
            # Switching away from chromedriver kills its session, so it does
            # not have to be killed manually.
            "recreateChromeDriverSessions": True,
            # Dump the full page source to the appium log when an element
            # cannot be found.
            "printPageSourceOnFailure": True,
            "newCommandTimeout": 6000,  # idle timeout between commands
            # Automation engine: Appium, UiAutomator2, Selendroid and Espresso
            # are for Android, XCUITest is for iOS.
            "automationName": "UiAutomator2",
            "showChromedriverLog": True,
            # Alternative webview process:
            # "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
            "chromeOptions": {"androidProcess": "com.tencent.mm:tools"},
            'enableWebviewDetailsCollection': True,
            'setWebContentsDebuggingEnabled': True,
            'chromedriverExecutable': '/Users/wangkun/Downloads/chromedriver_v86/chromedriver',
        }
        driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
        try:
            driver.implicitly_wait(10)
            time.sleep(5)

            Common.logger(log_type).info('开始抓取"新视-热门榜"')
            cls.search_video(log_type, driver, 'gO4Sn4', env)
            Common.logger(log_type).info('"新视-热门榜"抓取完毕\n')

            Common.logger(log_type).info('开始抓取"新视-推荐榜"')
            cls.search_video(log_type, driver, 'aOjaIU', env)
            Common.logger(log_type).info('"新视-推荐榜"抓取完毕\n')
        finally:
            # Always release the Appium session, even when scraping fails.
            driver.quit()

    @classmethod
    def search_element(cls, log_type, driver: WebDriver, element):
        """Return the first element matching an XPath in any window handle.

        Every window handle is switched to in turn; the first handle whose
        page contains a match wins and the driver stays on that handle.

        :param element: XPath expression to look for.
        :return: the matching element, or ``None`` when no handle contains
            one or when an exception was raised (it is logged as a warning).
        """
        try:
            for handle in driver.window_handles:
                driver.switch_to.window(handle)
                time.sleep(3)
                matches = driver.find_elements(By.XPATH, element)
                if matches:
                    return matches[0]
        except Exception as e:
            Common.logger(log_type).warning('search_element异常:{}\n', e)
        return None

    @classmethod
    def search_video(cls, log_type, driver: WebDriver, sheetid, env):
        """Search WeChat for every video listed in a sheet and process it.

        For each data row (the first row is skipped) the title is searched,
        the result whose title and user match is opened, then
        ``get_video_info`` and ``xinshi_download_publish`` are run and the
        app is navigated back to the WeChat home screen.

        :param sheetid: id of the Feishu sheet listing the videos.
        :param env: environment name forwarded to the publish step.
        :raises NoSuchElementException: when the result list runs out before
            a matching video is found.
        """
        sheet = Feishu.get_values_batch(log_type, 'shipinhao', sheetid)
        if sheet is None or len(sheet) <= 1:
            Common.logger(log_type).info('暂无数据\n')
            return

        # Stop at len(sheet): the last valid row index is len(sheet) - 1.
        for i in range(1, len(sheet)):
            video_title = sheet[i][2]
            video_user = sheet[i][3]

            Common.logger(log_type).info('点击搜索按钮')
            driver.find_element(By.ID, 'com.tencent.mm:id/j5t').click()
            Common.logger(log_type).info('输入视频标题:{}', video_title)
            time.sleep(3)
            search_box = driver.find_element(By.ID, 'com.tencent.mm:id/cd7')
            search_box.clear()
            search_box.send_keys(video_title[:20])
            Common.logger(log_type).info('点击搜索')
            driver.press_keycode(AndroidKey.SEARCH)
            driver.find_element(By.ID, 'com.tencent.mm:id/lm0').click()

            Common.logger(log_type).info('切换到webview')
            webview = driver.contexts
            driver.switch_to.context(webview[1])
            time.sleep(5)
            video_list = cls.search_element(log_type, driver, '//div[@class="unit"]/*[2]')
            Common.logger(log_type).info('点击"视频"分类')
            video_list.click()
            time.sleep(5)

            # Walk the webview results until title and user both match.
            # The cursor restarts for every search; it used to keep the
            # position reached while handling the previous row.
            cls.i = 0
            while True:
                cls.i += 1
                item_xpath = ('//div[@class="double-rich double-rich_vertical"]'
                              '/*[' + str(cls.i) + ']')
                title_element = cls.search_element(
                    log_type, driver, item_xpath + '//div[@class="title ellipsis_2"]')
                if title_element is None:
                    raise NoSuchElementException(
                        'search result #{} not found while looking for: {}'.format(
                            cls.i, video_title))
                driver.execute_script(
                    "arguments[0].scrollIntoView({block:'center',inline:'center'})",
                    title_element)
                download_title = title_element.text
                user_element = cls.search_element(
                    log_type, driver, item_xpath + '//div[@class="vc-source"]')
                if user_element is None:
                    raise NoSuchElementException(
                        'publisher of search result #{} not found'.format(cls.i))
                download_user = user_element.text.split('\n')[0]
                Common.logger(log_type).info('title:{}', download_title)
                Common.logger(log_type).info('user:{}\n', download_user)
                if video_title[:20] in download_title and video_user in download_user:
                    Common.logger(log_type).info('点击进入该视频详情')
                    title_element.click()
                    break

            # Collect play statistics and wait for the play URL.
            cls.get_video_info(log_type, driver, sheetid)
            # Download / upload.
            cls.xinshi_download_publish(log_type, sheetid, env)

            # Navigate back to the WeChat home screen.
            Common.logger(log_type).info('退出视频号')
            driver.press_keycode(AndroidKey.BACK)
            time.sleep(2)
            Common.logger(log_type).info('点击"取消"')
            driver.find_element(By.ID, 'com.tencent.mm:id/ki1').click()
            time.sleep(2)
            Common.logger(log_type).info('回到微信首页')
            driver.find_element(By.ID, 'com.tencent.mm:id/apy').click()

    @staticmethod
    def _clock_to_seconds(clock_text):
        """Convert a ``mm:ss`` style string into a number of seconds."""
        parts = clock_text.split(':')
        return int(parts[0]) * 60 + int(parts[-1])

    @staticmethod
    def _parse_count(raw, placeholder):
        """Turn a counter label into a number.

        :param raw: text read from the counter element.
        :param placeholder: caption shown instead of a number.
        :return: ``0`` for a missing/empty value or the placeholder;
            otherwise a float, with a '万' suffix (also '万+') multiplying the
            leading number by 10000.
        """
        if raw is None or raw == "" or raw == placeholder:
            return 0
        if '万' in raw:
            return float(raw.split('万')[0]) * 10000
        return float(raw)

    @classmethod
    def get_video_info(cls, log_type, driver: WebDriver, sheetid):
        """Read the open video's statistics, store them and forward the video.

        Switches to the native context, reads the two player time labels and
        the like/share/favorite/comment counters, writes them to ``E2:I2`` of
        the sheet, forwards the video to the chat named "爬虫群", then polls
        the sheet until column 11 of row index 1 is filled in.

        :param sheetid: id of the Feishu sheet being processed.
        """
        Common.logger(log_type).info('切回NATIVE_APP\n')
        driver.switch_to.context('NATIVE_APP')

        # Tap the player, then read its two time labels.
        pause_btn = driver.find_element(By.ID, 'com.tencent.mm:id/eh4')
        pause_btn.click()
        start_text = driver.find_element(By.ID, 'com.tencent.mm:id/l59').get_attribute('name')
        start_time = cls._clock_to_seconds(start_text)
        try:
            end_text = driver.find_element(By.ID, 'com.tencent.mm:id/l7i').get_attribute('name')
        except NoSuchElementException:
            end_text = driver.find_element(By.ID, 'com.tencent.mm:id/g73').get_attribute('name')
        end_time = cls._clock_to_seconds(end_text)
        # NOTE(review): presumably the labels are elapsed and remaining time,
        # which is why they are added -- confirm against the player UI.
        duration = start_time + end_time

        # Likes
        like_id = driver.find_element(By.ID, 'com.tencent.mm:id/k04')
        like_cnt = cls._parse_count(like_id.get_attribute('name'), "喜欢")
        # Shares
        share_id = driver.find_element(By.ID, 'com.tencent.mm:id/jhv')
        share_cnt = cls._parse_count(share_id.get_attribute('name'), "转发")
        # Favorites
        favorite_id = driver.find_element(By.ID, 'com.tencent.mm:id/fnp')
        favorite_cnt = cls._parse_count(favorite_id.get_attribute('name'), "收藏")
        # Comments
        comment_id = driver.find_element(By.ID, 'com.tencent.mm:id/bje')
        comment_cnt = cls._parse_count(comment_id.get_attribute('name'), "评论")

        # Write the statistics to the Feishu sheet.
        values = [[duration, like_cnt, share_cnt, favorite_cnt, comment_cnt]]
        time.sleep(1)
        Feishu.update_values(log_type, 'shipinhao', sheetid, 'E2:I2', values)
        Common.logger(log_type).info('视频信息写入飞书文档成功\n')

        # Forward the video to the chat watched by the Windows crawler machine.
        share_id.click()
        driver.find_element(By.XPATH, '//*[@text="转发给朋友"]').click()
        driver.find_element(By.XPATH, '//*[@text="爬虫群"]').click()
        driver.find_element(By.ID, 'com.tencent.mm:id/guw').click()

        # NOTE(review): this poll has no upper bound; it blocks forever when
        # the URL column is never filled in.
        while True:
            if Feishu.get_values_batch(log_type, 'shipinhao', sheetid)[1][11] is None:
                Common.logger(log_type).info('等待更新 URL 信息')
                time.sleep(10)
            else:
                Common.logger(log_type).info('URL 信息已更新\n')
                break

    @classmethod
    def _clean_title(cls, raw_title):
        """Strip a title and apply the ordered character replacements."""
        cleaned = raw_title.strip()
        for old, new in cls._TITLE_REPLACEMENTS:
            cleaned = cleaned.replace(old, new)
        return cleaned

    @classmethod
    def _already_downloaded(cls, log_type, title):
        """Return True when the title occurs in any downloaded-videos sheet."""
        return any(
            title in [cell
                      for row in Feishu.get_values_batch(log_type, 'shipinhao', done_sheet)
                      for cell in row]
            for done_sheet in cls._DOWNLOADED_SHEETS
        )

    @classmethod
    def xinshi_download_publish(cls, log_type, sheetid, env):
        """Download, publish and record the first data row of a sheet.

        Every path returns after handling one row. A row with a missing
        title, duration or video URL, or whose title is already recorded, is
        deleted without downloading. Otherwise the cover and video are
        downloaded, ``info.txt`` is written, the video is published, a row is
        inserted into the record sheet and the source row is deleted.

        :param sheetid: id of the Feishu sheet being processed.
        :param env: 'dev' or 'prod'; any other value uses the fallback
            strategy and record sheet.

        Any exception is logged and row 2 of ``sheetid`` is deleted so the
        failing row is not retried.
        """
        try:
            download_sheet = Feishu.get_values_batch(log_type, 'shipinhao', sheetid)
            for i in range(1, len(download_sheet)):
                row = download_sheet[i]
                raw_title = row[2]
                # Indices follow the layout in the class docstring; the user
                # sits in column 3 and the statistics in columns 4-8.
                download_username = row[3]
                download_duration = row[4]
                download_like_cnt = row[5]
                download_share_cnt = row[6]
                download_favorite_cnt = row[7]
                download_comment_cnt = row[8]
                download_head_url = row[9]
                download_cover_url = row[10]
                download_video_url = row[11]

                # Test for an empty row before cleaning the title, so a
                # missing title is handled here rather than by the except.
                if raw_title is None or download_duration is None or download_video_url is None:
                    Feishu.dimension_range(log_type, 'shipinhao', sheetid, 'ROWS', i + 1, i + 1)
                    Common.logger(log_type).info('空行,删除成功\n')
                    return

                download_title = cls._clean_title(raw_title)
                Common.logger(log_type).info("download_title:{}", download_title)
                Common.logger(log_type).info("download_username:{}", download_username)
                Common.logger(log_type).info("download_video_url:{}", download_video_url)

                if cls._already_downloaded(log_type, str(download_title)):
                    Feishu.dimension_range(log_type, 'shipinhao', sheetid, 'ROWS', i + 1, i + 1)
                    Common.logger(log_type).info('视频已下载,删除成功\n')
                    return

                # Download the cover.
                Common.download_method(log_type=log_type, text="cover",
                                       d_name=str(download_title), d_url=str(download_cover_url))
                # Download the video.
                Common.download_method(log_type=log_type, text="video",
                                       d_name=str(download_title), d_url=str(download_video_url))

                # Save the video info to "./videos/{download_title}/info.txt".
                now = str(int(time.time()))
                info_lines = [
                    'shipinhao' + now,
                    str(download_title),
                    str(download_duration),
                    str(download_favorite_cnt),
                    str(download_comment_cnt),
                    str(download_like_cnt),
                    str(download_share_cnt),
                    str(1920 * 1080),
                    now,
                    str(download_username),
                    str(download_head_url),
                    str(download_video_url),
                    str(download_cover_url),
                    "shipinhao",
                ]
                with open("./videos/" + download_title + "/" + "info.txt",
                          "a", encoding="UTF-8") as f_a:
                    f_a.write("\n".join(info_lines))
                Common.logger(log_type).info("==========视频信息已保存至info.txt==========")

                Common.logger(log_type).info("开始上传视频:{}".format(download_title))
                known = env in ('dev', 'prod') and sheetid in cls._RANKINGS
                if known:
                    strategy, record_sheet, ranking_label = cls._RANKINGS[sheetid]
                else:
                    strategy, record_sheet, ranking_label = cls._FALLBACK_RANKING
                # Only the dev environment links to the test admin site.
                if known and env == 'dev':
                    admin_host = "https://testadmin.piaoquantv.com"
                else:
                    admin_host = "https://admin.piaoquantv.com"

                our_video_id = Publish.upload_and_publish(log_type, env, strategy)
                our_video_link = admin_host + "/cms/post-detail/" + str(our_video_id) + "/info"

                # Insert a new first data row into the record sheet.
                Feishu.insert_columns(log_type, "shipinhao", record_sheet, "ROWS", 1, 2)
                # Fill that row.
                upload_time = int(time.time())
                values = [[time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(upload_time)),
                           ranking_label,
                           str(download_title),
                           our_video_link,
                           download_duration,
                           download_like_cnt,
                           download_share_cnt,
                           download_favorite_cnt,
                           download_comment_cnt,
                           download_username,
                           str(download_head_url),
                           str(download_cover_url),
                           str(download_video_url)]]
                time.sleep(1)
                Feishu.update_values(log_type, "shipinhao", record_sheet, "F2:V2", values)

                # Remove the processed row from the source sheet.
                time.sleep(1)
                Feishu.dimension_range(log_type, "shipinhao", sheetid, "ROWS", i + 1, i + 1)
                Common.logger(log_type).info("视频上传完成:{}\n", our_video_link)
                return
        except Exception as e:
            # Drop the failing row from the sheet being processed; this used
            # to target an unrelated hard-coded sheet id.
            Feishu.dimension_range(log_type, "shipinhao", sheetid, "ROWS", 2, 2)
            Common.logger(log_type).error('download_publish异常,删除视频信息成功:{}\n', e)


if __name__ == '__main__':
    XinshiAPP.start_wechat('xinshi-app', 'dev')