# -*- coding: utf-8 -*-
# @Time: 2023/11/17
"""Appium-driven crawler for the "祝福圈子" WeChat mini program.

The crawler launches WeChat on an Android device through a local Appium
server, opens the mini program, scrapes the video cards from the feed's
page source and publishes accepted videos to the ETL message queue.
"""
import json
import os
import random
import sys
import time
import uuid
from hashlib import md5

from appium import webdriver
from appium.webdriver.extensions.android.nativekey import AndroidKey
from appium.webdriver.webdriver import WebDriver
from bs4 import BeautifulSoup
from selenium.common import NoSuchElementException
from selenium.webdriver.common.by import By

sys.path.append(os.getcwd())
from common import AliyunLogger, PiaoQuanPipeline, get_redirect_url
from common.common import Common
from common.mq import MQ
from common.scheduling_db import MysqlHelper


class ZFQZRecommend:
    """Crawl the recommendation feed of the "祝福圈子" mini program.

    All state lives on the class: ``download_cnt`` counts the videos sent to
    the queue in the current run, ``element_list`` remembers the feed cards
    that were already seen, and ``i`` counts the scanned cards.
    """

    platform = "祝福圈子"
    download_cnt = 0
    element_list = []
    i = 0

    # ------------------------------------------------------------------
    # small private helpers
    # ------------------------------------------------------------------
    @classmethod
    def _report(cls, code, log_type, env, message, **extra):
        """Send one record to AliyunLogger tagged with this crawler's platform."""
        AliyunLogger.logging(
            code=code,
            platform=ZFQZRecommend.platform,
            mode=log_type,
            env=env,
            message=message,
            **extra,
        )

    @classmethod
    def _reset_state(cls):
        """Clear the per-run counters and the seen-element cache."""
        cls.i = 0
        cls.download_cnt = 0
        cls.element_list = []

    @staticmethod
    def _parse_count(text, placeholder=None):
        """Convert a counter label such as ``"123"``, ``"1.2万"`` or ``"500+次播放"`` to an int.

        :param text: raw label text taken from the page.
        :param placeholder: when this word occurs in ``text`` the button shows
            its caption instead of a number and the count is 0.
        :raises ValueError: if the remaining text is not numeric.
        """
        if placeholder and placeholder in text:
            return 0
        cleaned = text.replace("+", "").replace("次播放", "").strip()
        if "万" in cleaned:
            # "万" is a x10000 multiplier; the prefix may be fractional ("1.2万"),
            # so it has to go through float before being scaled.
            return round(float(cleaned.split("万")[0]) * 10000)
        return int(cleaned)

    @staticmethod
    def _parse_duration(duration_str):
        """Convert a colon separated duration (``mm:ss`` or ``hh:mm:ss``) to seconds."""
        seconds = 0
        for part in duration_str.split(":"):
            seconds = seconds * 60 + int(part.strip())
        return seconds

    @staticmethod
    def _xpath_literal(text):
        """Return ``text`` quoted as an XPath 1.0 string literal.

        XPath has no escape character, so a value that contains both quote
        kinds has to be assembled with ``concat()``.
        """
        if '"' not in text:
            return f'"{text}"'
        if "'" not in text:
            return f"'{text}'"
        pieces = ", '\"', ".join(f'"{piece}"' for piece in text.split('"'))
        return f"concat({pieces})"

    @classmethod
    def _parse_video_element(cls, video_element):
        """Build the video dict from one feed card (a BeautifulSoup ``wx-view`` tag).

        :raises AttributeError, IndexError, TypeError, ValueError: when the card
            lacks one of the expected child nodes or a label is not parseable.
        """
        video_title = video_element.find("wx-view", class_="dynamic--title").text
        play_str = video_element.find("wx-view", class_="dynamic--views").text
        button_texts = video_element.find_all("wx-view", class_="dynamic--commerce-btn-text")
        like_str = button_texts[0].text
        comment_str = button_texts[1].text
        duration_str = video_element.find("wx-view", class_="dynamic--duration").text
        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
        avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
        cover_url = video_element.find("wx-image", class_="dynamic--bg-image")["src"]

        # The page exposes no ids, so title and user name hashes stand in for them.
        out_video_id = md5(video_title.encode('utf8')).hexdigest()
        out_user_id = md5(user_name.encode('utf8')).hexdigest()
        now = int(time.time())
        return {
            "video_title": video_title,
            "video_id": out_video_id,
            'out_video_id': out_video_id,
            "duration_str": duration_str,
            "duration": cls._parse_duration(duration_str),
            "play_str": play_str,
            "play_cnt": cls._parse_count(play_str),
            "like_str": like_str,
            "like_cnt": cls._parse_count(like_str, placeholder="点赞"),
            "comment_cnt": cls._parse_count(comment_str, placeholder="评论"),
            "share_cnt": 0,
            "user_name": user_name,
            "user_id": out_user_id,
            'publish_time_stamp': now,
            'publish_time_str': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)),
            'update_time_stamp': now,
            "avatar_url": avatar_url,
            "cover_url": cover_url,
            "session": f"zhufuquanzi-{now}",
        }

    @classmethod
    def _wait_for_wechat(cls, log_type, crawler, env, driver):
        """Poll up to 120 times until the WeChat main screen element shows up.

        While waiting, an open system notification shade is swiped away.
        The method returns silently when the attempts are exhausted.
        """
        for _ in range(120):
            try:
                if driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                    Common.logger(log_type, crawler).info("微信启动成功")
                    cls._report("1000", log_type, env, "启动微信成功")
                    return
                if driver.find_element(By.ID, "com.android.systemui:id/dismiss_view"):
                    Common.logger(log_type, crawler).info("发现并关闭系统下拉菜单")
                    cls._report("1000", log_type, env, "发现并关闭系统下拉菜单")
                    size = driver.get_window_size()
                    # Swipe upwards to close the notification shade.
                    driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.8),
                                 int(size['width'] * 0.5), int(size['height'] * 0.2), 200)
            except NoSuchElementException:
                cls._report("3001", log_type, env, "打开微信异常")
                time.sleep(1)

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    @classmethod
    def start_wechat(cls, log_type, crawler, env, rule_dict, our_uid):
        """Start WeChat through Appium, open the mini program and crawl it.

        :param log_type: crawl mode; used as logger name and as ``strategy``.
        :param crawler: crawler name; used as logger name and as ``platform``.
        :param env: environment name (``"dev"`` or other); selects the MQ topic.
        :param rule_dict: crawl rules handed to the pipeline; ``videos_cnt.min``
            bounds the number of videos sent per run.
        :param our_uid: user id stored on every published video.
        :return: ``None``. Returns early when the Appium session cannot be created.
        """
        # Both environments use the same chromedriver binary.
        chromedriver_executable = "/Users/piaoquan/Downloads/chromedriver"
        Common.logger(log_type, crawler).info("启动微信")
        # NOTE(review): "devicesName", "resetkeyboard" and "unicodekeyboard" do not
        # match the usual Appium capability spelling (deviceName, resetKeyboard,
        # unicodeKeyboard) - confirm before renaming, they are sent as-is.
        caps = {
            "platformName": "Android",
            "devicesName": "Android",
            # "platformVersion": "11",
            # "udid": "emulator-5554",
            "appPackage": "com.tencent.mm",
            "appActivity": ".ui.LauncherUI",
            "autoGrantPermissions": "true",
            "noReset": True,
            "resetkeyboard": True,
            "unicodekeyboard": True,
            "showChromedriverLog": True,
            "printPageSourceOnFailure": True,
            "recreateChromeDriverSessions": True,
            "enableWebviewDetailsCollection": True,
            "setWebContentsDebuggingEnabled": True,
            "newCommandTimeout": 6000,
            "automationName": "UiAutomator2",
            "chromedriverExecutable": chromedriver_executable,
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
        }
        try:
            driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
        except Exception as e:
            cls._report("3002", log_type, env, "appium 启动异常, 报错原因是{}".format(e))
            return

        # From here on the session must be closed whatever happens.
        try:
            driver.implicitly_wait(30)
            cls._wait_for_wechat(log_type, crawler, env, driver)

            Common.logger(log_type, crawler).info("下滑,展示小程序选择面板")
            cls._report("1000", log_type, env, "下滑,展示小程序选择面板")
            size = driver.get_window_size()
            # Pull down on the chat list to reveal the mini program panel.
            driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.2),
                         int(size['width'] * 0.5), int(size['height'] * 0.8), 200)
            time.sleep(1)

            Common.logger(log_type, crawler).info('打开小程序"祝福圈子"')
            driver.find_elements(By.XPATH, '//*[@text="祝福圈子"]')[-1].click()
            cls._report("1000", log_type, env, '打开小程序"祝福圈子"成功')
            time.sleep(5)

            cls.get_videoList(log_type, crawler, driver, env, rule_dict, our_uid)
            time.sleep(1)
        finally:
            driver.quit()

    @classmethod
    def search_elements(cls, driver: WebDriver, xpath):
        """Look for ``xpath`` in every window handle of the current context.

        The driver is left switched to the window where the match was found.

        :return: the non-empty list of matching elements, or ``None`` when no
            window contains a match.
        """
        time.sleep(1)
        for handle in driver.window_handles:
            driver.switch_to.window(handle)
            time.sleep(1)
            try:
                elements = driver.find_elements(By.XPATH, xpath)
                if elements:
                    return elements
            except NoSuchElementException:
                pass
        return None

    @classmethod
    def check_to_applet(cls, log_type, crawler, env, driver: WebDriver, xpath):
        """Switch to the last driver context and to the window containing ``xpath``.

        Returns as soon as a window with the element is found; when none is
        found the driver stays on the last window that was tried.
        """
        time.sleep(1)
        contexts = driver.contexts
        driver.switch_to.context(contexts[-1])
        for handle in driver.window_handles:
            driver.switch_to.window(handle)
            time.sleep(1)
            try:
                driver.find_element(By.XPATH, xpath)
                Common.logger(log_type, crawler).info("切换到小程序成功\n")
                return
            except NoSuchElementException:
                time.sleep(1)

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        """Return how many ``crawler_video`` rows already hold ``video_id``.

        The lookup spans this platform and its sibling platforms.
        """
        # NOTE(review): the statement is assembled by string formatting. The ids
        # produced in this file are md5 hex digests; anything else passed here
        # must be trusted or escaped by the caller.
        sql = f""" select * from crawler_video where platform in ("众妙音信", "刚刚都传", "吉祥幸福", "知青天天看", "zhufuquanzi", "祝福圈子", "haitunzhufu", "海豚祝福") and out_video_id="{video_id}"; """
        rows = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(rows)

    @classmethod
    def swipe_up(cls, driver: WebDriver):
        """Scroll the feed by swiping from 80% to 40% of the screen height."""
        # Called for its side effect: it switches to the window holding the feed.
        cls.search_elements(driver, '//*[@class="bless--list"]')
        size = driver.get_window_size()
        driver.swipe(int(size["width"] * 0.5), int(size["height"] * 0.8),
                     int(size["width"] * 0.5), int(size["height"] * 0.4), 200)

    @classmethod
    def get_video_url(cls, log_type, crawler, driver: WebDriver, video_title_element):
        """Click the title element and read the ``src`` of the detail page video.

        Up to three attempts are made.

        :param video_title_element: list of elements; only the first is used.
        :return: the ``src`` attribute of the first video element, or ``None``
            when no video element appeared in any attempt.
        """
        title_element = video_title_element[0]
        for _ in range(3):
            cls.search_elements(driver, '//*[@class="bless--list"]')
            Common.logger(log_type, crawler).info(f"video_title_element:{title_element}")
            time.sleep(1)
            Common.logger(log_type, crawler).info("滑动标题至可见状态")
            driver.execute_script(
                "arguments[0].scrollIntoView({block:'center',inline:'center'});",
                title_element,
            )
            time.sleep(3)
            Common.logger(log_type, crawler).info("点击标题")
            title_element.click()
            Common.logger(log_type, crawler).info("点击标题完成")
            time.sleep(1)
            video_url_elements = cls.search_elements(
                driver, '//*[@class="index--video-item index--video"]'
            )
            if video_url_elements:
                return video_url_elements[0].get_attribute("src")
        return None

    @classmethod
    def get_videoList(cls, log_type, crawler, driver: WebDriver, env, rule_dict, our_uid):
        """Scrape the feed page by page and publish accepted videos to the queue.

        The loop ends when ``videos_cnt.min`` videos (default 10) have been sent
        or when the feed container can no longer be found; in both cases the
        class-level state is reset.
        """
        mq = MQ(topic_name="topic_crawler_etl_" + env)
        driver.implicitly_wait(20)
        cls.check_to_applet(log_type=log_type, crawler=crawler, env=env, driver=driver,
                            xpath='//*[@class="tags--tag tags--tag-0 tags--checked"]')
        time.sleep(1)

        # Pick one feed tab at random; a missing tab is not an error.
        selected_text = random.choice(["推荐", "搞笑", "大雪", "亲子"])
        try:
            driver.find_element(By.XPATH, f"//wx-button[contains(., '{selected_text}')]").click()
            time.sleep(2)
        except NoSuchElementException:
            Common.logger(log_type, crawler).info(f"没有该tab:{selected_text}\n")

        target_cnt = int(rule_dict.get("videos_cnt", {}).get("min", 10))
        page = 0
        while True:
            if cls.search_elements(driver, '//*[@class="bless--list"]') is None:
                Common.logger(log_type, crawler).info("窗口已销毁\n")
                cls._report("3000", log_type, env, '窗口已销毁')
                cls._reset_state()
                return

            soup = BeautifulSoup(driver.page_source, 'html.parser')
            found_elements = soup.find_all("wx-view", class_="expose--adapt-parent")

            # Keep only the cards that were not seen before, in page order, and
            # remember them so the next pass skips them.
            seen = set(cls.element_list)
            video_list_elements = []
            for element in found_elements:
                if element not in seen:
                    seen.add(element)
                    video_list_elements.append(element)
            cls.element_list = list(seen)

            page_message = f"正在抓取第{page + 1}页,共:{len(video_list_elements)}条视频"
            Common.logger(log_type, crawler).info(page_message)
            cls._report("1000", log_type, env, page_message)

            if not video_list_elements:
                # Nothing new on screen: scroll further down and look again.
                for attempt in range(10):
                    swipe_message = f"向上滑动第{attempt + 1}次"
                    Common.logger(log_type, crawler).info(swipe_message)
                    cls._report("1000", log_type, env, swipe_message)
                    cls.swipe_up(driver)
                    time.sleep(0.5)
                continue

            for video_element in video_list_elements:
                # Set once the title was found and is about to be clicked, so
                # the error handler knows whether BACK is needed.
                in_detail_page = False
                try:
                    progress = f"本轮已抓取{cls.download_cnt}条视频\n"
                    Common.logger(log_type, crawler).info(progress)
                    cls._report("1000", log_type, env, progress)
                    if cls.download_cnt >= target_cnt:
                        cls._reset_state()
                        return

                    trace_id = crawler + str(uuid.uuid1())
                    cls._report("1001", log_type, env, "扫描到一条视频", trace_id=trace_id)
                    cls.i += 1

                    video_dict = cls._parse_video_element(video_element)
                    video_title = video_dict["video_title"]
                    Common.logger(log_type, crawler).warning(
                        f"视频标题:{video_title},点赞:{video_dict['like_str']},"
                        f"播放:{video_dict['play_cnt']},用户名称:{video_dict['user_name']},"
                    )

                    pipeline = PiaoQuanPipeline(
                        platform=ZFQZRecommend.platform,
                        mode=log_type,
                        item=video_dict,
                        rule_dict=rule_dict,
                        env=env,
                        trace_id=trace_id,
                    )
                    if not pipeline.process_item():
                        continue

                    title_xpath = f"//*[contains(text(), {cls._xpath_literal(video_title)})]"
                    video_title_element = cls.search_elements(driver, title_xpath)
                    if video_title_element is None:
                        Common.logger(log_type, crawler).warning(
                            f"未找到该视频标题的element:{video_title_element}"
                        )
                        continue

                    Common.logger(log_type, crawler).info("点击标题,进入视频详情页")
                    cls._report("1000", log_type, env, "点击标题,进入视频详情页\n")
                    in_detail_page = True
                    video_url = cls.get_video_url(log_type, crawler, driver, video_title_element)
                    if video_url is not None:
                        video_url = get_redirect_url(video_url)
                    Common.logger(log_type, crawler).warning(
                        f"视频标题:{video_title},视频链接:{video_url}"
                    )
                    if video_url is None:
                        driver.press_keycode(AndroidKey.BACK)
                        time.sleep(5)
                        continue

                    video_dict['video_url'] = video_url
                    Common.logger(log_type, crawler).info(f"video_url:{video_url}")
                    video_dict["platform"] = crawler
                    video_dict["strategy"] = log_type
                    video_dict["out_video_id"] = video_dict["video_id"]
                    video_dict["crawler_rule"] = json.dumps(rule_dict)
                    video_dict["user_id"] = our_uid
                    video_dict["publish_time"] = video_dict["publish_time_str"]
                    mq.send_msg(video_dict)
                    cls.download_cnt += 1

                    driver.press_keycode(AndroidKey.BACK)
                    in_detail_page = False
                    time.sleep(5)
                    cls.swipe_up(driver)
                except Exception as e:
                    # Per-video boundary: one broken card must not end the run.
                    Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
                    if in_detail_page:
                        # Only leave the detail page; pressing BACK on the feed
                        # itself would navigate out of the mini program.
                        driver.press_keycode(AndroidKey.BACK)
                    cls._report("3001", log_type, env, f"抓取单条视频异常:{e}\n")

            Common.logger(log_type, crawler).info("已抓取完一组,休眠 5 秒\n")
            cls._report("1000", log_type, env, "已抓取完一组,休眠 5 秒\n")
            time.sleep(5)
            page += 1


if __name__ == "__main__":
    rule_dict1 = {
        "period": {"min": 365, "max": 365},
        "duration": {"min": 30, "max": 1800},
        "favorite_cnt": {"min": 5000, "max": 0},
        "videos_cnt": {"min": 10, "max": 20},
        "share_cnt": {"min": 1000, "max": 0},
    }
    ZFQZRecommend.start_wechat("recommend", "zhufuquanzi", "dev", rule_dict1, 6267141)