# -*- coding: utf-8 -*-
# @Time: 2023/11/14
"""Appium-driven crawler for the "小年糕+" WeChat mini-program.

Drives a real WeChat instance through Appium/UiAutomator2, opens the
mini-program, scrolls the recommend feed, and for each video card:
  * extracts the author mid from the detail page,
  * skips authors already stored in `crawler_user_v3`,
  * fetches the author's recent public albums via the xiaoniangao HTTP API,
  * writes qualifying authors to MySQL and a Feishu sheet.

Run as a script: a supervisor loop restarts the crawler process whenever
it dies (the Appium session is not recoverable in-process).
"""
import json
import os
import random
import sys
import time
from datetime import date, timedelta
import requests
from appium import webdriver
from appium.webdriver.extensions.android.nativekey import AndroidKey
from bs4 import BeautifulSoup
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
import multiprocessing

from common import AliyunLogger
from common.feishu import Feishu
from common.public import clean_title, get_config_from_mysql

sys.path.append(os.getcwd())
from common.common import Common
from common.mq import MQ
from common.scheduling_db import MysqlHelper


def get_redirect_url(url):
    """Resolve a single HTTP redirect.

    Returns the `Location` header for a 301/302 response, otherwise the
    original url unchanged. Only one hop is followed.
    """
    # timeout added so a dead endpoint cannot hang the crawler forever
    res = requests.get(url, allow_redirects=False, timeout=10)
    if res.status_code in (301, 302):
        return res.headers['Location']
    return url


