# -*- coding: utf-8 -*-
# @Time: 2023/11/17
"""Appium-driven crawler for the "祝福圈子" WeChat mini program.

Instantiating :class:`ZFQZRecommendNew` starts an Appium session against
WeChat, opens the mini program, scans the video cards in the feed and sends
every video accepted by ``PiaoQuanPipeline`` to the ETL message queue.
"""
import json
import os
import random
import sys
import time
import uuid
from datetime import datetime, timedelta
from hashlib import md5

from appium import webdriver
from appium.webdriver.extensions.android.nativekey import AndroidKey
from appium.webdriver.webdriver import WebDriver
from bs4 import BeautifulSoup
from selenium.common import NoSuchElementException
from selenium.webdriver.common.by import By

# The project-local imports below are resolved relative to the working directory.
sys.path.append(os.getcwd())
from common import AliyunLogger, PiaoQuanPipeline, get_redirect_url
from common.common import Common
from common.mq import MQ
from common.scheduling_db import MysqlHelper


class ZFQZRecommendNew:
    """Crawl the recommendation feed of the "祝福圈子" mini program.

    The whole crawl runs from ``__init__``: constructing an instance starts
    the Appium session, performs the crawl and quits the session again.
    """

    env = None
    driver = None
    log_type = None

    def __init__(self, log_type, crawler, env, rule_dict, our_uid):
        """Start WeChat through Appium, open the mini program and crawl it.

        Args:
            log_type: Log/strategy name; passed to the loggers and stored in
                each video record as ``strategy``.
            crawler: Crawler name; passed to the loggers and the pipeline and
                stored in each video record as ``platform``.
            env: Environment name; appended to the MQ topic name and passed
                to the loggers, pipeline and MySQL helper.
            rule_dict: Crawl rules. ``rule_dict["videos_cnt"]["min"]`` is read
                here; the dict is also handed to ``PiaoQuanPipeline``.
            our_uid: Value written to ``user_id`` of every record sent to ETL.

        If the Appium session cannot be created the error is logged and the
        constructor returns without crawling (``self.driver`` stays ``None``).
        """
        self.mq = None
        self.platform = "zhufuquanzi"
        self.download_cnt = 0
        self.element_list = []
        self.count = 0
        self.swipe_count = 0
        self.log_type = log_type
        self.crawler = crawler
        self.env = env
        self.rule_dict = rule_dict
        self.our_uid = our_uid

        if not self._start_driver():
            return

        # Always release the Appium session, even when the crawl fails midway.
        try:
            self._wait_for_wechat()
            self._open_applet()
            self.get_videoList()
            time.sleep(1)
        finally:
            self.driver.quit()

    def _start_driver(self):
        """Create the Appium session; return ``True`` on success, else ``False``."""
        # The original code selected the same path for "dev" and every other env.
        chromedriver_executable = "/Users/piaoquan/Downloads/chromedriver"

        Common.logger(self.log_type, self.crawler).info("启动微信")
        # WeChat session capabilities.
        # Optional device pinning (disabled): "platformVersion": "11", "udid": "emulator-5554"
        caps = {
            "platformName": "Android",
            "devicesName": "Android",
            "appPackage": "com.tencent.mm",
            "appActivity": ".ui.LauncherUI",
            "autoGrantPermissions": "true",
            "noReset": True,
            "resetkeyboard": True,
            "unicodekeyboard": True,
            "showChromedriverLog": True,
            "printPageSourceOnFailure": True,
            "recreateChromeDriverSessions": True,
            "enableWebviewDetailsCollection": True,
            "setWebContentsDebuggingEnabled": True,
            "newCommandTimeout": 6000,
            "automationName": "UiAutomator2",
            "chromedriverExecutable": chromedriver_executable,
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
        }
        try:
            self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
        except Exception as e:
            print(e)
            AliyunLogger.logging(
                code="3002",
                platform=self.platform,
                mode=self.log_type,
                env=self.env,
                message=f'appium 启动异常: {e}'
            )
            return False
        self.driver.implicitly_wait(30)
        return True

    def _wait_for_wechat(self):
        """Poll up to 10 times until the WeChat start-up marker element is found.

        If the system UI ``dismiss_view`` element is found instead, the screen
        is swiped upwards and polling continues. Any exception during a poll
        is logged and followed by a one-second pause.
        """
        for _ in range(10):
            try:
                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                    Common.logger(self.log_type, self.crawler).info("微信启动成功")
                    AliyunLogger.logging(
                        code="1000",
                        platform=self.platform,
                        mode=self.log_type,
                        env=self.env,
                        message="启动微信成功"
                    )
                    break
                elif self.driver.find_element(By.ID, "com.android.systemui:id/dismiss_view"):
                    Common.logger(self.log_type, self.crawler).info("发现并关闭系统下拉菜单")
                    AliyunLogger.logging(
                        code="1000",
                        platform=self.platform,
                        mode=self.log_type,
                        env=self.env,
                        message="发现并关闭系统下拉菜单"
                    )
                    size = self.driver.get_window_size()
                    self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.8),
                                      int(size['width'] * 0.5), int(size['height'] * 0.2), 200)
            except Exception as e:
                # Best effort: a failed poll is logged and retried.
                AliyunLogger.logging(
                    code="3001",
                    platform=self.platform,
                    mode=self.log_type,
                    env=self.env,
                    message=f"打开微信异常:{e}"
                )
                time.sleep(1)

    def _open_applet(self):
        """Swipe down to reveal the mini-program panel and tap "祝福圈子"."""
        Common.logger(self.log_type, self.crawler).info("下滑,展示小程序选择面板")
        size = self.driver.get_window_size()
        self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.2),
                          int(size['width'] * 0.5), int(size['height'] * 0.8), 200)
        time.sleep(2)
        Common.logger(self.log_type, self.crawler).info('打开小程序"祝福圈子"')
        # The last matching element is the one that gets clicked.
        self.driver.find_elements(By.XPATH, '//*[@text="祝福圈子"]')[-1].click()
        AliyunLogger.logging(
            code="1000",
            platform=self.platform,
            mode=self.log_type,
            env=self.env,
            message='打开小程序"祝福圈子"成功'
        )
        time.sleep(5)

    def check_to_applet(self, xpath):
        """Switch to the last driver context and to the window containing ``xpath``.

        Every window handle is tried in order; the method returns as soon as
        one of them contains an element matching ``xpath``. If none does, the
        driver is left on the last window that was tried.
        """
        time.sleep(1)
        contexts = self.driver.contexts
        self.driver.switch_to.context(contexts[-1])
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                self.driver.find_element(By.XPATH, xpath)
            except NoSuchElementException:
                time.sleep(1)
                continue
            Common.logger(self.log_type, self.crawler).info("切换到WebView成功\n")
            AliyunLogger.logging(
                code="1000",
                platform=self.platform,
                mode=self.log_type,
                env=self.env,
                message="成功切换到 webview"
            )
            return

    def swipe_up(self):
        """Swipe the feed upwards once and increment ``self.swipe_count``."""
        # The lookup result is unused; the call switches to a window that
        # contains the list element before swiping.
        self.search_elements('//*[@class="bless--list"]')
        size = self.driver.get_window_size()
        self.driver.swipe(int(size["width"] * 0.5), int(size["height"] * 0.8),
                          int(size["width"] * 0.5), int(size["height"] * 0.442), 200)
        self.swipe_count += 1

    def parse_detail(self, index):
        """Return the ``index``-th video card parsed from the current page source.

        Returns:
            The BeautifulSoup tag of the card, or ``None`` when the page has
            no card at that position (previously this raised ``IndexError``).
        """
        soup = BeautifulSoup(self.driver.page_source, 'html.parser')
        video_list = soup.find_all(name="wx-view", attrs={"class": "expose--adapt-parent"})
        remaining = video_list[index:]
        return remaining[0] if remaining else None

    def get_videoList(self):
        """Pick a tab at random, then scan up to 20 cards, swiping after each one."""
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.driver.implicitly_wait(20)
        self.check_to_applet(xpath='//*[@class="tags--tag tags--tag-0 tags--checked"]')
        time.sleep(1)
        selected_text = random.choice(["推荐", "春节"])
        try:
            self.driver.find_element(By.XPATH, f"//wx-button[contains(., '{selected_text}')]").click()
            time.sleep(2)
        except NoSuchElementException:
            # The chosen tab is missing; carry on with whatever tab is active.
            Common.logger(self.log_type, self.crawler).info(f"没有该tab:{selected_text}\n")

        print("开始获取视频信息")
        for i in range(20):
            print("下滑{}次".format(i))
            element = self.parse_detail(i)
            if element is None:
                # Fewer cards than expected are loaded: skip this index and
                # keep swiping instead of aborting the whole run.
                Common.logger(self.log_type, self.crawler).info(f"未找到第{i}个视频元素,继续下滑\n")
            else:
                self.get_video_info(element)
            self.swipe_up()
            time.sleep(1)
            if self.swipe_count > 100:
                return

        print("下滑完成")
        Common.logger(self.log_type, self.crawler).info("已抓取完一组,休眠 5 秒\n")
        AliyunLogger.logging(
            code="1000",
            platform=self.platform,
            mode=self.log_type,
            env=self.env,
            message="已抓取完一组,休眠 5 秒\n",
        )
        time.sleep(5)

    def search_elements(self, xpath):
        """Return the elements matching ``xpath`` from the first window that has any.

        Switches through every window handle in order. Returns ``None`` when
        no window contains a match.
        """
        time.sleep(1)
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                elements = self.driver.find_elements(By.XPATH, xpath)
            except NoSuchElementException:
                continue
            if elements:
                return elements
        return None

    def get_video_info(self, video_element):
        """Process one video card, logging (not raising) any failure.

        NOTE(review): BACK is pressed on every failure, including failures
        that happen before the detail page was opened — confirm this is
        intended.
        """
        try:
            self.get_video_info_2(video_element)
        except Exception as e:
            self.driver.press_keycode(AndroidKey.BACK)
            Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
            AliyunLogger.logging(
                code="3001",
                platform=self.platform,
                mode=self.log_type,
                env=self.env,
                message=f"抓取单条视频异常:{e}\n"
            )

    def get_video_url(self, video_title_element):
        """Click the title element and return the ``src`` of the detail-page video.

        Args:
            video_title_element: Non-empty list of elements; only the first
                one is scrolled into view and clicked.

        NOTE(review): the loop below returns on its first pass, so it never
        actually retries — confirm whether a retry was intended.
        """
        for i in range(3):
            self.search_elements('//*[@class="bless--list"]')
            Common.logger(self.log_type, self.crawler).info(f"video_title_element:{video_title_element[0]}")
            time.sleep(1)
            Common.logger(self.log_type, self.crawler).info("滑动标题至可见状态")
            self.driver.execute_script("arguments[0].scrollIntoView({block:'center',inline:'center'});",
                                       video_title_element[0])
            time.sleep(3)
            Common.logger(self.log_type, self.crawler).info("点击标题")
            video_title_element[0].click()
            time.sleep(5)
            self.check_to_applet(xpath=r'//*[@class="index--video-item index--video"]')
            Common.logger(self.log_type, self.crawler).info("点击标题完成")
            time.sleep(10)
            video_url_elements = self.search_elements(
                '//*[@class="index--video-item index--video"]')
            Common.logger(self.log_type, self.crawler).info(f"{video_url_elements[0].get_attribute('src')}")
            return video_url_elements[0].get_attribute('src')

    def repeat_video(self, out_video_id):
        """Check ``crawler_video`` for an existing row with this ``out_video_id``.

        Only rows of this platform whose ``create_time`` is on or before the
        date seven days ago are matched.

        Returns:
            ``False`` when such a row exists, ``True`` otherwise.
        """
        cutoff = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
        # out_video_id is an md5 hex digest produced by the caller in this file.
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_video_id}" and create_time <= '{cutoff}'; """
        rows = MysqlHelper.get_values(
            log_type=self.log_type, crawler=self.platform, env=self.env, sql=sql, action=""
        )
        return not rows

    def get_video_info_2(self, video_element):
        """Parse one video card, run it through the pipeline and send it to ETL.

        Once ``self.download_cnt`` has reached ``rule_dict["videos_cnt"]["min"]``
        (default 10) the counters are reset and nothing else is done.

        Args:
            video_element: BeautifulSoup tag of a single video card.
        """
        Common.logger(self.log_type, self.crawler).info(f"本轮已抓取{self.download_cnt}条视频\n")
        if self.download_cnt >= int(self.rule_dict.get("videos_cnt", {}).get("min", 10)):
            self.count = 0
            self.download_cnt = 0
            self.element_list = []
            return
        self.count += 1
        Common.logger(self.log_type, self.crawler).info(f"第{self.count}条视频")
        # trace_id serves as the unique index for this video's lifecycle.
        trace_id = self.crawler + str(uuid.uuid1())

        video_title = video_element.find("wx-view", class_="dynamic--title").text
        play_str = video_element.find("wx-view", class_="dynamic--views").text
        duration_str = video_element.find("wx-view", class_="dynamic--duration").text
        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
        avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
        cover_url = video_element.find("wx-image", class_="dynamic--bg-image")["src"]

        play_cnt = int(play_str.replace("+", "").replace("次播放", ""))
        # First and last ":"-separated fields are read as minutes and seconds.
        duration_parts = duration_str.split(":")
        duration = int(duration_parts[0].strip()) * 60 + int(duration_parts[-1].strip())

        # The page exposes no ids here, so ids are derived from title / user name.
        out_video_id = md5(video_title.encode('utf8')).hexdigest()
        out_user_id = md5(user_name.encode('utf8')).hexdigest()
        if not self.repeat_video(out_video_id):
            # A matching row already exists: append the current time to the id.
            out_video_id = out_video_id + str(time.time())

        # One timestamp for the whole record keeps its time fields consistent.
        now = int(time.time())
        video_dict = {
            "video_title": video_title,
            "video_id": out_video_id,
            'out_video_id': out_video_id,
            "duration_str": duration_str,
            "duration": duration,
            "play_str": play_str,
            "play_cnt": play_cnt,
            "like_str": "",
            "like_cnt": 50,
            "comment_cnt": 0,
            "share_cnt": 50,
            "user_name": user_name,
            "user_id": out_user_id,
            'publish_time_stamp': now,
            'publish_time_str': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)),
            'update_time_stamp': now,
            "avatar_url": avatar_url,
            "cover_url": cover_url,
            "session": f"zhufuquanzi-{now}"
        }
        AliyunLogger.logging(
            code="1001",
            platform=self.platform,
            mode=self.log_type,
            env=self.env,
            trace_id=trace_id,
            message="扫描到一条视频",
            data=video_dict
        )
        pipeline = PiaoQuanPipeline(
            platform=self.crawler,
            mode=self.log_type,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id
        )
        if not pipeline.process_item():
            return

        video_title_element = self.search_elements(f'//*[contains(text(), "{video_title}")]')
        if video_title_element is None:
            return
        Common.logger(self.log_type, self.crawler).info("点击标题,进入视频详情页")
        AliyunLogger.logging(
            code="1000",
            platform=self.platform,
            mode=self.log_type,
            env=self.env,
            message="点击标题,进入视频详情页",
        )
        video_url = self.get_video_url(video_title_element)
        video_url = get_redirect_url(video_url)
        if video_url is None:
            self.driver.press_keycode(AndroidKey.BACK)
            time.sleep(5)
            return

        video_dict['video_url'] = video_url
        video_dict['like_cnt'] = 0
        video_dict['share_cnt'] = 0
        video_dict["platform"] = self.crawler
        video_dict["strategy"] = self.log_type
        video_dict["out_video_id"] = video_dict["video_id"]
        video_dict["crawler_rule"] = json.dumps(self.rule_dict)
        video_dict["user_id"] = self.our_uid
        video_dict["publish_time"] = video_dict["publish_time_str"]
        self.mq.send_msg(video_dict)
        AliyunLogger.logging(
            code="1002",
            platform=self.platform,
            mode=self.log_type,
            env=self.env,
            trace_id=trace_id,
            message="发送到ETL成功",
            data=video_dict
        )
        self.download_cnt += 1
        # Leave the detail page before the next card is processed.
        self.driver.press_keycode(AndroidKey.BACK)
        time.sleep(5)


if __name__ == "__main__":
    rule_dict1 = {"period": {"min": 0, "max": 0},
                  "duration": {"min": 1, "max": 0},
                  "favorite_cnt": {"min": 0, "max": 0},
                  "videos_cnt": {"min": 0, "max": 0},
                  "share_cnt": {"min": 0, "max": 0}}
    ZFQZRecommendNew("recommend", "zhufuquanzi", "dev", rule_dict1, 6267141)