# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/12/18
import json
import os
import sys
import time
import uuid
from hashlib import md5

from appium import webdriver
from appium.webdriver.extensions.android.nativekey import AndroidKey
from bs4 import BeautifulSoup
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By

sys.path.append(os.getcwd())

from application.functions import get_redirect_url
from application.pipeline import PiaoQuanPipelineTest
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger

# XPath of the recommendation list container inside the mini program.
_LIST_XPATH = '//*[@class="list-list--list"]'
# XPath of the <wx-video> player shown on the video detail page.
_VIDEO_XPATH = '//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'


def _parse_count(text, zero_marker=None):
    """Convert a counter label scraped from the page into an integer.

    Handles plain integers ("123"), the play-count decoration
    ("3000+次播放") and the "万" (x10,000) suffix, including fractional
    values such as "1.2万".

    :param text: raw label text.
    :param zero_marker: when given and contained in ``text`` the label is a
        placeholder (e.g. "点赞" / "评论" shown instead of a number) and the
        count is 0.
    :return: the count as ``int``.
    :raises ValueError: if the remaining text is not numeric.
    """
    cleaned = text.strip()
    if zero_marker is not None and zero_marker in cleaned:
        return 0
    cleaned = cleaned.replace("+", "").replace("次播放", "").strip()
    if "万" in cleaned:
        # float() so that "1.2万" is accepted; round() guards against
        # binary floating point artefacts before truncating to int.
        return int(round(float(cleaned.split("万")[0]) * 10000))
    return int(cleaned)


def _parse_duration(text):
    """Convert "ss", "mm:ss" or "hh:mm:ss" into a number of seconds.

    :raises ValueError: if any component is not an integer.
    """
    seconds = 0
    for part in text.split(":"):
        seconds = seconds * 60 + int(part.strip())
    return seconds


def _xpath_literal(value):
    """Return ``value`` quoted as an XPath 1.0 string literal.

    XPath 1.0 has no escape character, so a value containing both quote
    kinds has to be assembled with ``concat()``.
    """
    if '"' not in value:
        return f'"{value}"'
    if "'" not in value:
        return f"'{value}'"
    pieces = (f'"{piece}"' for piece in value.split('"'))
    return "concat(" + ", '\"', ".join(pieces) + ")"


