# -*- coding: utf-8 -*-
# @Author: zhangyong
# @Time: 2023/11/24
"""Appium-driven crawler for the "漂漂圈丨福年" WeChat mini program.

The crawler attaches to WeChat on an Android device through a local Appium
server, opens the mini program, scrapes the video cards rendered in its
WebView, runs each card through ``PiaoQuanPipeline`` and publishes the
accepted ones to MQ.  When run as a script, a watchdog loop restarts the
crawler process whenever it exits.
"""
import json
import multiprocessing
import os
import random
import sys
import time
import uuid
from hashlib import md5

from appium import webdriver
from appium.webdriver.extensions.android.nativekey import AndroidKey
from bs4 import BeautifulSoup
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By

# The project packages are resolved relative to the working directory, so it
# has to be on sys.path before the ``application`` imports below.
sys.path.append(os.getcwd())
from application.common.log import AliyunLogger, Local
from application.common.messageQueue import MQ
from application.functions import get_redirect_url
from application.pipeline import PiaoQuanPipeline


class PPQRecommend:
    """Crawl recommended videos from the mini program.

    Constructing an instance performs the whole crawl: it starts the Appium
    session, opens the mini program, scrapes the list and finally closes the
    session.
    """

    def __init__(self, log_type, crawler, env, rule_dict, our_uid):
        """Start the Appium session and run one full crawl.

        Args:
            log_type: Mode/strategy name passed to the loggers and pipeline.
            crawler: Crawler (platform) name passed to the loggers and pipeline.
            env: Environment name; also the suffix of the MQ topic.
            rule_dict: Crawl rules; ``videos_cnt.min`` is read as the
                per-round download target.
            our_uid: Value stored as ``user_id`` on every published item.
        """
        self.mq = MQ(topic_name="topic_crawler_etl_" + env)
        self.platform = "piaopiaoquan"
        self.download_cnt = 0
        self.element_list = []
        self.count = 0
        self.swipe_count = 0
        self.log_type = log_type
        self.crawler = crawler
        self.env = env
        self.rule_dict = rule_dict
        self.our_uid = our_uid
        chromedriver_executable = "/usr/bin/chromedriver"
        self.aliyun_log = AliyunLogger(platform=crawler, mode=log_type, env=env)
        Local.logger(self.log_type, self.crawler).info("启动微信")
        # Appium capabilities for attaching to WeChat.
        # NOTE(review): "devicesName", "resetkeyboard" and "unicodekeyboard"
        # do not match the documented capability spellings ("deviceName",
        # "resetKeyboard", "unicodeKeyboard"); left untouched because fixing
        # them would change device behaviour - confirm before renaming.
        caps = {
            "platformName": "Android",
            "devicesName": "Android",
            "appPackage": "com.tencent.mm",
            "appActivity": ".ui.LauncherUI",
            "autoGrantPermissions": "true",
            "noReset": True,
            "resetkeyboard": True,
            "unicodekeyboard": True,
            "showChromedriverLog": True,
            "printPageSourceOnFailure": True,
            "recreateChromeDriverSessions": True,
            "enableWebviewDetailsCollection": True,
            "setWebContentsDebuggingEnabled": True,
            "newCommandTimeout": 6000,
            "automationName": "UiAutomator2",
            "chromedriverExecutable": chromedriver_executable,
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
        }
        try:
            self.driver = webdriver.Remote("http://localhost:4750/wd/hub", caps)
        except Exception as e:
            print(e)
            self.aliyun_log.logging(
                code="3002",
                message=f'appium 启动异常: {e}'
            )
            return

        # From here on a session exists; always release it, even when the
        # crawl aborts with an exception.
        try:
            self.driver.implicitly_wait(30)
            self._wait_for_wechat()
            self._open_mini_program()
            time.sleep(5)
            self.get_videoList()
            time.sleep(1)
        finally:
            self.driver.quit()

    def _wait_for_wechat(self):
        """Poll (up to 120 times) until the WeChat main screen is found."""
        for _ in range(120):
            try:
                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                    Local.logger(self.log_type, self.crawler).info("微信启动成功")
                    self.aliyun_log.logging(
                        code="1000",
                        message="启动微信成功"
                    )
                    break
                elif self.driver.find_element(By.ID, "com.android.systemui:id/dismiss_view"):
                    # The system notification shade is open: swipe it away.
                    Local.logger(self.log_type, self.crawler).info("发现并关闭系统下拉菜单")
                    self.aliyun_log.logging(
                        code="1000",
                        message="发现并关闭系统下拉菜单"
                    )
                    size = self.driver.get_window_size()
                    self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.8),
                                      int(size['width'] * 0.5), int(size['height'] * 0.2), 200)
            except NoSuchElementException:
                self.aliyun_log.logging(
                    code="3001",
                    message="打开微信异常"
                )
                time.sleep(1)

    def _open_mini_program(self):
        """Swipe down to show the mini-program panel and tap the target entry."""
        Local.logger(self.log_type, self.crawler).info("下滑,展示小程序选择面板")
        size = self.driver.get_window_size()
        self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.2),
                          int(size['width'] * 0.5), int(size['height'] * 0.8), 200)
        time.sleep(1)
        Local.logger(self.log_type, self.crawler).info('打开小程序"漂漂圈丨福年"')
        self.driver.find_elements(By.XPATH, '//*[@text="漂漂圈丨福年"]')[-1].click()
        self.aliyun_log.logging(
            code="1000",
            message="打开小程序漂漂圈丨福年成功"
        )

    @staticmethod
    def _xpath_literal(text):
        """Return ``text`` as an XPath 1.0 string literal.

        XPath 1.0 has no escape character, so a value containing both quote
        kinds has to be assembled with ``concat()``.  Values without a double
        quote are simply wrapped in double quotes.
        """
        if '"' not in text:
            return f'"{text}"'
        if "'" not in text:
            return f"'{text}'"
        pieces = ", '\"', ".join(f'"{piece}"' for piece in text.split('"'))
        return f"concat({pieces})"

    @staticmethod
    def _parse_duration(duration_str):
        """Convert a colon-separated duration (``ss``, ``mm:ss``, ``hh:mm:ss``) to seconds."""
        seconds = 0
        for part in duration_str.split(":"):
            seconds = seconds * 60 + int(part.strip())
        return seconds

    def search_elements(self, xpath):
        """Search every window handle for ``xpath``.

        Returns:
            The non-empty list of elements from the first window that has a
            match (the driver stays switched to that window), or ``None`` when
            no window matches.
        """
        time.sleep(1)
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                elements = self.driver.find_elements(By.XPATH, xpath)
            except NoSuchElementException:
                continue
            if elements:
                return elements
        return None

    def check_to_applet(self, xpath):
        """Switch to the last context and to the window that contains ``xpath``.

        Returns silently (without a success log) when no window matches.
        """
        time.sleep(1)
        contexts = self.driver.contexts
        self.driver.switch_to.context(contexts[-1])
        for handle in self.driver.window_handles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                self.driver.find_element(By.XPATH, xpath)
            except NoSuchElementException:
                time.sleep(1)
                continue
            Local.logger(self.log_type, self.crawler).info("切换到WebView成功\n")
            self.aliyun_log.logging(
                code="1000",
                message="成功切换到 webview"
            )
            return

    def swipe_up(self):
        """Scroll the list up by one swipe and count it."""
        # Re-select the window that holds the list before swiping.
        self.search_elements('//*[@class="dynamic--title-container"]')
        size = self.driver.get_window_size()
        self.driver.swipe(int(size["width"] * 0.5), int(size["height"] * 0.8),
                          int(size["width"] * 0.5), int(size["height"] * 0.442), 200)
        self.swipe_count += 1

    def get_video_url(self, video_title_element):
        """Open the detail page via the title element and read the video ``src``.

        Args:
            video_title_element: Non-empty list of elements; the first one is
                scrolled into view and clicked.

        Returns:
            The ``src`` attribute of the first matching video element, or
            ``None`` when it could not be found within three attempts.
        """
        for _ in range(3):
            self.search_elements('//*[@class="dynamic--title-container"]')
            Local.logger(self.log_type, self.crawler).info(f"video_title_element:{video_title_element[0]}")
            time.sleep(1)
            Local.logger(self.log_type, self.crawler).info("滑动标题至可见状态")
            self.driver.execute_script("arguments[0].scrollIntoView({block:'center',inline:'center'});",
                                       video_title_element[0])
            time.sleep(3)
            Local.logger(self.log_type, self.crawler).info("点击标题")
            video_title_element[0].click()
            self.check_to_applet(xpath=r'//wx-video[@class="infos--title infos--ellipsis"]')
            Local.logger(self.log_type, self.crawler).info("点击标题完成")
            time.sleep(10)
            video_url_elements = self.search_elements(
                '//wx-video[@class="dynamic-index--video-item dynamic-index--video"]')
            if video_url_elements:
                return video_url_elements[0].get_attribute("src")
        return None

    def parse_detail(self, index):
        """Return the ``index``-th video card parsed from the current page source.

        Returns:
            The parsed card element, or ``None`` when the page currently holds
            fewer than ``index + 1`` cards.
        """
        soup = BeautifulSoup(self.driver.page_source, 'html.parser')
        video_list = soup.find_all(name="wx-view", attrs={"class": "expose--adapt-parent"})
        remaining = video_list[index:]
        return remaining[0] if remaining else None

    def get_video_info_2(self, video_element):
        """Extract one video card, filter it through the pipeline and publish it.

        Args:
            video_element: Parsed card element as returned by ``parse_detail``.
        """
        Local.logger(self.log_type, self.crawler).info(f"本轮已抓取{self.download_cnt}条视频\n")
        if self.download_cnt >= int(self.rule_dict.get("videos_cnt", {}).get("min", 10)):
            # Target for this round reached: reset the counters and skip.
            self.count = 0
            self.download_cnt = 0
            self.element_list = []
            return
        self.count += 1
        Local.logger(self.log_type, self.crawler).info(f"第{self.count}条视频")
        # trace_id is used as the unique index for this video's life cycle.
        trace_id = self.crawler + str(uuid.uuid1())
        self.aliyun_log.logging(
            code="1001",
            trace_id=trace_id,
            message="扫描到一条视频",
        )
        # Title
        video_title = video_element.find("wx-view", class_="dynamic--title").text
        # Play-count text
        play_str = video_element.find("wx-view", class_="dynamic--views").text
        # Duration text
        duration_str = video_element.find("wx-view", class_="dynamic--duration").text
        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
        # Avatar URL
        avatar_url = video_element.find("wx-image", class_="avatar--avatar")["src"]
        # Cover URL
        cover_url = video_element.find("wx-image", class_="dynamic--bg-image")["src"]
        play_cnt = int(play_str.replace("+", "").replace("次播放", ""))
        duration = self._parse_duration(duration_str)
        # The page exposes no ids in the scraped fields, so ids are derived
        # from the title and the user name.
        out_video_id = md5(video_title.encode('utf8')).hexdigest()
        out_user_id = md5(user_name.encode('utf8')).hexdigest()
        now = int(time.time())
        video_dict = {
            "video_title": video_title,
            "video_id": out_video_id,
            'out_video_id': out_video_id,
            "duration_str": duration_str,
            "duration": duration,
            "play_str": play_str,
            "play_cnt": play_cnt,
            "like_str": "",
            "like_cnt": 0,
            "comment_cnt": 0,
            "share_cnt": 0,
            "user_name": user_name,
            "user_id": out_user_id,
            'publish_time_stamp': now,
            'publish_time_str': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)),
            'update_time_stamp': now,
            "avatar_url": avatar_url,
            "cover_url": cover_url,
            "session": f"piaopiiaoquan-{now}"
        }
        pipeline = PiaoQuanPipeline(
            platform=self.crawler,
            mode=self.log_type,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id
        )
        if not pipeline.process_item():
            return

        # Quote the title safely: a raw '"' in it would break the XPath.
        title_literal = self._xpath_literal(video_title)
        video_title_element = self.search_elements(f'//*[contains(text(), {title_literal})]')
        if video_title_element is None:
            return
        Local.logger(self.log_type, self.crawler).info("点击标题,进入视频详情页")
        self.aliyun_log.logging(
            code="1000",
            message="点击标题,进入视频详情页",
        )
        video_url = self.get_video_url(video_title_element)
        # Only resolve redirects when a URL was actually found.
        if video_url is not None:
            video_url = get_redirect_url(video_url)
        if video_url is None:
            self.driver.press_keycode(AndroidKey.BACK)
            time.sleep(5)
            return
        video_dict['video_url'] = video_url
        video_dict["platform"] = self.crawler
        video_dict["strategy"] = self.log_type
        video_dict["out_video_id"] = video_dict["video_id"]
        video_dict["crawler_rule"] = json.dumps(self.rule_dict)
        # NOTE(review): run() passes a list of ids as our_uid, so the whole
        # list is stored here - confirm whether a single id was intended.
        video_dict["user_id"] = self.our_uid
        video_dict["publish_time"] = video_dict["publish_time_str"]
        self.mq.send_msg(video_dict)
        self.download_cnt += 1
        self.driver.press_keycode(AndroidKey.BACK)
        time.sleep(5)

    def get_video_info(self, video_element):
        """Process one card, logging (not raising) any failure."""
        try:
            self.get_video_info_2(video_element)
        except Exception as e:
            # NOTE(review): BACK is pressed for every failure, including ones
            # raised before the detail page was opened - confirm this is
            # intended.
            self.driver.press_keycode(AndroidKey.BACK)
            Local.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
            self.aliyun_log.logging(
                code="3001",
                message=f"抓取单条视频异常:{e}\n"
            )

    def get_videoList(self):
        """Switch into the mini-program WebView and crawl up to 50 cards."""
        self.driver.implicitly_wait(20)
        # Switch to the web view.
        self.check_to_applet(xpath='//*[@class="expose--adapt-parent"]')
        print("切换到 webview 成功")
        time.sleep(1)
        if self.search_elements('//*[@class="expose--adapt-parent"]') is None:
            Local.logger(self.log_type, self.crawler).info("窗口已销毁\n")
            self.aliyun_log.logging(
                code="3000",
                message="窗口已销毁"
            )
            self.count = 0
            self.download_cnt = 0
            self.element_list = []
            return

        print("开始获取视频信息")
        for i in range(50):
            print("下滑{}次".format(i))
            element = self.parse_detail(i)
            if element is None:
                # The card is not on the page (yet); keep swiping instead of
                # aborting the whole crawl.
                Local.logger(self.log_type, self.crawler).info(f"未找到第{i}个视频元素,跳过")
            else:
                self.get_video_info(element)
            self.swipe_up()
            time.sleep(1)
            if self.swipe_count > 100:
                return
        print("下滑完成")
        Local.logger(self.log_type, self.crawler).info("已抓取完一组,休眠 5 秒\n")
        self.aliyun_log.logging(
            code="1000",
            message="已抓取完一组,休眠 5 秒\n",
        )
        time.sleep(5)


def run():
    """Run one crawl with the production rule set."""
    rule_dict1 = {"period": {"min": 365, "max": 365},
                  "duration": {"min": 30, "max": 1800},
                  "favorite_cnt": {"min": 0, "max": 0},
                  "videos_cnt": {"min": 5000, "max": 0},
                  "share_cnt": {"min": 0, "max": 0}}
    PPQRecommend("recommend", "piaopiaoquan", "prod", rule_dict1, [64120158, 64120157, 63676778])


def main():
    """Run the crawler in a child process and restart it whenever it exits."""
    process = multiprocessing.Process(
        target=run
    )
    process.start()
    while True:
        if not process.is_alive():
            print("正在重启")
            process.terminate()
            time.sleep(60)
            # Drop adb port forwards before starting a fresh session.
            os.system("adb forward --remove-all")
            process = multiprocessing.Process(target=run)
            process.start()
        time.sleep(60)


if __name__ == "__main__":
    main()