# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2023/4/25 import difflib import os import sys import time from datetime import date, timedelta from hashlib import md5 from appium import webdriver from appium.webdriver.extensions.android.nativekey import AndroidKey from appium.webdriver.webdriver import WebDriver from selenium.common import NoSuchElementException from selenium.webdriver.common.by import By sys.path.append(os.getcwd()) from common.feishu import Feishu from common.publish import Publish from common.common import Common from common.public import get_config_from_mysql class ShipinhaoSearch: i = 0 @classmethod def start_wechat(cls, log_type, crawler, word, env): Common.logger(log_type, crawler).info('启动微信') if env == "dev": chromedriverExecutable = "/Users/wangkun/Downloads/chromedriver/chromedriver_v107/chromedriver" else: chromedriverExecutable = '/Users/piaoquan/Downloads/chromedriver' caps = { "platformName": "Android", # 手机操作系统 Android / iOS "deviceName": "Android", # 连接的设备名(模拟器或真机),安卓可以随便写 "platforVersion": "13", # 手机对应的系统版本(Android 13) "appPackage": "com.tencent.mm", # 被测APP的包名,乐活圈 Android "appActivity": ".ui.LauncherUI", # 启动的Activity名 "autoGrantPermissions": True, # 让 appium 自动授权 base 权限, # 如果 noReset 为 True,则该条不生效(该参数为 Android 独有),对应的值为 True 或 False "unicodekeyboard": True, # 使用自带输入法,输入中文时填True "resetkeyboard": True, # 执行完程序恢复原来输入法 "noReset": True, # 不重置APP "recreateChromeDriverSessions": True, # 切换到非 chrome-Driver 会 kill 掉 session,就不需要手动 kill 了 "printPageSourceOnFailure": True, # 找不到元素时,appium log 会完整记录当前页面的 pagesource "newCommandTimeout": 6000, # 初始等待时间 "automationName": "UiAutomator2", # 使用引擎,默认为 Appium, # 其中 Appium、UiAutomator2、Selendroid、Espresso 用于 Android,XCUITest 用于 iOS "showChromedriverLog": True, # "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"}, "chromeOptions": {"androidProcess": "com.tencent.mm:tools"}, 'enableWebviewDetailsCollection': True, 'setWebContentsDebuggingEnabled': True, 'chromedriverExecutable': chromedriverExecutable, } driver = webdriver.Remote("http://localhost:4723/wd/hub", caps) driver.implicitly_wait(10) # # 向下滑动页面,展示出小程序选择面板 # for i in range(120): # try: # # 发现微信消息 TAB,代表微信已启动成功 # if driver.find_elements(By.ID, 'com.tencent.mm:id/f2s'): # break # # 发现并关闭系统菜单栏 # elif driver.find_element(By.ID, 'com.android.systemui:id/dismiss_view'): # Common.logger(log_type, crawler).info('发现并关闭系统下拉菜单栏') # driver.find_element(By.ID, 'com.android.systemui:id/dismiss_view').click() # else: # pass # except NoSuchElementException: # time.sleep(1) if len(driver.find_elements(By.ID, 'android:id/text1')) != 0: driver.find_elements(By.ID, 'android:id/text1')[0].click() time.sleep(5) cls.search_video(log_type=log_type, crawler=crawler, word=word, driver=driver, env=env) cls.close_wechat(log_type=log_type, crawler=crawler, driver=driver) @classmethod def close_wechat(cls, log_type, crawler, driver: WebDriver): driver.quit() Common.logger(log_type, crawler).info(f"微信退出成功\n") @classmethod def is_contain_chinese(cls, strword): for ch in strword: if u'\u4e00' <= ch <= u'\u9fff': return True return False # 查找元素 @classmethod def search_elements(cls, driver: WebDriver, xpath): time.sleep(1) windowHandles = driver.window_handles for handle in windowHandles: driver.switch_to.window(handle) time.sleep(1) try: elements = driver.find_elements(By.XPATH, xpath) if elements: return elements except NoSuchElementException: pass @classmethod def check_to_webview(cls, log_type, crawler, driver: WebDriver): # Common.logger(log_type, crawler).info('切换到webview') webviews = driver.contexts driver.switch_to.context(webviews[1]) time.sleep(1) windowHandles = driver.window_handles for handle in windowHandles: driver.switch_to.window(handle) try: shipinhao_webview = driver.find_element(By.XPATH, '//div[@class="unit"]') if shipinhao_webview: Common.logger(log_type, crawler).info('切换到视频号 webview 成功') return "成功" except Exception as e: Common.logger(log_type, crawler).info(f"{e}\n") @classmethod def search_video(cls, log_type, crawler, word, driver: WebDriver, env): # 点击微信搜索框,并输入搜索词 driver.implicitly_wait(10) driver.find_element(By.ID, 'com.tencent.mm:id/j5t').click() time.sleep(0.5) Common.logger(log_type, crawler).info(f'输入搜索词:{word}') driver.find_element(By.ID, 'com.tencent.mm:id/cd7').clear().send_keys(word) driver.press_keycode(AndroidKey.ENTER) # driver.find_elements(By.ID, 'com.tencent.mm:id/oi4')[0].click() driver.find_element(By.ID, 'com.tencent.mm:id/m94').click() time.sleep(5) # 切换到微信搜索结果页 webview check_to_webview = cls.check_to_webview(log_type, crawler, driver) if check_to_webview is None: Common.logger(log_type, crawler).info("切换到视频号 webview 失败\n") return time.sleep(1) # 切换到"视频号"分类 shipinhao_tags = cls.search_elements(driver, '//div[@class="unit"]/*[2]') Common.logger(log_type, crawler).info('点击"视频号"分类') shipinhao_tags[0].click() time.sleep(5) index = 0 while True: if cls.i >= 100: Common.logger(log_type, crawler).info(f'搜索词:"{word}",已抓取视频数:{index}') cls.i = 0 return # try: if cls.search_elements(driver, '//*[@class="double-rich double-rich_vertical"]') is None: Common.logger(log_type, crawler).info('窗口已销毁\n') return Common.logger(log_type, crawler).info('获取视频列表\n') video_elements = cls.search_elements(driver, '//div[@class="vc active__mask"]') if video_elements is None: Common.logger(log_type, crawler).warning(f'video_elements:{video_elements}') return video_element_temp = video_elements[index:] if len(video_element_temp) == 0: Common.logger(log_type, crawler).info('到底啦~~~~~~~~~~~~~\n') return for i, video_element in enumerate(video_element_temp): if video_element is None: Common.logger(log_type, crawler).info('到底啦~\n') return cls.i += 1 cls.search_elements(driver, '//div[@class="vc active__mask"]') Common.logger(log_type, crawler).info(f'拖动"视频"列表第{cls.i}个至屏幕中间') time.sleep(3) driver.execute_script("arguments[0].scrollIntoView({block:'center',inline:'center'})", video_element) if len(video_element.find_elements(By.XPATH, "//*[@text='没有更多的搜索结果']")) != 0: Common.logger(log_type, crawler).info("没有更多的搜索结果\n") return video_title = video_element.find_elements(By.XPATH, '//div[@class="title ellipsis_2"]/*[2]')[index + i].text video_url = video_element.find_elements(By.XPATH, '//div[@class="video-player"]')[index+i].get_attribute('src') cover_url = video_element.find_elements(By.XPATH, '//div[@class="video-player__bd"]')[index+i].get_attribute('style') duration = video_element.find_elements(By.XPATH, '//div[@class="play-mask__text"]/*[2]')[index+i].text duration = int(duration.split(':')[0]) * 60 + int(duration.split(':')[-1]) user_name = video_element.find_elements(By.XPATH, '//p[@class="vc-source__text"]')[index+i].text avatar_url = video_element.find_elements(By.XPATH, '//div[@class="ui-image-image ui-image vc-source__thumb"]')[index+i].get_attribute('style') # Common.logger(log_type, crawler).info(f"video_title:{video_title}") # Common.logger(log_type, crawler).info(f"duration:{duration}") video_element.click() time.sleep(3) video_dict = cls.get_video_info(log_type=log_type, crawler=crawler, driver=driver) video_dict["video_title"] = video_title video_dict["duration"] = duration video_dict["video_url"] = video_url for k, v in video_dict.items(): Common.logger(log_type, crawler).info(f"{k}:{v}") if video_title in [x for y in Feishu.get_values_batch(log_type, crawler, "xYWCzf") for x in y]: Common.logger(log_type, crawler).info("视频已存在\n") else: cls.download_publish(log_type, crawler, word, video_dict) Common.logger(log_type, crawler).info('已抓取完一组视频,休眠1秒\n') time.sleep(1) index = index + len(video_element_temp) # except Exception as e: # Common.logger(log_type, crawler).info(f"get_videoList:{e}\n") # cls.i = 0 @classmethod def download_publish(cls, log_type, crawler, word, video_dict): Feishu.insert_columns(log_type, crawler, "xYWCzf", "ROWS", 1, 2) time.sleep(0.5) values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))), "视频号搜索", word, video_dict["video_title"], video_dict["duration"], video_dict["like_cnt"], video_dict["share_cnt"], video_dict["favorite_cnt"], video_dict["comment_cnt"], video_dict["publish_time_str"], "待获取", "待获取", "待获取", video_dict["video_url"]]] Feishu.update_values(log_type, crawler, "xYWCzf", "F2:Z2", values) Common.logger(log_type, crawler).info("写入飞书成功\n") @classmethod def get_video_info(cls, log_type, crawler, driver: WebDriver): # Common.logger(log_type, crawler).info('切回NATIVE_APP') driver.switch_to.context('NATIVE_APP') # 点赞 like_id = driver.find_element(By.ID, 'com.tencent.mm:id/k04') like_cnt = like_id.get_attribute('name') if like_cnt == "" or like_cnt == "喜欢" or like_cnt == "火" or cls.is_contain_chinese(like_cnt) is True: like_cnt = 0 elif '万' in like_cnt: like_cnt = int(float(like_cnt.split('万')[0]) * 10000) elif '万+' in like_cnt: like_cnt = int(float(like_cnt.split('万+')[0]) * 10000) else: like_cnt = int(float(like_cnt)) # 分享 share_id = driver.find_element(By.ID, 'com.tencent.mm:id/jhv') share_cnt = share_id.get_attribute('name') if share_cnt == "" or share_cnt == "转发" or cls.is_contain_chinese(share_cnt) is True: share_cnt = 0 elif '万' in share_cnt: share_cnt = int(float(share_cnt.split('万')[0]) * 10000) elif '万+' in share_cnt: share_cnt = int(float(share_cnt.split('万+')[0]) * 10000) else: share_cnt = int(float(share_cnt)) # 收藏 favorite_id = driver.find_element(By.ID, 'com.tencent.mm:id/fnp') favorite_cnt = favorite_id.get_attribute('name') if favorite_cnt == "" or favorite_cnt == "收藏" or favorite_cnt == "推荐" or favorite_cnt == "火" or cls.is_contain_chinese(favorite_cnt) is True: favorite_cnt = 0 elif '万' in favorite_cnt: favorite_cnt = int(float(favorite_cnt.split('万')[0]) * 10000) elif '万+' in favorite_cnt: favorite_cnt = int(float(favorite_cnt.split('万+')[0]) * 10000) else: favorite_cnt = int(float(favorite_cnt)) # 评论 comment_id = driver.find_element(By.ID, 'com.tencent.mm:id/bje') comment_cnt = comment_id.get_attribute('name') if comment_cnt == "" or comment_cnt == "评论" or cls.is_contain_chinese(comment_cnt) is True: comment_cnt = 0 elif '万' in comment_cnt: comment_cnt = int(float(comment_cnt.split('万')[0]) * 10000) elif '万+' in comment_cnt: comment_cnt = int(float(comment_cnt.split('万+')[0]) * 10000) else: comment_cnt = int(float(comment_cnt)) # 发布时间 comment_id.click() time.sleep(1) publish_time = driver.find_element(By.ID, "com.tencent.mm:id/bre").get_attribute("name") if "天前" in publish_time: days = int(publish_time.replace("天前", "")) publish_time_str = (date.today() + timedelta(days=-days)).strftime("%Y-%m-%d") elif "年" in publish_time: publish_time_str = publish_time.replace("年", "-").replace("月", "-").replace("日", "") else: publish_time_str = f'2023-{publish_time.replace("月", "-").replace("日", "")}' # publish_time_stamp = int(time.mktime(time.strptime(publish_time_str, "%Y-%m-%d"))) # 收起评论 # Common.logger(log_type, crawler).info("收起评论") driver.find_element(By.ID, "com.tencent.mm:id/be_").click() time.sleep(0.5) # 返回 webview # Common.logger(log_type, crawler).info(f"操作手机返回按键") driver.find_element(By.ID, "com.tencent.mm:id/a2z").click() time.sleep(0.5) # driver.press_keycode(AndroidKey.BACK) # cls.check_to_webview(log_type=log_type, crawler=crawler, driver=driver) webviews = driver.contexts driver.switch_to.context(webviews[1]) video_dict = { "like_cnt": like_cnt, "share_cnt": share_cnt, "favorite_cnt": favorite_cnt, "comment_cnt": comment_cnt, "publish_time_str": publish_time_str, # "publish_time_stamp": publish_time_stamp, } return video_dict @classmethod def search_all_videos(cls, log_type, crawler, env): word_list = get_config_from_mysql(log_type, crawler, env, "search_word", action="") for word in word_list: cls.i = 0 Common.logger(log_type, crawler).info(f"开始抓取搜索词:{word}") try: cls.start_wechat(log_type=log_type, crawler=crawler, word=word, env=env) except Exception as e: Common.logger(log_type, crawler).error(f"search_video:{e}\n") if __name__ == '__main__': ShipinhaoSearch.search_all_videos(log_type="search", crawler="shipinhao", env="dev") pass