class XiaoNianGaoPlusRecommend:
    """Crawls the 小年糕+ recommend feed and harvests author accounts."""

    env = None
    driver = None
    log_type = None

    def __init__(self, log_type, crawler, env, rule_dict, our_uid):
        """Start WeChat via Appium, open the mini-program and run one crawl pass.

        :param log_type: logger channel name
        :param crawler: crawler name (used for logging / DB routing)
        :param env: "dev" or "prod"
        :param rule_dict: crawl rules; only videos_cnt.min is read here
        :param our_uid: platform uid (kept for interface parity, unused below)
        """
        self.mq = None
        self.platform = "小年糕+主页账号ID"
        self.download_cnt = 0
        self.element_list = []
        self.count = 0
        self.swipe_count = 0
        self.log_type = log_type
        self.crawler = crawler
        self.env = env
        self.rule_dict = rule_dict
        self.our_uid = our_uid
        # NOTE(review): dev and prod currently resolve to the same chromedriver
        # binary — confirm whether a distinct dev path was intended.
        if self.env == "dev":
            chromedriverExecutable = "/Users/piaoquan/Downloads/chromedriver"
        else:
            chromedriverExecutable = "/Users/piaoquan/Downloads/chromedriver"
        Common.logger(self.log_type, self.crawler).info("启动微信")
        # 微信的配置文件 (Appium capabilities for the WeChat app)
        caps = {
            "platformName": "Android",
            "devicesName": "Android",
            "appPackage": "com.tencent.mm",
            "appActivity": ".ui.LauncherUI",
            "autoGrantPermissions": "true",
            "noReset": True,
            "resetkeyboard": True,
            "unicodekeyboard": True,
            "showChromedriverLog": True,
            "printPageSourceOnFailure": True,
            "recreateChromeDriverSessions": True,
            "enableWebviewDetailsCollection": True,
            "setWebContentsDebuggingEnabled": True,
            "newCommandTimeout": 6000,
            "automationName": "UiAutomator2",
            "chromedriverExecutable": chromedriverExecutable,
            # attach chromedriver to the mini-program's webview process
            "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
        }
        self.driver = webdriver.Remote("http://localhost:4723/wd/hub", caps)
        self.driver.implicitly_wait(30)
        # Wait up to ~120 iterations for the WeChat home screen; dismiss the
        # system pull-down menu if it is covering the screen.
        for i in range(120):
            try:
                if self.driver.find_elements(By.ID, "com.tencent.mm:id/f2s"):
                    Common.logger(self.log_type, self.crawler).info("微信启动成功")
                    break
                elif self.driver.find_element(By.ID, "com.android.systemui:id/dismiss_view"):
                    Common.logger(self.log_type, self.crawler).info("发现并关闭系统下拉菜单")
                    size = self.driver.get_window_size()
                    self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.8),
                                      int(size['width'] * 0.5), int(size['height'] * 0.2), 200)
                else:
                    pass
            except NoSuchElementException:
                time.sleep(1)
        Common.logger(self.log_type, self.crawler).info("下滑,展示小程序选择面板")
        size = self.driver.get_window_size()
        # swipe down (0.2 -> 0.8) to reveal the mini-program panel
        self.driver.swipe(int(size['width'] * 0.5), int(size['height'] * 0.2),
                          int(size['width'] * 0.5), int(size['height'] * 0.8), 200)
        time.sleep(1)
        Common.logger(self.log_type, self.crawler).info('打开小程序"小年糕+"')
        self.driver.find_elements(By.XPATH, '//*[@text="小年糕+"]')[-1].click()
        time.sleep(5)
        self.get_videoList()
        time.sleep(1)
        self.driver.quit()

    def search_elements(self, xpath):
        """Search every window handle for `xpath`.

        Returns the first non-empty element list found, or None implicitly
        when no handle matches.
        """
        time.sleep(1)
        windowHandles = self.driver.window_handles
        for handle in windowHandles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                elements = self.driver.find_elements(By.XPATH, xpath)
                if elements:
                    return elements
            except NoSuchElementException:
                pass

    def check_to_applet(self, xpath):
        """Switch to the mini-program WebView context and to the first window
        handle that contains `xpath`. Returns as soon as a match is found."""
        time.sleep(1)
        webViews = self.driver.contexts
        self.driver.switch_to.context(webViews[-1])
        windowHandles = self.driver.window_handles
        for handle in windowHandles:
            self.driver.switch_to.window(handle)
            time.sleep(1)
            try:
                self.driver.find_element(By.XPATH, xpath)
                Common.logger(self.log_type, self.crawler).info("切换到WebView成功\n")
                return
            except NoSuchElementException:
                time.sleep(1)

    def swipe_up(self):
        """Scroll the feed list up by roughly one card height."""
        self.search_elements('//*[@class="list-list--list"]')
        size = self.driver.get_window_size()
        self.driver.swipe(int(size["width"] * 0.5), int(size["height"] * 0.8),
                          int(size["width"] * 0.5), int(size["height"] * 0.442), 200)
        self.swipe_count += 1

    def get_video_url(self, video_title_element):
        """Click a feed card's title, enter the detail page and return the
        video src attribute, retrying up to 3 times. Returns None on failure."""
        for i in range(3):
            self.search_elements('//*[@class="list-list--list"]')
            Common.logger(self.log_type, self.crawler).info(f"video_title_element:{video_title_element[0]}")
            time.sleep(1)
            Common.logger(self.log_type, self.crawler).info("滑动标题至可见状态")
            self.driver.execute_script("arguments[0].scrollIntoView({block:'center',inline:'center'});",
                                       video_title_element[0])
            time.sleep(3)
            Common.logger(self.log_type, self.crawler).info("点击标题")
            video_title_element[0].click()
            self.check_to_applet(xpath=r'//wx-video[@class="dynamic-index--video-item dynamic-index--video"]')
            Common.logger(self.log_type, self.crawler).info("点击标题完成")
            time.sleep(10)
            video_url_elements = self.search_elements(
                '//wx-video[@class="dynamic-index--video-item dynamic-index--video"]')
            if video_url_elements:
                return video_url_elements[0].get_attribute("src")

    def parse_detail(self, index):
        """Parse the current page source and return the (index+1)-th feed card
        as a BeautifulSoup element, or None when the feed is shorter."""
        page_source = self.driver.page_source
        soup = BeautifulSoup(page_source, 'html.parser')
        soup.prettify()
        video_list = soup.findAll(name="wx-view", attrs={"class": "expose--adapt-parent"})
        remaining = list(video_list)[index + 1:]
        # guard: the previous implementation raised IndexError here when the
        # feed had fewer cards than requested
        if not remaining:
            return None
        return remaining[0]

    def get_video_info_2(self, video_element):
        """Process one feed card: resolve the author mid, dedupe against MySQL,
        fetch the author's recent albums, and persist qualifying authors to
        MySQL + Feishu. Presses BACK to return to the feed on every exit path
        that entered the detail page."""
        if video_element is None:
            return
        Common.logger(self.log_type, self.crawler).info(f"本轮已抓取{self.download_cnt}条视频\n")
        if self.download_cnt >= int(self.rule_dict.get("videos_cnt", {}).get("min", 10)):
            # quota reached for this round — reset counters and stop
            self.count = 0
            self.download_cnt = 0
            self.element_list = []
            return
        self.count += 1
        Common.logger(self.log_type, self.crawler).info(f"第{self.count}条视频")
        # 标题
        video_title = video_element.find("wx-view", class_="dynamic--title").text
        # 用户名称
        user_name = video_element.find("wx-view", class_="dynamic--nick-top").text
        video_title_element = self.search_elements(f'//*[contains(text(), "{video_title}")]')
        if video_title_element is None:
            Common.logger(self.log_type, self.crawler).warning(
                f"未找到该视频标题的element:{video_title_element}")
            return
        Common.logger(self.log_type, self.crawler).info("点击标题,进入视频详情页")
        self.get_video_url(video_title_element)
        video_mid_elements = self.search_elements("//wx-view[@class='bar--navBar-content-capsule-wrap']")
        mid = int(video_mid_elements[0].get_attribute("data-mid"))
        if self.repeat_video_id(mid):
            Common.logger(self.log_type, self.crawler).info(f"该用户已经存在")
            self.driver.press_keycode(AndroidKey.BACK)
            return
        data_list = self.get_user_list(mid)
        if len(data_list) == 0:
            Common.logger(self.log_type, self.crawler).info(f"不满足抓取条件")
            self.driver.press_keycode(AndroidKey.BACK)
            return
        status = 1
        formatted_time = time.strftime("%Y-%m-%d", time.localtime(time.time()))
        print(formatted_time)
        self.insert_user(mid, user_name, data_list, status, formatted_time)
        values = [[
            mid,
            user_name,
            formatted_time,
            data_list
        ]]
        Feishu.insert_columns('xiaoniangao', 'xiaoniangao', "8zlceR", "ROWS", 1, 2)
        time.sleep(0.5)
        Feishu.update_values('xiaoniangao', 'xiaoniangao', "8zlceR", "A2:Z2", values)
        Common.logger(self.log_type, self.crawler).info(f"{mid}:{user_name}写入成功")
        AliyunLogger.logging(
            code="1010",
            platform=self.platform,
            mode=self.log_type,
            env=self.env,
            message=f"{mid}:{user_name}入库",
        )
        self.driver.press_keycode(AndroidKey.BACK)
        time.sleep(2)

    def insert_user(self, mid, user_name, data_list, status, formatted_time):
        """Insert one harvested author row into crawler_xng_userid.

        NOTE(review): values are interpolated into the SQL string directly;
        a user_name containing a double quote breaks the statement (SQL
        injection risk). Switch to parameterized queries if MysqlHelper
        supports them.
        """
        insert_sql = f"""insert into crawler_xng_userid( user_id , user_name , user_title_text , status, time) values ({mid},"{user_name}", "{data_list}",{status}, "{formatted_time}")"""
        print(insert_sql)
        MysqlHelper.update_values(self.log_type, self.crawler, insert_sql, self.env, action='')

    def get_user_list(self, mid):
        """Fetch the author's public albums from the xiaoniangao API.

        Returns a list of "title:url" strings for videos published within the
        last 7 days, or an empty list when the request fails, the payload is
        malformed, or a video older than the window is encountered (the feed
        is newest-first, so the scan stops there).
        """
        next_t = -1
        url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
        headers = {
            'Host': 'kapi-xng-app.xiaoniangao.cn',
            'content-type': 'application/json; charset=utf-8',
            'accept': '*/*',
            'authorization': 'hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=',
            'verb': 'POST',
            'content-md5': 'c7b7f8663984e8800e3bcd9b44465083',
            'x-b3-traceid': '2f9da41f960ae077',
            'accept-language': 'zh-cn',
            'date': 'Mon, 19 Jun 2023 06:41:17 GMT',
            'x-token-id': '',
            'x-signaturemethod': 'hmac-sha1',
            'user-agent': 'xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0'
        }
        payload = {
            "token": "",
            "limit": 20,
            "start_t": next_t,
            "visited_mid": mid,
            "share_width": 300,
            "share_height": 240,
        }
        response = requests.request(
            "POST",
            url,
            headers=headers,
            data=json.dumps(payload),
            timeout=10,
        )
        data_list = []
        if "data" not in response.text or response.status_code != 200:
            return data_list
        body = response.json()
        if "list" not in body["data"]:
            return data_list
        video_list = body["data"]["list"]
        if len(video_list) == 0:
            return data_list
        # cutoff for the 7-day freshness window (loop-invariant, hoisted)
        cutoff_date_string = (date.today() + timedelta(days=-7)).strftime("%Y-%m-%d")
        for video_obj in video_list:
            video_title = clean_title(video_obj.get("title", ""))
            # 发布时间
            publish_time_stamp = int(int(video_obj.get("t", 0)) / 1000)
            publish_time_str = time.strftime(
                "%Y-%m-%d", time.localtime(publish_time_stamp)
            )
            if publish_time_str < cutoff_date_string:
                # a stale video ends the scan; return [] (empty, consistent
                # type — the caller only checks len() == 0)
                return []
            v_url = video_obj.get("v_url")
            data_list.append(video_title + ":" + v_url)
        return data_list

    def repeat_video_id(self, mid):
        """Return the number of existing crawler_user_v3 rows for `mid`
        (truthy means the author was already harvested)."""
        sql = f"SELECT `link` FROM `crawler_user_v3` WHERE `source` = 'xiaoniangao' and `link` = {mid}"
        rows = MysqlHelper.get_values(self.log_type, self.crawler, sql, self.env)
        return len(rows)

    def get_video_info(self, video_element):
        """Process one card, pressing BACK and logging on any failure so a
        single bad card cannot abort the whole crawl pass."""
        try:
            self.get_video_info_2(video_element)
        except Exception as e:
            self.driver.press_keycode(AndroidKey.BACK)
            Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")

    def get_videoList(self):
        """Main crawl pass: switch into the webview, then iterate up to 50
        feed cards, processing each and scrolling between them."""
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.driver.implicitly_wait(20)
        # 切换到 web_view
        self.check_to_applet(xpath='//*[@class="tab-bar--tab tab-bar--tab-selected"]')
        print("切换到 webview 成功")
        time.sleep(1)
        if self.search_elements('//*[@class="list-list--list"]') is None:
            Common.logger(self.log_type, self.crawler).info("窗口已销毁\n")
            self.count = 0
            self.download_cnt = 0
            self.element_list = []
            return
        print("开始获取视频信息")
        for i in range(50):
            print("下滑{}次".format(i))
            element = self.parse_detail(i)
            self.get_video_info(element)
            self.swipe_up()
            time.sleep(1)
            if self.swipe_count > 100:
                return
        print("下滑完成")
        Common.logger(self.log_type, self.crawler).info("已抓取完一组,休眠 5 秒\n")
        time.sleep(5)


def run():
    """Entry point for one crawler process (prod settings hard-coded)."""
    rule_dict1 = {"period": {"min": 365, "max": 365},
                  "duration": {"min": 30, "max": 1800},
                  "favorite_cnt": {"min": 0, "max": 0},
                  "videos_cnt": {"min": 5000, "max": 0},
                  "share_cnt": {"min": 0, "max": 0}}
    XiaoNianGaoPlusRecommend("recommend", "xiaoniangao", "prod", rule_dict1, 6267141)


if __name__ == "__main__":
    # Supervisor loop: restart the crawler process whenever it dies.
    process = multiprocessing.Process(
        target=run
    )
    process.start()
    while True:
        if not process.is_alive():
            print("正在重启")
            process.terminate()
            time.sleep(60)
            # release stale adb port forwards before relaunching
            os.system("adb forward --remove-all")
            process = multiprocessing.Process(target=run)
            process.start()
        time.sleep(60)