# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/12/18
import json
import os
import random
import sys
import time
import uuid
from hashlib import md5

from appium import webdriver
from appium.webdriver.extensions.android.nativekey import AndroidKey
from appium.webdriver.common.touch_action import TouchAction
from bs4 import BeautifulSoup
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By

sys.path.append(os.getcwd())

from application.functions import get_redirect_url
from application.pipeline import PiaoQuanPipelineTest
from application.common.messageQueue import MQ
from application.common.log import Local, AliyunLogger


class XiaoNianGaoPlusRecommend(object):
    """
    Offline crawler for the "小年糕+" (XiaoNianGao+) WeChat mini program.

    Constructing an instance runs the whole crawl: it opens an Appium
    session against WeChat, opens the mini program, scans the recommend
    list, pushes accepted videos to the message queue and finally closes
    the session.
    """

    # Endpoint of the local Appium server.
    APPIUM_SERVER_URL = "http://localhost:4723/wd/hub"
    # chromedriver binary handed to Appium for driving the WebView.
    CHROMEDRIVER_PATH = (
        "/Users/luojunhui/chromedriver/chromedriver_v116/chromedriver"
    )
    # Container of the recommend list inside the mini program WebView.
    LIST_XPATH = '//*[@class="list-list--list"]'
    # <wx-video> element shown on the video detail page.
    VIDEO_XPATH = (
        r'//wx-video[@class="dynamic-index--video-item dynamic-index--video"]'
    )

    def __init__(self, log_type, crawler, env, rule_dict, our_uid):
        """
        Start WeChat through Appium and crawl one batch of videos.

        :param log_type: passed as ``mode`` to the loggers and the pipeline,
            and stored as the ``strategy`` field of each sent video
        :param crawler: passed as ``platform`` to the pipeline, stored as the
            ``platform`` field of each sent video and used as trace-id prefix
        :param env: forwarded unchanged to the pipeline
        :param rule_dict: crawl rules; ``videos_cnt.min`` (default 10) caps
            the number of videos sent per batch
        :param our_uid: sequence from which ``random.choice`` picks the
            ``user_id`` of each sent video
        """
        self.mq = MQ(topic_name="topic_crawler_etl_prod")
        self.platform = "xiaoniangaoplus"
        self.download_cnt = 0
        self.element_list = []
        self.count = 0
        self.swipe_count = 0
        self.log_type = log_type
        self.crawler = crawler
        self.env = env
        self.rule_dict = rule_dict
        self.our_uid = our_uid
        print("启动微信")
        # Appium capabilities for WeChat.
        # NOTE(review): "devicesName" looks like a typo of "deviceName";
        # left untouched because the running setup may depend on it.
        caps = {
            "platformName": "Android",
            "devicesName": "Android",
            "appPackage": "com.tencent.mm",
            "appActivity": ".ui.LauncherUI",
            "autoGrantPermissions": True,
            "noReset": True,
            "resetkeyboard": True,
            "unicodekeyboard": True,
            "showChromedriverLog": True,
            "printPageSourceOnFailure": True,
            "recreateChromeDriverSessions": True,
            "enableWebviewDetailsCollection": True,
            "setWebContentsDebuggingEnabled": True,
            "newCommandTimeout": 6000,
            "automationName": "UiAutomator2",
            "chromedriverExecutable": self.CHROMEDRIVER_PATH,
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
        }
        try:
            self.driver = webdriver.Remote(self.APPIUM_SERVER_URL, caps)
        except Exception as e:
            # Without a session there is nothing to crawl (and nothing to quit).
            print(e)
            return
        try:
            self.driver.implicitly_wait(30)
            self._wait_for_wechat()
            self._open_applet()
            self.get_videoList()
            time.sleep(1)
        finally:
            # Always release the Appium session, even when the crawl raised.
            self.driver.quit()

    def _reset_counters(self):
        """Reset the per-batch bookkeeping."""
        self.count = 0
        self.download_cnt = 0
        self.element_list = []

    def _swipe_vertical(self, start_ratio, end_ratio, duration=200):
        """Swipe along the vertical centre line between two height ratios."""
        size = self.driver.get_window_size()
        x = int(size["width"] * 0.5)
        self.driver.swipe(
            x,
            int(size["height"] * start_ratio),
            x,
            int(size["height"] * end_ratio),
            duration,
        )

    def _wait_for_wechat(self):
        """Poll up to 10 times until the WeChat main screen is detected."""
        for _ in range(10):
            try:
                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                    print("启动微信成功")
                    break
                elif self.driver.find_element(
                    By.ID, "com.android.systemui:id/dismiss_view"
                ):
                    # The system notification shade is open: swipe it away.
                    print("发现并关闭系统下拉菜单")
                    self._swipe_vertical(0.8, 0.2)
            except Exception as e:
                # find_element raises when the shade is absent; retry shortly.
                print(f"打开微信异常:{e}")
                time.sleep(1)

    def _open_applet(self):
        """Pull down the mini program panel and tap the "小年糕+" entry."""
        self._swipe_vertical(0.2, 0.8)
        time.sleep(1)
        self.driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
        print("打开小程序小年糕+成功")
        time.sleep(5)

    def search_elements(self, xpath):
        """
        Look for ``xpath`` in every window handle of the current context.

        The driver is left switched to the window where the match was found.

        :param xpath: XPath expression to search for
        :return: the non-empty list of matching elements from the first
            window that has any, or ``None`` when no window matches
        """
        time.sleep(1)
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            # find_elements returns an empty list (it does not raise) when
            # nothing matches, so an emptiness check is sufficient.
            elements = self.driver.find_elements(By.XPATH, xpath)
            if elements:
                return elements
        return None

    def check_to_applet(self, xpath):
        """
        Switch to the last available context and to the window holding ``xpath``.

        Returns silently, without raising, when no window contains the element.

        :param xpath: XPath expression identifying the wanted page
        """
        time.sleep(1)
        contexts = self.driver.contexts
        # The last context is taken to be the mini program WebView.
        self.driver.switch_to.context(contexts[-1])
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                self.driver.find_element(By.XPATH, xpath)
                print("切换到WebView成功\n")
                return
            except NoSuchElementException:
                time.sleep(1)

    def swipe_up(self):
        """Scroll the recommend list up by one slow press-move-release gesture."""
        self.search_elements(self.LIST_XPATH)
        size = self.driver.get_window_size()
        x = int(size["width"] * 0.5)
        action = TouchAction(self.driver)
        action.press(x=x, y=int(size["height"] * 0.85))
        action.wait(ms=1300)  # hold time before moving; tunable
        action.move_to(x=x, y=int(size["height"] * 0.2))
        action.release()
        action.perform()
        self.swipe_count += 1

    def get_video_url(self, video_title_element):
        """
        Open the detail page of a video and read its ``src`` attribute.

        Up to three attempts are made. The original loop returned
        unconditionally on the first pass and crashed on a ``None`` lookup;
        now a failed lookup triggers the next attempt.

        :param video_title_element: element list whose first item is the
            clickable title of the video
        :return: the ``src`` attribute of the ``<wx-video>`` element, or
            ``None`` when it could not be found after three attempts
        """
        for _ in range(3):
            self.search_elements(self.LIST_XPATH)
            time.sleep(1)
            self.driver.execute_script(
                "arguments[0].scrollIntoView({block:'center',inline:'center'});",
                video_title_element[0],
            )
            time.sleep(3)
            video_title_element[0].click()
            self.check_to_applet(xpath=self.VIDEO_XPATH)
            time.sleep(10)
            video_url_elements = self.search_elements(self.VIDEO_XPATH)
            if video_url_elements:
                return video_url_elements[0].get_attribute("src")
        return None

    def parse_detail(self, index):
        """
        Parse the current page source and pick one video card.

        :param index: zero-based scan round; the card at position
            ``index + 1`` is returned (presumably to skip the first card --
            TODO confirm why the offset exists)
        :return: the matching ``wx-view`` tag, or ``None`` when the page does
            not (yet) contain that many cards
        """
        soup = BeautifulSoup(self.driver.page_source, "html.parser")
        video_list = soup.findAll(
            name="wx-view", attrs={"class": "expose--adapt-parent"}
        )
        remaining = video_list[index + 1:]
        return remaining[0] if remaining else None

    @staticmethod
    def _parse_count(text, placeholder=None):
        """
        Convert a displayed counter into an int.

        :param text: counter text such as ``"123"``, ``"3万"`` or ``"1.2万"``
        :param placeholder: label shown when the counter is empty (e.g.
            ``"点赞"``); its presence yields 0
        :return: the numeric value
        :raises ValueError: when the text is not a recognisable number
        """
        if placeholder and placeholder in text:
            return 0
        if "万" in text:
            # "万" means ten thousand; the leading part may be fractional.
            return round(float(text.split("万")[0]) * 10000)
        return int(text)

    @staticmethod
    def _parse_duration(duration_str):
        """Convert ``"ss"``, ``"mm:ss"`` or ``"hh:mm:ss"`` into seconds."""
        seconds = 0
        for part in duration_str.split(":"):
            seconds = seconds * 60 + int(part.strip())
        return seconds

    @staticmethod
    def _xpath_literal(text):
        """Return ``text`` as an XPath 1.0 string literal, quotes included."""
        if '"' not in text:
            return f'"{text}"'
        if "'" not in text:
            return f"'{text}'"
        # Both quote kinds present: XPath 1.0 has no escaping, use concat().
        pieces = (f'"{piece}"' for piece in text.split('"'))
        return "concat(" + ", '\"', ".join(pieces) + ")"

    def _parse_video_element(self, video_element):
        """Extract the scan-stage ``video_dict`` from one video card tag."""
        # Title
        video_title = video_element.find("wx-view", class_="dynamic--title").text
        # Play count text
        play_str = video_element.find("wx-view", class_="dynamic--views").text
        info_list = video_element.findAll(
            "wx-view", class_="dynamic--commerce-btn-text"
        )
        # Like count text
        like_str = info_list[1].text
        # Comment count text
        comment_str = info_list[2].text
        # Video duration text
        duration_str = video_element.find("wx-view", class_="dynamic--duration").text
        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
        # Avatar URL
        avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
        # Cover URL
        cover_url = video_element.find("wx-image", class_="dynamic--bg-image")["src"]

        play_cnt = self._parse_count(
            play_str.replace("+", "").replace("次播放", "")
        )
        # The page exposes no stable ids, so ids are derived from the texts.
        out_video_id = md5(video_title.encode("utf8")).hexdigest()
        out_user_id = md5(user_name.encode("utf8")).hexdigest()
        now = int(time.time())
        return {
            "video_title": video_title,
            "video_id": out_video_id,
            "out_video_id": out_video_id,
            "duration_str": duration_str,
            "duration": self._parse_duration(duration_str),
            "play_str": play_str,
            "play_cnt": play_cnt,
            "like_str": like_str,
            "like_cnt": self._parse_count(like_str, "点赞"),
            "comment_cnt": self._parse_count(comment_str, "评论"),
            "share_cnt": 0,
            "user_name": user_name,
            "user_id": out_user_id,
            # The scan time stands in for the publish time.
            "publish_time_stamp": now,
            "publish_time_str": time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(now)
            ),
            "update_time_stamp": now,
            "avatar_url": avatar_url,
            "cover_url": cover_url,
            "session": f"xiaoniangao-{now}",
        }

    def get_video_info_2(self, video_element):
        """
        Parse one video card, run it through the pipeline and send it to ETL.

        When the batch quota (``rule_dict["videos_cnt"]["min"]``, default 10)
        is already reached, the counters are reset and nothing is processed.

        :param video_element: parsed tag of one video card
        """
        quota = int(self.rule_dict.get("videos_cnt", {}).get("min", 10))
        if self.download_cnt >= quota:
            self._reset_counters()
            return
        self.count += 1
        # The trace id is the unique key for this video's whole life cycle.
        trace_id = self.crawler + str(uuid.uuid1())
        print("扫描到一条视频")
        video_dict = self._parse_video_element(video_element)
        print(json.dumps(video_dict, ensure_ascii=False, indent=4))
        Local.logger(platform=self.platform, mode=self.log_type).info(
            "scan_data_" + json.dumps(video_dict, ensure_ascii=False)
        )
        pipeline = PiaoQuanPipelineTest(
            platform=self.crawler,
            mode=self.log_type,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id,
        )
        if not pipeline.process_item():
            return

        title_literal = self._xpath_literal(video_dict["video_title"])
        video_title_element = self.search_elements(
            f"//*[contains(text(), {title_literal})]"
        )
        if video_title_element is None:
            return
        print("点击标题,进入视频详情页")
        raw_url = self.get_video_url(video_title_element)
        print(raw_url)
        # Only resolve redirects when a source URL was actually found.
        video_url = get_redirect_url(raw_url) if raw_url else None
        print(video_url)
        if video_url is None:
            self.driver.press_keycode(AndroidKey.BACK)
            time.sleep(5)
            return
        video_dict["video_url"] = video_url
        video_dict["platform"] = self.crawler
        video_dict["strategy"] = self.log_type
        video_dict["out_video_id"] = video_dict["video_id"]
        video_dict["crawler_rule"] = json.dumps(self.rule_dict)
        video_dict["user_id"] = random.choice(self.our_uid)
        video_dict["publish_time"] = video_dict["publish_time_str"]
        print(json.dumps(video_dict, ensure_ascii=False, indent=4))
        self.mq.send_msg(video_dict)
        AliyunLogger(platform=self.platform, mode=self.log_type).logging(
            code="1002", message="发送视频至 ETL", data=video_dict
        )
        self.download_cnt += 1
        # Leave the detail page so the next scan starts from the list.
        self.driver.press_keycode(AndroidKey.BACK)
        time.sleep(5)

    def get_video_info(self, video_element):
        """
        Process one video card, swallowing any failure so the crawl goes on.

        :param video_element: parsed tag of one video card
        """
        try:
            self.get_video_info_2(video_element)
        except Exception as e:
            # NOTE(review): BACK is pressed even when the failure happened
            # before the detail page was opened -- confirm this is harmless.
            self.driver.press_keycode(AndroidKey.BACK)
            print(f"抓取单条视频异常:{e}\n")

    def get_videoList(self):
        """
        Scan the recommend list: up to 50 rounds of parse, process and swipe.

        :return: None
        """
        self.driver.implicitly_wait(20)
        # Switch to the WebView that hosts the mini program.
        self.check_to_applet(xpath='//*[@class="tab-bar--tab tab-bar--tab-selected"]')
        print("切换到 webview 成功")
        time.sleep(1)
        if self.search_elements(self.LIST_XPATH) is None:
            print("窗口已销毁")
            self._reset_counters()
            return

        print("开始获取视频信息")
        for i in range(50):
            print("下滑{}次".format(i))
            element = self.parse_detail(i)
            if element is None:
                # Not enough cards loaded yet; swipe to load more.
                print("未解析到新的视频元素,继续下滑")
            else:
                self.get_video_info(element)
            self.swipe_up()
            time.sleep(random.randint(1, 5))
        # The message mentions a 600 s pause, but no sleep is performed here.
        print("已抓取完一组,休眠 600 秒\n")