# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/7/10
import base64
import datetime
import json
import os
import random
import string
import subprocess
import sys
import time
import requests
import urllib3
import re
from requests.adapters import HTTPAdapter
from selenium import webdriver
from selenium.webdriver import DesiredCapabilities
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.common.by import By
sys.path.append(os.getcwd())
from common.mq import MQ
from common.feishu import Feishu
from common.public import download_rule, get_config_from_mysql
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.userAgent import get_random_user_agent


class XiguaRecommend:
    """Crawl the ixigua.com recommendation feed and publish matching videos to MQ."""

    platform = "xigua"

    # Shell pipeline that force-kills every process whose `ps` line contains "Chrome".
    _KILL_CHROME_CMD = "ps aux | grep Chrome | grep -v grep | awk '{print $2}' | xargs kill -9"

    @classmethod
    def random_signature(cls):
        """Return a random 26-character value for the ``_signature`` query parameter.

        The result starts with ``'AAAAAAAAAA'``, ends with ``'AAAB'`` and carries
        random digits/letters in between; the character at index 18 is replaced
        according to a small substitution table.
        """
        src_digits = string.digits
        src_uppercase = string.ascii_uppercase
        src_lowercase = string.ascii_lowercase
        # Split 26 characters between digits, upper-case and lower-case letters.
        digits_num = random.randint(1, 6)
        uppercase_num = random.randint(1, 26 - digits_num - 1)
        lowercase_num = 26 - (digits_num + uppercase_num)
        password = (
            random.sample(src_digits, digits_num)
            + random.sample(src_uppercase, uppercase_num)
            + random.sample(src_lowercase, lowercase_num)
        )
        random.shuffle(password)
        new_password = 'AAAAAAAAAA' + ''.join(password)[10:-4] + 'AAAB'
        new_password_start = new_password[0:18]
        new_password_end = new_password[-7:]
        # '8' -> 'w', '9' -> 'x', '-' -> 'y', '.' -> 'z'; anything else -> 'y'.
        replacement = {'8': 'w', '9': 'x', '-': 'y', '.': 'z'}.get(new_password[18], 'y')
        return new_password_start + replacement + new_password_end

    @classmethod
    def get_video_url(cls, video_info):
        """Extract the playable URL and dimensions from a video-info payload.

        Looks up ``videoResource.dash_120fps.video_list`` (falling back to
        ``videoResource.normal.video_list``), picks the first non-empty entry among
        ``video_4`` .. ``video_1`` and base64-decodes its ``backup_url_1``.

        :param video_info: dict as found under ``packerData.video`` in the API response.
        :return: dict with keys ``video_url``, ``audio_url``, ``video_width``,
            ``video_height``. URLs are ``''`` and sizes ``0`` when no stream is found.
        """
        # `or {}` also covers keys that are present but null in the payload.
        video_resource = video_info.get('videoResource') or {}
        dash_120fps = video_resource.get('dash_120fps') or {}
        normal = video_resource.get('normal') or {}
        video_list = dash_120fps.get('video_list') or normal.get('video_list') or {}
        video = (
            video_list.get('video_4')
            or video_list.get('video_3')
            or video_list.get('video_2')
            or video_list.get('video_1')
        )
        encoded_url = video.get('backup_url_1', '') if video else ''
        video_width = video.get('vwidth', 0) if video else 0
        video_height = video.get('vheight', 0) if video else 0
        # Strip every character outside the base64 alphabet before decoding.
        encoded_url = re.sub(r'[^a-zA-Z0-9+/=]', '', encoded_url)
        decoded_url = base64.b64decode(encoded_url).decode('utf8')
        # The audio URL is read from the same field as the video URL.
        return {
            "video_url": decoded_url,
            "audio_url": decoded_url,
            "video_width": video_width,
            "video_height": video_height,
        }

    @classmethod
    def _http_get(cls, url, headers, params, cookies=None):
        """GET ``url`` through the tunnel proxy with 3 retries and a 5 second timeout.

        TLS verification is disabled (and the related urllib3 warnings silenced).
        The session is closed before returning; the response body has already been
        read at that point, so ``response.json()`` / ``response.text`` stay usable.
        """
        urllib3.disable_warnings()
        with requests.session() as session:
            # max_retries=3: retry failed connections up to three times.
            session.mount('http://', HTTPAdapter(max_retries=3))
            session.mount('https://', HTTPAdapter(max_retries=3))
            response = session.get(url=url, headers=headers, params=params, cookies=cookies,
                                   verify=False, proxies=Common.tunnel_proxies(), timeout=5)
            response.close()
        return response

    @classmethod
    def get_comment_cnt(cls, item_id):
        """Return the comment count reported for ``item_id``.

        :return: the ``total_number`` field of the comments API response, or ``0``
            when the request does not return HTTP 200 or the field is absent.
        """
        url = "https://www.ixigua.com/tlb/comment/article/v5/tab_comments/?"
        params = {
            "tab_index": "0",
            "count": "10",
            "offset": "10",
            "group_id": str(item_id),
            "item_id": str(item_id),
            "aid": "1768",
            "msToken": "50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==",
            "X-Bogus": "DFSzswVOyGtANVeWtCLMqR/F6q9U",
            "_signature": cls.random_signature(),
        }
        headers = {
            'authority': 'www.ixigua.com',
            'accept': 'application/json, text/plain, */*',
            'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'cache-control': 'no-cache',
            'cookie': 'MONITOR_WEB_ID=67cb5099-a022-4ec3-bb8e-c4de6ba51dd0; passport_csrf_token=72b2574f3c99f8ba670e42df430218fd; passport_csrf_token_default=72b2574f3c99f8ba670e42df430218fd; sid_guard=c7472b508ea631823ba765a60cf8757f%7C1680867422%7C3024002%7CFri%2C+12-May-2023+11%3A37%3A04+GMT; uid_tt=c13f47d51767f616befe32fb3e9f485a; uid_tt_ss=c13f47d51767f616befe32fb3e9f485a; sid_tt=c7472b508ea631823ba765a60cf8757f; sessionid=c7472b508ea631823ba765a60cf8757f; sessionid_ss=c7472b508ea631823ba765a60cf8757f; sid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; ssid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; odin_tt=b893608d4dde2e1e8df8cd5d97a0e2fbeafc4ca762ac72ebef6e6c97e2ed19859bb01d46b4190ddd6dd17d7f9678e1de; SEARCH_CARD_MODE=7168304743566296612_0; support_webp=true; support_avif=false; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; tt_scid=7Pux7s634-z8DYvCM20y7KigwH5u7Rh6D9C-RROpnT.aGMEcz6Vsxp.oai47wJqa4f86; ttwid=1%7CHHtv2QqpSGuSu8r-zXF1QoWsvjmNi1SJrqOrZzg-UCY%7C1683858689%7Ca5223fe1500578e01e138a0d71d6444692018296c4c24f5885af174a65873c95; ixigua-a-s=3; msToken=50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==; __ac_nonce=0645dcbf0005064517440; __ac_signature=_02B4Z6wo00f01FEGmAwAAIDBKchzCGqn-MBRJpyAAHAjieFC5GEg6gGiwz.I4PRrJl7f0GcixFrExKmgt6QI1i1S-dQyofPEj2ugWTCnmKUdJQv-wYuDofeKNe8VtMtZq2aKewyUGeKU-5Ud21; ixigua-a-s=3',
            'pragma': 'no-cache',
            'referer': f'https://www.ixigua.com/{item_id}?logTag=3c5aa86a8600b9ab8540',
            'sec-ch-ua': '"Microsoft Edge";v="113", "Chromium";v="113", "Not-A.Brand";v="24"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'tt-anti-token': 'cBITBHvmYjEygzv-f9c78c1297722cf1f559c74b084e4525ce4900bdcf9e8588f20cc7c2e3234422',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35',
            'x-secsdk-csrf-token': '000100000001f8e733cf37f0cd255a51aea9a81ff7bc0c09490cfe41ad827c3c5c18ec809279175e4d9f5553d8a5'
        }
        response = cls._http_get(url, headers=headers, params=params)
        if response.status_code != 200:
            return 0
        # Parse the body once instead of once per check.
        payload = response.json()
        if 'total_number' not in payload:
            return 0
        return payload.get("total_number", 0)

    # Fetch video details
    @classmethod
    def get_video_info(cls, log_type, crawler, item_id):
        """Fetch the metadata of one video and normalise it into a flat dict.

        :param log_type: forwarded to ``Common.logger``.
        :param crawler: forwarded to ``Common.logger``.
        :param item_id: ixigua item id, sent as ``mixId``.
        :return: dict describing the video, or ``None`` when the request fails or
            the response carries no video data (a warning is logged on failure).
        """
        url = 'https://www.ixigua.com/api/mixVideo/information?'
        headers = {
            "accept-encoding": "gzip, deflate",
            "accept-language": "zh-CN,zh-Hans;q=0.9",
            "user-agent": get_random_user_agent('pc'),
            "referer": "https://www.ixigua.com/7102614741050196520?logTag=0531c88ac04f38ab2c62",
        }
        params = {
            'mixId': str(item_id),
            'msToken': 'IlG0wd0Pylyw9ghcYiB2YseUmTwrsrqqhXrbIcsSaTcLTJyVlbYJzk20zw3UO-CfrfC'
                       'NVVIOBNjIl7vfBoxnVUwO9ZyzAI3umSKsT5-pef_RRfQCJwmA',
            'X-Bogus': 'DFSzswVupYTANCJOSBk0P53WxM-r',
            '_signature': '_02B4Z6wo0000119LvEwAAIDCuktNZ0y5wkdfS7jAALThuOR8D9yWNZ.EmWHKV0WSn6Px'
                          'fPsH9-BldyxVje0f49ryXgmn7Tzk-swEHNb15TiGqa6YF.cX0jW8Eds1TtJOIZyfc9s5emH7gdWN94',
        }
        cookies = {
            'ixigua-a-s': '1',
            'msToken': 'IlG0wd0Pylyw9ghcYiB2YseUmTwrsrqqhXrbIcsSaTcLTJyVlbYJzk20zw3UO-CfrfCNVVIOB'
                       'NjIl7vfBoxnVUwO9ZyzAI3umSKsT5-pef_RRfQCJwmA',
            'ttwid': '1%7C_yXQeHWwLZgCsgHClOwTCdYSOt_MjdOkgnPIkpi-Sr8%7C1661241238%7Cf57d0c5ef3f1d7'
                     '6e049fccdca1ac54887c34d1f8731c8e51a49780ff0ceab9f8',
            'tt_scid': 'QZ4l8KXDG0YAEaMCSbADdcybdKbUfG4BC6S4OBv9lpRS5VyqYLX2bIR8CTeZeGHR9ee3',
            'MONITOR_WEB_ID': '0a49204a-7af5-4e96-95f0-f4bafb7450ad',
            '__ac_nonce': '06304878000964fdad287',
            '__ac_signature': '_02B4Z6wo00f017Rcr3AAAIDCUVxeW1tOKEu0fKvAAI4cvoYzV-wBhq7B6D8k0no7lb'
                              'FlvYoinmtK6UXjRIYPXnahUlFTvmWVtb77jsMkKAXzAEsLE56m36RlvL7ky.M3Xn52r9t1IEb7IR3ke8',
            'ttcid': 'e56fabf6e85d4adf9e4d91902496a0e882',
            '_tea_utm_cache_1300': 'undefined',
            'support_avif': 'false',
            'support_webp': 'false',
            'xiguavideopcwebid': '7134967546256016900',
            'xiguavideopcwebid.sig': 'xxRww5R1VEMJN_dQepHorEu_eAc',
        }
        response = cls._http_get(url, headers=headers, params=params, cookies=cookies)
        # Only parse the body for a 200 response, and only once.
        payload = response.json() if response.status_code == 200 else None
        if not payload or 'data' not in payload or payload['data'] == {}:
            Common.logger(log_type, crawler).warning(f"get_video_info:{response.status_code}, {response.text}\n")
            return None

        video_info = payload['data'].get("gidInformation", {}).get("packerData", {}).get("video", {})
        if video_info == {}:
            return None

        # Resolve the stream once; it involves a base64 decode and feeds four fields below.
        stream = cls.get_video_url(video_info)
        user_info = video_info.get("user_info", {})
        publish_time_stamp = int(video_info.get("video_publish_time", 0))
        video_dict = {
            "video_title": video_info.get("title", ""),
            "video_id": video_info.get("videoResource", {}).get("vid", ""),
            "gid": str(item_id),
            "play_cnt": int(video_info.get("video_watch_count", 0)),
            "like_cnt": int(video_info.get("video_like_count", 0)),
            "comment_cnt": int(cls.get_comment_cnt(item_id)),
            "share_cnt": 0,
            "favorite_cnt": 0,
            "duration": int(video_info.get("video_duration", 0)),
            "video_width": int(stream["video_width"]),
            "video_height": int(stream["video_height"]),
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)),
            "user_name": user_info.get("name", ""),
            "user_id": str(user_info.get("user_id", "")),
            "avatar_url": str(user_info.get("avatar_url", "")),
            "cover_url": video_info.get("poster_url", ""),
            "audio_url": stream["audio_url"],
            "video_url": stream["video_url"],
            "session": f"xigua-search-{int(time.time())}"
        }
        return video_dict

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        """Return how many ``crawler_video`` rows already exist for ``video_id``.

        Rows are matched on ``out_video_id`` for either ``crawler`` or
        ``cls.platform`` as the platform.
        """
        # NOTE(review): the query is built by string interpolation and `video_id`
        # originates from a remote API response. Switch to a parameterized query if
        # MysqlHelper supports one.
        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(repeat_video)

    @classmethod
    def quit(cls, log_type, crawler, env, driver: WebDriver):
        """Quit ``driver`` and then force-kill any remaining Chrome processes."""
        Common.logger(log_type, crawler).info("退出浏览器")
        Common.logging(log_type, crawler, env, "退出浏览器")
        driver.quit()
        os.system(cls._KILL_CHROME_CMD)

    @classmethod
    def get_videoList(cls, log_type, crawler, our_uid, rule_dict, env):
        """Scrape the recommendation page once and send every eligible video to MQ.

        A video is sent when it passes ``download_rule``, its title contains no
        configured filter word and it is not yet present in ``crawler_video``.

        :param log_type: strategy name; used for logging and stored on the message.
        :param crawler: crawler name; used for logging, config lookup and as platform.
        :param our_uid: stored on the message as ``user_id``.
        :param rule_dict: rule passed to ``download_rule`` and serialised on the message.
        :param env: environment name; selects the MQ topic and chromedriver path.
        """
        mq = MQ(topic_name="topic_crawler_etl_" + env)
        Common.logger(log_type, crawler).info("启动 Chrome 浏览器")
        Common.logging(log_type, crawler, env, "启动 Chrome 浏览器")
        # Kill every running Chrome process before starting a fresh one.
        os.system(cls._KILL_CHROME_CMD)
        time.sleep(1)
        # Launch Chrome with remote debugging on port 12306.
        cmd = 'open -a "Google Chrome" --args --remote-debugging-port=12306'
        os.system(cmd)
        # Enable performance logging. Copy first: DesiredCapabilities.CHROME is a
        # shared class-level dict and must not be mutated in place.
        ca = DesiredCapabilities.CHROME.copy()
        ca["goog:loggingPrefs"] = {"performance": "ALL"}
        # Pick the chromedriver binary for this environment.
        if env == "dev":
            chromedriver = "/Users/wangkun/Downloads/chromedriver/chromedriver_v114/chromedriver"
        else:
            chromedriver = "/Users/kanyikan/Downloads/chromedriver/chromedriver_v114/chromedriver"
        # Attach to the Chrome instance started above instead of spawning a new one.
        browser = webdriver.ChromeOptions()
        browser.add_experimental_option("debuggerAddress", "127.0.0.1:12306")
        # NOTE(review): `desired_capabilities` is deprecated in Selenium 4 — confirm
        # it is still accepted by the installed version.
        driver = webdriver.Chrome(desired_capabilities=ca, options=browser, service=Service(chromedriver))
        try:
            driver.implicitly_wait(10)
            Common.logger(log_type, crawler).info("打开西瓜推荐页")
            Common.logging(log_type, crawler, env, "打开西瓜推荐页")
            driver.get("https://www.ixigua.com/")
            time.sleep(2)

            # Login check: the header avatar is only rendered for a logged-in user.
            if len(driver.find_elements(By.XPATH, '//*[@class="BU-Component-Header-Avatar__image"]')) == 0:
                Common.logger(log_type, crawler).info("登录失效")
                Common.logging(log_type, crawler, env, "登录失效")
                driver.get_screenshot_as_file(f"./{crawler}/photos/logon_err.png")
                # NOTE: Feishu alerting for expired logins is currently disabled.
                return

            videoList_elements = driver.find_elements(
                By.XPATH, '//*[@class="HorizontalFeedCard HorizontalChannelBlockList__item"]')
            if len(videoList_elements) == 0:
                Common.logger(log_type, crawler).info("到底啦~~~~~~~~~~\n")
                Common.logging(log_type, crawler, env, "到底啦~~~~~~~~~~\n")
                return

            # The XPath starts with `//`, so it matches document-wide no matter which
            # element it is evaluated from. Query it once and pair covers with cards
            # by position instead of re-running it for every card.
            cover_elements = driver.find_elements(
                By.XPATH, '//*[@class="HorizontalFeedCard__coverWrapper disableZoomAnimation"]')
            for i, (_, cover_element) in enumerate(zip(videoList_elements, cover_elements)):
                Common.logger(log_type, crawler).info(f"正在抓取第{i+1}条视频")
                Common.logging(log_type, crawler, env, f"正在抓取第{i+1}条视频")
                href = cover_element.get_attribute("href")
                if not href:
                    # No link on this card: nothing to look up.
                    Common.logger(log_type, crawler).info("无效视频\n")
                    Common.logging(log_type, crawler, env, "无效视频\n")
                    continue
                item_id = href.replace("https://www.ixigua.com/", "").replace("?&", "")
                Common.logger(log_type, crawler).info(f"item_id:{item_id}")
                video_dict = cls.get_video_info(log_type, crawler, item_id)
                if video_dict is None:
                    Common.logger(log_type, crawler).info("无效视频\n")
                    Common.logging(log_type, crawler, env, "无效视频\n")
                    continue
                for k, v in video_dict.items():
                    Common.logger(log_type, crawler).info(f"{k}:{v}")
                Common.logging(log_type, crawler, env, f"{video_dict}")

                if download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict,
                                 rule_dict=rule_dict) is False:
                    Common.logger(log_type, crawler).info("不满足抓取规则\n")
                    Common.logging(log_type, crawler, env, "不满足抓取规则\n")
                elif any(str(word) and str(word) in video_dict["video_title"]
                         for word in get_config_from_mysql(log_type=log_type, source=crawler, env=env,
                                                           text="filter", action="")):
                    # Empty filter words never match (same as the original expression).
                    Common.logger(log_type, crawler).info('已中过滤词\n')
                    Common.logging(log_type, crawler, env, '已中过滤词\n')
                elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
                    Common.logger(log_type, crawler).info('视频已下载\n')
                    Common.logging(log_type, crawler, env, '视频已下载\n')
                else:
                    # Eligible: add the ETL fields and publish the message.
                    video_dict["out_user_id"] = video_dict["user_id"]
                    video_dict["platform"] = crawler
                    video_dict["strategy"] = log_type
                    video_dict["out_video_id"] = video_dict["video_id"]
                    video_dict["width"] = video_dict["video_width"]
                    video_dict["height"] = video_dict["video_height"]
                    video_dict["crawler_rule"] = json.dumps(rule_dict)
                    video_dict["user_id"] = our_uid
                    video_dict["publish_time"] = video_dict["publish_time_str"]
                    video_dict["strategy_type"] = log_type
                    mq.send_msg(video_dict)
        finally:
            # Always release the browser, including on login failure and on errors.
            cls.quit(log_type, crawler, env, driver)


if __name__ == "__main__":
    # XiguaRecommend.get_videoList("recommend", "xigua", "dev")
    # NOTE(review): the calls below look like leftover debugging; `crontab -e`
    # opens an interactive editor.
    print(subprocess.run(['crontab', '-l']))
    print(subprocess.run(['crontab', '-e']))