class XiaoNianGaoPlusRecommend:
    """Crawl the recommendation feed of the WeChat mini program "小年糕+".

    Instantiating the class does all the work: it opens an Appium session
    against WeChat, opens the mini program, walks the feed via
    :meth:`get_videoList` and finally closes the session.
    """

    def __init__(self, log_type, crawler, env, rule_dict, our_uid):
        """Start WeChat through Appium and run one crawl.

        :param log_type: passed to the pipeline as ``mode`` and stored in
            each record as ``strategy``.
        :param crawler: passed to the pipeline as ``platform``; also the
            prefix of every trace id.
        :param env: passed through to the pipeline.
        :param rule_dict: crawl rules; ``rule_dict["videos_cnt"]["min"]``
            (default 10) is the number of videos to collect per round.
        :param our_uid: stored as ``user_id`` in every collected record.

        If the Appium session cannot be created the error is printed and the
        constructor returns without crawling.
        """
        self.mq = None
        self.platform = "xiaoniangaoplus"
        self.download_cnt = 0
        self.element_list = []
        self.count = 0
        self.swipe_count = 0
        self.log_type = log_type
        self.crawler = crawler
        self.env = env
        self.rule_dict = rule_dict
        self.our_uid = our_uid
        chromedriver_executable = "/usr/bin/chromedriver"
        print("启动微信")
        # WeChat capability configuration.
        # NOTE(review): "devicesName", "resetkeyboard" and "unicodekeyboard"
        # differ from the documented Appium spellings ("deviceName",
        # "resetKeyboard", "unicodeKeyboard") - confirm whether the server
        # actually honours them before renaming.
        caps = {
            "platformName": "Android",
            "devicesName": "Android",
            "appPackage": "com.tencent.mm",
            "appActivity": ".ui.LauncherUI",
            "autoGrantPermissions": "true",
            "noReset": True,
            "resetkeyboard": True,
            "unicodekeyboard": True,
            "showChromedriverLog": True,
            "printPageSourceOnFailure": True,
            "recreateChromeDriverSessions": True,
            "enableWebviewDetailsCollection": True,
            "setWebContentsDebuggingEnabled": True,
            "newCommandTimeout": 6000,
            "automationName": "UiAutomator2",
            "chromedriverExecutable": chromedriver_executable,
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
        }
        try:
            self.driver = webdriver.Remote("http://localhost:4750/wd/hub", caps)
        except Exception as e:
            print(e)
            return

        # From here on a session exists; always release it, even when the
        # crawl raises, so the device is not left with a dangling session.
        try:
            self.driver.implicitly_wait(30)
            self._wait_for_wechat()
            self._open_mini_program()
            self.get_videoList()
            time.sleep(1)
        finally:
            self.driver.quit()

    def _swipe_vertical(self, start_ratio, end_ratio):
        """Swipe along the horizontal centre between two height ratios."""
        size = self.driver.get_window_size()
        x = int(size["width"] * 0.5)
        self.driver.swipe(
            x,
            int(size["height"] * start_ratio),
            x,
            int(size["height"] * end_ratio),
            200,
        )

    def _wait_for_wechat(self):
        """Poll up to 10 times until the WeChat main screen is visible.

        ``find_elements`` is used for both probes because it returns an
        empty list when nothing matches, whereas ``find_element`` raises.
        """
        for _ in range(10):
            try:
                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                    print("启动微信成功")
                    break
                if self.driver.find_elements(
                    By.ID, "com.android.systemui:id/dismiss_view"
                ):
                    print("发现并关闭系统下拉菜单")
                    self._swipe_vertical(0.8, 0.2)
            except Exception as e:
                print(f"打开微信异常:{e}")
            time.sleep(1)

    def _open_mini_program(self):
        """Swipe down on the WeChat screen and tap the "小年糕+" entry."""
        self._swipe_vertical(0.2, 0.8)
        time.sleep(1)
        self.driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
        print("打开小程序小年糕+成功")
        time.sleep(5)

    def search_elements(self, xpath):
        """Look for ``xpath`` in every window handle of the current context.

        The driver is left switched to the window in which the match was
        found.

        :return: the non-empty list of matching elements, or ``None`` when
            no window contains a match.
        """
        time.sleep(1)
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            elements = self.driver.find_elements(By.XPATH, xpath)
            if elements:
                return elements
        return None

    def check_to_applet(self, xpath):
        """Switch to the last context, then to the window containing ``xpath``.

        Returns silently (without a success message) if no window matches.
        """
        time.sleep(1)
        contexts = self.driver.contexts
        self.driver.switch_to.context(contexts[-1])
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                self.driver.find_element(By.XPATH, xpath)
            except NoSuchElementException:
                time.sleep(1)
                continue
            print("切换到WebView成功\n")
            return

    def swipe_up(self):
        """Scroll the feed up by one step and count the swipe."""
        # Re-select the window that holds the list before swiping.
        self.search_elements(_LIST_XPATH)
        self._swipe_vertical(0.8, 0.442)
        self.swipe_count += 1

    def get_video_url(self, video_title_element):
        """Open the detail page of a video and read the player ``src``.

        :param video_title_element: list of elements matching the title; the
            first one is scrolled into view and clicked.
        :return: the ``src`` attribute of the player, or ``None`` if the
            player could not be located after 3 attempts.
        """
        for _ in range(3):
            self.search_elements(_LIST_XPATH)
            time.sleep(1)
            self.driver.execute_script(
                "arguments[0].scrollIntoView({block:'center',inline:'center'});",
                video_title_element[0],
            )
            time.sleep(3)
            video_title_element[0].click()
            self.check_to_applet(xpath=_VIDEO_XPATH)
            time.sleep(10)
            video_url_elements = self.search_elements(_VIDEO_XPATH)
            if video_url_elements:
                return video_url_elements[0].get_attribute("src")
        return None

    def parse_detail(self, index):
        """Return the video card at position ``index + 1`` of the page source.

        The first card (position 0) is never returned because of the
        ``+ 1`` offset.

        :return: the matching ``wx-view`` tag, or ``None`` when the page
            currently contains fewer cards than requested.
        """
        soup = BeautifulSoup(self.driver.page_source, "html.parser")
        video_list = soup.findAll(
            name="wx-view", attrs={"class": "expose--adapt-parent"}
        )
        position = index + 1
        if position >= len(video_list):
            return None
        return video_list[position]

    def get_video_info_2(self, video_element):
        """Extract one video card, run it through the pipeline and, if it is
        accepted, open the detail page to resolve the playable URL.

        Once ``download_cnt`` has reached the configured minimum the
        counters are reset and the card is ignored.

        :param video_element: BeautifulSoup tag of one video card.
        :raises Exception: parsing or driver errors propagate to the caller
            (see :meth:`get_video_info`).
        """
        min_videos = int(self.rule_dict.get("videos_cnt", {}).get("min", 10))
        if self.download_cnt >= min_videos:
            self.count = 0
            self.download_cnt = 0
            self.element_list = []
            return
        self.count += 1
        # The trace id is the unique key for this video's life cycle.
        trace_id = self.crawler + str(uuid.uuid1())
        print("扫描到一条视频")
        # Title
        video_title = video_element.find("wx-view", class_="dynamic--title").text
        # Play count label
        play_str = video_element.find("wx-view", class_="dynamic--views").text
        info_list = video_element.findAll(
            "wx-view", class_="dynamic--commerce-btn-text"
        )
        # Like count label
        like_str = info_list[1].text
        # Comment count label
        comment_str = info_list[2].text
        # Duration label
        duration_str = video_element.find("wx-view", class_="dynamic--duration").text
        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
        # Avatar URL
        avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
        # Cover URL
        cover_url = video_element.find("wx-image", class_="dynamic--bg-image")["src"]

        play_cnt = _parse_count(play_str)
        duration = _parse_duration(duration_str)
        # The button shows its caption instead of a number when the count is 0.
        like_cnt = _parse_count(like_str, zero_marker="点赞")
        comment_cnt = _parse_count(comment_str, zero_marker="评论")

        # The page exposes no stable ids, so ids are derived from the text.
        out_video_id = md5(video_title.encode("utf8")).hexdigest()
        out_user_id = md5(user_name.encode("utf8")).hexdigest()
        # Sample the clock once so every timestamp of the record agrees.
        now = int(time.time())
        video_dict = {
            "video_title": video_title,
            "video_id": out_video_id,
            "out_video_id": out_video_id,
            "duration_str": duration_str,
            "duration": duration,
            "play_str": play_str,
            "play_cnt": play_cnt,
            "like_str": like_str,
            "like_cnt": like_cnt,
            "comment_cnt": comment_cnt,
            "share_cnt": 0,
            "user_name": user_name,
            "user_id": out_user_id,
            "publish_time_stamp": now,
            "publish_time_str": time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(now)
            ),
            "update_time_stamp": now,
            "avatar_url": avatar_url,
            "cover_url": cover_url,
            "session": f"xiaoniangao-{now}",
        }
        pipeline = PiaoQuanPipelineTest(
            platform=self.crawler,
            mode=self.log_type,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id,
        )
        if not pipeline.process_item():
            return

        # Quote the title safely: a raw double quote would break the XPath.
        video_title_element = self.search_elements(
            f"//*[contains(text(), {_xpath_literal(video_title)})]"
        )
        if video_title_element is None:
            return
        print("点击标题,进入视频详情页")
        video_url = self.get_video_url(video_title_element)
        print(video_url)
        if video_url is not None:
            video_url = get_redirect_url(video_url)
        print(video_url)
        if video_url is None:
            # Leave the detail page again before giving up on this video.
            self.driver.press_keycode(AndroidKey.BACK)
            time.sleep(5)
            return
        video_dict["video_url"] = video_url
        video_dict["platform"] = self.crawler
        video_dict["strategy"] = self.log_type
        video_dict["out_video_id"] = video_dict["video_id"]
        video_dict["crawler_rule"] = json.dumps(self.rule_dict)
        video_dict["user_id"] = self.our_uid
        video_dict["publish_time"] = video_dict["publish_time_str"]
        print(json.dumps(video_dict, ensure_ascii=False, indent=4))
        self.download_cnt += 1
        self.driver.press_keycode(AndroidKey.BACK)
        time.sleep(5)

    def get_video_info(self, video_element):
        """Best-effort wrapper around :meth:`get_video_info_2`.

        Any exception is printed and followed by a BACK key press.
        """
        try:
            self.get_video_info_2(video_element)
        except Exception as e:
            # NOTE(review): BACK is pressed even when the failure happened
            # before the detail page was opened - confirm this cannot leave
            # the mini program.
            self.driver.press_keycode(AndroidKey.BACK)
            print(f"抓取单条视频异常:{e}\n")

    def get_videoList(self):
        """Walk the recommendation feed: up to 50 cards, one swipe per card.

        Stops early when the list window cannot be found or once more than
        100 swipes have been performed in total.
        """
        self.driver.implicitly_wait(20)
        # Switch to the web view.
        self.check_to_applet(xpath='//*[@class="tab-bar--tab tab-bar--tab-selected"]')
        print("切换到 webview 成功")
        time.sleep(1)
        if self.search_elements(_LIST_XPATH) is None:
            print("窗口已销毁")
            self.count = 0
            self.download_cnt = 0
            self.element_list = []
            return

        print("开始获取视频信息")
        for i in range(50):
            print("下滑{}次".format(i))
            element = self.parse_detail(i)
            # No card at this position yet: still swipe so that the feed
            # can load more entries for the following iterations.
            if element is not None:
                self.get_video_info(element)
            self.swipe_up()
            time.sleep(1)
            if self.swipe_count > 100:
                return
        print("已抓取完一组,休眠 5 秒\n")
        time.sleep(5)