import json
import os
import random
import re
import sys
import time
import uuid
from datetime import datetime

import requests
from selenium import webdriver
from selenium.webdriver import DesiredCapabilities
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

sys.path.append(os.getcwd())  # make the project-local "common" package importable
from common import AliyunLogger
from common.common import Common
from common.mq import MQ
from common.public import get_config_from_mysql
from common.scheduling_db import MysqlHelper


class GongzhonghaoUrlAuthor:
    platform = "公众号"

    @classmethod
    def get_all_videos(cls, log_type, crawler, task_dict, rule_dict, user_list, env):
        # Each crawler's daily window is 12h (8h waiting + 4h crawling);
        # total_s is the 8h waiting budget, spread evenly across the accounts.
        total_s = 8 * 60 * 60
        wait_average_time = int(total_s / len(user_list))
        for user_dict in user_list:
            Common.logger(log_type, crawler).info(f'抓取公众号:{user_dict["nick_name"]}\n')
            Common.logging(log_type, crawler, env, f'抓取公众号:{user_dict["nick_name"]}\n')
            AliyunLogger.logging(
                code="1003",
                platform=crawler,
                mode=log_type,
                env=env,
                message="开始抓取公众号: {}".format(user_dict["nick_name"]),
            )
            try:
                cls.get_videoList(
                    log_type=log_type,
                    crawler=crawler,
                    task_dict=task_dict,
                    rule_dict=rule_dict,
                    user_dict=user_dict,
                    env=env,
                )
                sleep_time = random.randint(
                    wait_average_time - 120, wait_average_time - 60
                )
                Common.logger(log_type, crawler).info("休眠 {} 秒\n".format(sleep_time))
                Common.logging(log_type, crawler, env, "休眠 {} 秒\n".format(sleep_time))
                time.sleep(sleep_time)
            except Exception as e:
                Common.logger(log_type, crawler).info(
                    f'抓取公众号:{user_dict["nick_name"]}时异常:{e}\n'
                )
                Common.logging(
                    log_type, crawler, env, f'抓取公众号:{user_dict["nick_name"]}时异常:{e}\n'
                )
                AliyunLogger.logging(
                    code="3000",
                    platform=crawler,
                    mode=log_type,
                    env=env,
                    message="抓取公众号: {} 时异常".format(user_dict["nick_name"]),
                )
            AliyunLogger.logging(
                code="1004",
                platform=crawler,
                mode=log_type,
                env=env,
                message="完成抓取公众号: {}".format(user_dict["nick_name"]),
            )

    @classmethod
    def get_video_url(cls, article_url, env):
        # Request configuration: capture Chrome performance logs
        ca = DesiredCapabilities.CHROME
        ca["goog:loggingPrefs"] = {"performance": "ALL"}
        # Run headless, without opening a browser window
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument(
            "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36"
        )
        chrome_options.add_argument("--no-sandbox")
        # Driver initialization; outside prod, use the locally downloaded chromedriver
        if env == "prod":
            driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options)
        else:
            driver = webdriver.Chrome(
                desired_capabilities=ca,
                options=chrome_options,
                service=Service(
                    "/Users/tzld/Downloads/chromedriver_mac64/chromedriver"
                ),
            )
        driver.implicitly_wait(10)
        driver.get(article_url)
        time.sleep(1)

        if (
            len(
                driver.find_elements(
                    By.XPATH, '//div[@class="js_video_poster video_poster"]/*[2]'
                )
            )
            != 0
        ):
            # Native WeChat video: the poster element carries the video src
            video_url = driver.find_element(
                By.XPATH, '//div[@class="js_video_poster video_poster"]/*[2]'
            ).get_attribute("src")
        elif (
            len(
                driver.find_elements(
                    By.XPATH, '//span[@class="js_tx_video_container"]/*[1]'
                )
            )
            != 0
        ):
            # Embedded Tencent Video: extract the vid from the iframe src
            iframe = driver.find_element(
                By.XPATH, '//span[@class="js_tx_video_container"]/*[1]'
            ).get_attribute("src")
            video_id = iframe.split("vid=")[-1].split("&")[0]
            video_url = cls.get_tencent_video_url(video_id)
        else:
            video_url = 0
        driver.quit()
        if "mpvideo.qpic.cn" in str(video_url):
            time.sleep(random.randint(1, 3))
        return video_url
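
    @classmethod
    def get_tencent_video_url(cls, video_id):
        # NOTE: this helper is called by get_video_url above but was missing from
        # this file. What follows is a minimal sketch, assuming the public
        # vv.video.qq.com "getinfo" endpoint that Tencent Video pages commonly
        # use; the endpoint, its parameters, and the response layout are
        # assumptions, not something this module confirms.
        url = (
            "https://vv.video.qq.com/getinfo?vids="
            + str(video_id)
            + "&platform=101001&charge=0&otype=json"
        )
        response_text = requests.get(url=url).text
        # The endpoint wraps its JSON payload as "QZOutputJson=...;"; strip the wrapper
        response_json = json.loads(
            response_text.replace("QZOutputJson=", "").rstrip(";")
        )
        video_info = response_json["vl"]["vi"][0]
        # Assemble a playable mp4 URL from the CDN prefix, the vid, and the fvkey token
        video_url = (
            video_info["ul"]["ui"][0]["url"]
            + str(video_id)
            + ".mp4?vkey="
            + video_info["fvkey"]
        )
        return video_url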

    @classmethod
    def get_wechat_gh(cls, content_link):
        # content_link is an article URL (hence the class name ...UrlAuthor);
        # the page source carries the account id in `var user_name = "..."`.
        payload = {}
        headers = {
            'authority': 'mp.weixin.qq.com',
            'cookie': 'RK=kLuB01bYUa; ptcz=604f91ae284ed19ddcddda0c052312f03f096ccaa23994b0dc7aac856159a1d9; iip=0; rewardsn=; wxtokenkey=777; pac_uid=1_364544322; pgv_info=ssid=s5424148811; pgv_pvid=8423646400; o_cookie=364544322; wwapp.vid=; wwapp.cst=; wwapp.deviceid=; login_type=wxqrcode; tvfe_boss_uuid=97fe85e41c02f816; ua_id=FS5Q0DLf7QjeurnpAAAAAG3yjawqm2QVreYybCeE-bE=; wxuin=84408959445830; mm_lang=zh_CN; sig=h01d8310f2cf065f1baf641dec377a7cf209b3acd87e6f47e759a8eff53a83be44365d97fd1e013a7d4; uuid=626f86245f04d876538319d2b0ad00a8; xid=69967389815bcec44c878b4ec5f7d0cd; _clck=3891672333|1|fbt|0; qm_authimgs_id=1; qm_verifyimagesession=h014171884333ea321544e60514b652e6ea94b41703bea3db998ba552a68c3bb029407c558b3c658287; qqhxqqcomrouteLine=index; eas_sid=11q6j8S5i49225n2H271a3i9W6; ariaDefaultTheme=undefined; rewardsn=; wxtokenkey=777',
            'referer': 'https://weixin.sogou.com/link?url=dn9a_-gY295K0Rci_xozVXfdMkSQTLW6cwJThYulHEtVjXrGTiVgS-jLzX0QJsZc9LKGBXLDqu6hcy8W9YK4n1qXa8Fplpd9kqb1on3XUORxrmoftjAEj_GbEcfeUOWbw4CoyV3mfI6CnS5wEgRgloC4xjPDiE6GeHrvBBz3sVJJqopuR3-XqA0a-_G6lnkfM41cvBft-VHFr1bNo2EnzytenNSxFGs7t5_16x7SsuyAXBbT1gj0mwfbwdmomkYm6Wv3FtUFWt3zAjcIGepUqA..&type=2&query=%E7%83%AD%E7%82%B9&token=BE1F165D212650B2999C655336D9D740998E8E7A6475BD69&k=43&h=9',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
        }
        try:
            response = requests.request("GET", content_link, headers=headers, data=payload)
            wechat_gh = re.search(r'var user_name = "(.*?)"', response.text).group(1)
            return wechat_gh
        except Exception:
            return None

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}" ; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(repeat_video)

    # Fetch the article list for one account
    @classmethod
    def get_videoList(cls, log_type, crawler, task_dict, rule_dict, user_dict, env):
        mq = MQ(topic_name="topic_crawler_etl_" + env)
        # nick_name holds the article link used to resolve the account's gh_ id
        wechat_gh = cls.get_wechat_gh(user_dict["nick_name"])
        if wechat_gh is None:
            Common.logging(
                log_type,
                crawler,
                env,
                f"获取用户主页id为空{task_dict}\n",
            )
            AliyunLogger.logging(
                code="2004",
                platform=crawler,
                mode=log_type,
                env=env,
                message=f"获取用户主页id为空{task_dict}",
            )
            return

        time.sleep(1)
        url = "http://61.48.133.26:30001/GetGh_Doc"
        payload = json.dumps({
            "appid": wechat_gh,
            "decode": "1"
        })
        headers = {
            'Content-Type': 'application/json'
        }
        r = requests.request("POST", url, headers=headers, data=payload)
        if "list" not in r.json():
            Common.logger(log_type, crawler).warning(
                f"status_code:{r.status_code}, get_videoList:{r.text}\n"
            )
            Common.logging(
                log_type,
                crawler,
                env,
                f"status_code:{r.status_code}, get_videoList:{r.text}\n",
            )
            AliyunLogger.logging(
                code="2000",
                platform=crawler,
                mode=log_type,
                env=env,
                message=f"status_code:{r.status_code}, get_videoList:{r.text}\n",
            )
            time.sleep(60 * 15)
            return
        if len(r.json()["list"]) == 0:
            Common.logger(log_type, crawler).info("没有更多视频了\n")
            Common.logging(log_type, crawler, env, "没有更多视频了\n")
            AliyunLogger.logging(
                code="2000",
                platform=crawler,
                mode=log_type,
                env=env,
                message="没有更多视频了\n",
            )
            return
        else:
            user_name = r.json().get("gh_name")
            app_msg_list = r.json()["list"]
            for article in app_msg_list:
                try:
                    trace_id = crawler + str(uuid.uuid1())
                    publish_time_str = article.get("published_time", 0)
                    date_format = "%Y-%m-%d %H:%M:%S"
                    date_time_obj = datetime.strptime(publish_time_str, date_format)
                    publish_time_stamp = int(date_time_obj.timestamp())
                    article_url = article.get("url", "")
                    video_dict = {
"video_id": article.get("aid", ""), "video_title": article.get("title", "") .replace(" ", "") .replace('"', "") .replace("'", ""), "publish_time_stamp": publish_time_stamp, "publish_time_str": publish_time_str, "user_name": user_name, "play_cnt": 0, "comment_cnt": 0, "like_cnt": 0, "share_cnt": 0, "user_id": user_dict["uid"], "avatar_url": user_dict["avatar_url"], "cover_url": article.get("head_pic", ""), "article_url": article.get("head_pic", ""), "video_url": cls.get_video_url(article_url, env), "session": f"gongzhonghao-author1-{int(time.time())}", } for k, v in video_dict.items(): Common.logger(log_type, crawler).info(f"{k}:{v}") Common.logging( log_type, crawler, env, f"video_dict:{video_dict}" ) AliyunLogger.logging( code="1001", trace_id=trace_id, platform=crawler, mode=log_type, env=env, message="扫描到一条视频", data=video_dict, ) if ( int(time.time()) - publish_time_stamp > 3600 * 24 * int(rule_dict.get("period", {}).get("max", 1000)) ): Common.logger(log_type, crawler).info( f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n" ) Common.logging( log_type, crawler, env, f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n", ) AliyunLogger.logging( code="2004", trace_id=trace_id, platform=crawler, mode=log_type, env=env, data=video_dict, message="发布时间超过{}天".format( int(rule_dict.get("period", {}).get("max", 1000)) ), ) return if ( video_dict["article_url"] == 0 or video_dict["video_url"] == 0 ): Common.logger(log_type, crawler).info("文章涉嫌违反相关法律法规和政策\n") Common.logging(log_type, crawler, env, "文章涉嫌违反相关法律法规和政策\n") AliyunLogger.logging( code="2005", trace_id=trace_id, platform=crawler, mode=log_type, env=env, data=video_dict, message="无效文章或视频", ) # 标题敏感词过滤 elif ( any( str(word) if str(word) in video_dict["video_title"] else False for word in get_config_from_mysql( log_type=log_type, source=crawler, env=env, text="filter", action="", ) ) is True ): Common.logger(log_type, crawler).info("标题已中过滤词\n") Common.logging(log_type, crawler, env, "标题已中过滤词\n") AliyunLogger.logging( code="2003", trace_id=trace_id, platform=crawler, mode=log_type, env=env, data=video_dict, message="标题已中过滤词\n", ) # 已下载判断 elif ( cls.repeat_video( log_type, crawler, video_dict["video_id"], video_dict["video_title"], env, ) != 0 ): Common.logger(log_type, crawler).info("视频已下载\n") Common.logging(log_type, crawler, env, "视频已下载\n") AliyunLogger.logging( code="2002", trace_id=trace_id, platform=crawler, mode=log_type, env=env, data=video_dict, message="视频已下载", ) else: video_dict["out_user_id"] = video_dict["user_id"] video_dict["platform"] = crawler video_dict["strategy"] = log_type video_dict["out_video_id"] = video_dict["video_id"] video_dict["width"] = 0 video_dict["height"] = 0 video_dict["crawler_rule"] = json.dumps(rule_dict) video_dict["user_id"] = user_dict[ "uid" ] # 站内 UID?爬虫获取不到了(随机发布到原 5 个账号中) video_dict["publish_time"] = video_dict["publish_time_str"] mq.send_msg(video_dict) AliyunLogger.logging( code="1002", trace_id=trace_id, platform=crawler, mode=log_type, env=env, data=video_dict, message="成功发送 MQ 至 ETL", ) time.sleep(random.randint(1, 8)) except Exception as e: Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n") Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n") AliyunLogger.logging( code="3000", platform=crawler, mode=log_type, env=env, message=f"抓取单条视频异常:{e}\n", ) Common.logger(log_type, crawler).info("休眠 60 秒\n") Common.logging(log_type, crawler, env, "休眠 60 秒\n") time.sleep(60) if __name__ == '__main__': log_type = "author" crawler = "gongzhonghao" env = "dev" task_dict = 
    task_dict = {
        'createTime': 1688382816512,
        'id': 54,
        'interval': 200,
        'machine': 'aliyun',
        'mode': 'author',
        'operator': '王坤',
        'rule': {'period': {'min': 1, 'max': 1}, 'duration': {'min': 20, 'max': 2700}},
        'source': 'gongzhonghao',
        'spiderName': 'run_gzh_author',
        'startTime': 1688456874000,
        'status': 0,
        'taskName': '公众号账号',
        'updateTime': 1688456876643,
    }
    rule_dict = {"period": {"min": 1, "max": 1}, "duration": {"min": 20, "max": 2700}}
    task_id = 54
    select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
    user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
    GongzhonghaoUrlAuthor.get_all_videos(
        log_type=log_type,
        crawler=crawler,
        task_dict=task_dict,
        rule_dict=rule_dict,
        user_list=user_list,
        env=env,
    )