# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/28
"""Crawler for WeChat official-account ("公众号") author pages.

For each configured account the crawler resolves the account's ``fakeid``
through the mp.weixin.qq.com back-office API, pages through its article list,
extracts the embedded video URL with headless Chrome, filters the result and
sends accepted videos to the ETL message queue.
"""
import datetime
import json
import os
import shutil
import sys
import time
from hashlib import md5
import requests
import urllib3
from selenium.webdriver import DesiredCapabilities
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium import webdriver
sys.path.append(os.getcwd())
from common.mq import MQ
from common.common import Common
from common.feishu import Feishu
from common.publish import Publish
from common.scheduling_db import MysqlHelper
from common.public import get_config_from_mysql, download_rule, title_like, task_unbind


class GongzhonghaoAuthor:
    """Crawl videos published by WeChat official accounts."""

    platform = "公众号"

    @classmethod
    def get_token(cls, log_type, crawler, token_index, env):
        """Load the back-office token/cookie pair for slot ``token_index``.

        Returns a dict with keys ``token_id``, ``title``, ``token``,
        ``cookie``, ``update_time`` and ``operator``, or ``None`` when no
        matching row exists in ``crawler_config`` (after notifying Feishu and
        sleeping 60 seconds).
        """
        select_sql = f""" select * from crawler_config where source="{crawler}" and title LIKE "%公众号_{token_index}%";"""
        configs = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
        if len(configs) == 0:
            Feishu.bot(log_type, crawler, f"公众号_{token_index}:未配置token")
            time.sleep(60)
            return None
        row = configs[0]
        # NOTE(review): eval() on a database value executes arbitrary code if
        # the config row is ever attacker-controlled; consider storing JSON
        # and using json.loads instead. Evaluated once rather than per key.
        config = dict(eval(row["config"]))
        token_dict = {
            "token_id": row["id"],
            "title": row["title"].strip(),
            "token": config["token"].strip(),
            "cookie": config["cookie"].strip(),
            # update_time is divided by 1000 before being passed to
            # time.localtime, i.e. it is treated as milliseconds.
            "update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(row["update_time"]/1000))),
            "operator": row["operator"].strip()
        }
        return token_dict

    @classmethod
    def get_user_info(cls, log_type, crawler, task_dict, user_dict, token_index, env):
        """Resolve the account named by ``user_dict['link']`` to its fakeid.

        Retries every 15 minutes while the token is expired or rate limited.
        Returns a dict with ``user_name``, ``user_id`` (the fakeid) and
        ``avatar_url``; returns ``None`` when no token is configured or when
        the search succeeds with no result (in which case the account is
        unbound from the task).
        """
        Common.logger(log_type, crawler).info(f"获取站外用户信息:{user_dict['link']}")
        Common.logging(log_type, crawler, env, f"获取站外用户信息:{user_dict['link']}")
        while True:
            token_dict = cls.get_token(log_type, crawler, token_index, env)
            if token_dict is None:
                # get_token has already alerted and slept; subscripting None
                # below would only raise an opaque TypeError.
                Common.logger(log_type, crawler).warning(f"公众号_{token_index}:未获取到token,跳过\n")
                Common.logging(log_type, crawler, env, f"公众号_{token_index}:未获取到token,跳过\n")
                return None
            url = "https://mp.weixin.qq.com/cgi-bin/searchbiz?"
            headers = {
                "accept": "*/*",
                "accept-encoding": "gzip, deflate, br",
                "accept-language": "zh-CN,zh;q=0.9",
                "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?"
                           "t=media/appmsg_edit_v2&action=edit&isNew=1"
                           "&type=77&createType=5&token=1011071554&lang=zh_CN",
                'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
                "sec-ch-ua-mobile": "?0",
                "sec-ch-ua-platform": '"Windows"',
                "sec-fetch-dest": "empty",
                "sec-fetch-mode": "cors",
                "sec-fetch-site": "same-origin",
                "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                              " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36",
                "x-requested-with": "XMLHttpRequest",
                'cookie': token_dict['cookie'],
            }
            params = {
                "action": "search_biz",
                "begin": "0",
                "count": "5",
                "query": str(user_dict['link']),
                "token": token_dict['token'],
                "lang": "zh_CN",
                "f": "json",
                "ajax": "1",
            }
            urllib3.disable_warnings()
            r = requests.get(url=url, headers=headers, params=params, verify=False)
            r.close()
            # Parse the body once instead of re-decoding it for every check.
            resp = r.json()
            err_msg = resp["base_resp"]["err_msg"]
            if err_msg == "invalid session":
                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
                Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
                # Only alert between 10:00 and 20:59 local time.
                if 20 >= datetime.datetime.now().hour >= 10:
                    Feishu.bot(log_type, crawler, f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n过期啦,请扫码更换token\nhttps://mp.weixin.qq.com/")
                time.sleep(60 * 15)
                continue
            if err_msg == "freq control":
                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
                Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
                if 20 >= datetime.datetime.now().hour >= 10:
                    Feishu.bot(log_type, crawler, f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/")
                time.sleep(60 * 15)
                continue
            if err_msg == "ok" and len(resp["list"]) == 0:
                # The search succeeded but found nothing: stop crawling this
                # account by unbinding it from the task.
                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
                Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
                unbind_msg = task_unbind(log_type=log_type, crawler=crawler, taskid=task_dict['id'], uids=str(user_dict["uid"]), env=env)
                if unbind_msg == "success":
                    if 20 >= datetime.datetime.now().hour >= 10:
                        Feishu.bot(log_type, crawler, f"公众号:{user_dict['link']}, 站内昵称:{user_dict['nick_name']}\n抓取异常, 已取消抓取该公众号\n")
                    Common.logging(log_type, crawler, env, f"公众号:{user_dict['link']}, 站内昵称:{user_dict['nick_name']}\n抓取异常, 已取消抓取该公众号\n")
                else:
                    Common.logger(log_type, crawler).warning(f"unbind_msg:{unbind_msg}")
                    Common.logging(log_type, crawler, env, f"unbind_msg:{unbind_msg}")
                return None
            first_match = resp["list"][0]
            user_info_dict = {'user_name': first_match["nickname"],
                              'user_id': first_match["fakeid"],
                              'avatar_url': first_match["round_head_img"]}
            return user_info_dict

    @classmethod
    def get_tencent_video_url(cls, video_id):
        """Build a download URL for a Tencent Video ``video_id``.

        Calls the ``vv.video.qq.com/getinfo`` endpoint, strips the
        ``QZOutputJson=`` wrapper from the response and combines the first
        listed base URL with the returned ``fvkey``.
        """
        url = 'https://vv.video.qq.com/getinfo?vids=' + str(video_id) + '&platform=101001&charge=0&otype=json'
        response = requests.get(url=url).text.replace('QZOutputJson=', '').replace('"};', '"}')
        response = json.loads(response)
        url = response['vl']['vi'][0]['ul']['ui'][0]['url']
        fvkey = response['vl']['vi'][0]['fvkey']
        video_url = url + str(video_id) + '.mp4?vkey=' + fvkey
        return video_url

    @classmethod
    def get_video_url(cls, article_url, env):
        """Open ``article_url`` in headless Chrome and extract its video URL.

        Returns the ``src`` of the native video poster element when present,
        otherwise resolves an embedded Tencent Video iframe through
        ``get_tencent_video_url``. Returns ``0`` when neither element exists.
        """
        # Enable performance logging. Work on a copy so the shared
        # DesiredCapabilities.CHROME dict is not mutated.
        ca = DesiredCapabilities.CHROME.copy()
        ca["goog:loggingPrefs"] = {"performance": "ALL"}
        # Run without opening a browser window.
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument("headless")
        chrome_options.add_argument(
            'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
        chrome_options.add_argument("--no-sandbox")
        # Driver initialisation: non-prod environments use a local chromedriver binary.
        if env == "prod":
            driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options)
        else:
            driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options, service=Service(
                '/Users/wangkun/Downloads/chromedriver/chromedriver_v113/chromedriver'))
        # Always quit the driver, even if page loading or element lookup
        # raises; otherwise every failure leaks a Chrome process.
        try:
            driver.implicitly_wait(10)
            driver.get(article_url)
            time.sleep(1)
            poster_xpath = '//div[@class="js_video_poster video_poster"]/*[2]'
            tencent_xpath = '//span[@class="js_tx_video_container"]/*[1]'
            if len(driver.find_elements(By.XPATH, poster_xpath)) != 0:
                video_url = driver.find_element(By.XPATH, poster_xpath).get_attribute('src')
            elif len(driver.find_elements(By.XPATH, tencent_xpath)) != 0:
                iframe = driver.find_element(By.XPATH, tencent_xpath).get_attribute('src')
                video_id = iframe.split('vid=')[-1].split('&')[0]
                video_url = cls.get_tencent_video_url(video_id)
            else:
                video_url = 0
        finally:
            driver.quit()
        return video_url

    @classmethod
    def get_videoList(cls, log_type, crawler, task_dict, token_index, rule_dict, user_dict, env):
        """Page through one account's articles and enqueue accepted videos.

        Fills ``user_dict`` with ``user_id``, ``user_name`` and ``avatar_url``
        from ``get_user_info``, then requests the article list five items at
        a time. Returns (``None``) when the account cannot be resolved, no
        token is configured, the account is rejected by the API, the list is
        exhausted, or an article older than ``rule_dict['period']['max']``
        days (default 1000) is reached.
        """
        mq = MQ(topic_name="topic_crawler_etl_" + env)
        user_info_dict = cls.get_user_info(log_type=log_type,
                                           crawler=crawler,
                                           task_dict=task_dict,
                                           user_dict=user_dict,
                                           token_index=token_index,
                                           env=env)
        if user_info_dict is None:
            return
        user_dict["user_id"] = user_info_dict["user_id"]
        user_dict["user_name"] = user_info_dict["user_name"]
        user_dict["avatar_url"] = user_info_dict["avatar_url"]
        begin = 0
        while True:
            token_dict = cls.get_token(log_type, crawler, token_index, env)
            if token_dict is None:
                # get_token has already alerted and slept; subscripting None
                # below would only raise an opaque TypeError.
                Common.logger(log_type, crawler).warning(f"公众号_{token_index}:未获取到token,跳过\n")
                Common.logging(log_type, crawler, env, f"公众号_{token_index}:未获取到token,跳过\n")
                return
            url = "https://mp.weixin.qq.com/cgi-bin/appmsg?"
            headers = {
                "accept": "*/*",
                "accept-encoding": "gzip, deflate, br",
                "accept-language": "zh-CN,zh;q=0.9",
                "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?"
                           "t=media/appmsg_edit_v2&action=edit&isNew=1"
                           "&type=77&createType=5&token=" + str(token_dict['token']) + "&lang=zh_CN",
                'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
                "sec-ch-ua-mobile": "?0",
                "sec-ch-ua-platform": '"Windows"',
                "sec-fetch-dest": "empty",
                "sec-fetch-mode": "cors",
                "sec-fetch-site": "same-origin",
                "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                              " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36",
                "x-requested-with": "XMLHttpRequest",
                'cookie': token_dict['cookie'],
            }
            params = {
                "action": "list_ex",
                "begin": str(begin),
                "count": "5",
                "fakeid": user_dict['user_id'],
                "type": "9",
                "query": "",
                "token": str(token_dict['token']),
                "lang": "zh_CN",
                "f": "json",
                "ajax": "1",
            }
            urllib3.disable_warnings()
            r = requests.get(url=url, headers=headers, params=params, verify=False)
            r.close()
            # Parse the body once instead of re-decoding it for every check.
            resp = r.json()
            base_resp = resp["base_resp"]
            if base_resp["err_msg"] == "invalid session":
                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                # Only alert between 10:00 and 20:59 local time.
                if 20 >= datetime.datetime.now().hour >= 10:
                    Feishu.bot(log_type, crawler, f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']}\n过期啦,请扫码更换token\nhttps://mp.weixin.qq.com/")
                time.sleep(60 * 15)
                continue
            if base_resp["err_msg"] == "freq control":
                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                if 20 >= datetime.datetime.now().hour >= 10:
                    Feishu.bot(log_type, crawler, f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/")
                time.sleep(60 * 15)
                continue
            if base_resp["err_msg"] == "invalid args" and base_resp["ret"] == 200002:
                # The API rejects this account: unbind it from the task and stop.
                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                task_unbind(log_type=log_type, crawler=crawler, taskid=task_dict['id'], uids=str(user_dict["uid"]), env=env)
                if 20 >= datetime.datetime.now().hour >= 10:
                    Feishu.bot(log_type, crawler, f"公众号:{user_dict['link']}, 站内昵称:{user_dict['nick_name']}\n抓取异常, 已取消抓取该公众号\n")
                return
            if 'app_msg_list' not in resp:
                Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                if 20 >= datetime.datetime.now().hour >= 10:
                    Feishu.bot(log_type, crawler, f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']}\n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/")
                time.sleep(60 * 15)
                continue
            app_msg_list = resp['app_msg_list']
            if len(app_msg_list) == 0:
                Common.logger(log_type, crawler).info('没有更多视频了\n')
                Common.logging(log_type, crawler, env, '没有更多视频了\n')
                return

            # Advance the paging offset before processing so that a failure
            # on one article never causes the same page to be requested again.
            begin += 5
            for article in app_msg_list:
                try:
                    create_time = article.get('create_time', 0)
                    publish_time_stamp = int(create_time)
                    publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
                    article_url = article.get('link', '')
                    video_dict = {
                        'video_id': article.get('aid', ''),
                        'video_title': article.get("title", "").replace(' ', '').replace('"', '').replace("'", ""),
                        'publish_time_stamp': publish_time_stamp,
                        'publish_time_str': publish_time_str,
                        'user_name': user_dict["user_name"],
                        'play_cnt': 0,
                        'comment_cnt': 0,
                        'like_cnt': 0,
                        'share_cnt': 0,
                        'user_id': user_dict['user_id'],
                        'avatar_url': user_dict['avatar_url'],
                        'cover_url': article.get('cover', ''),
                        'article_url': article.get('link', ''),
                        'video_url': cls.get_video_url(article_url, env),
                        'session': f'gongzhonghao-author1-{int(time.time())}'
                    }
                    for k, v in video_dict.items():
                        Common.logger(log_type, crawler).info(f"{k}:{v}")
                    Common.logging(log_type, crawler, env, f'video_dict:{video_dict}')

                    # Stop crawling this account entirely once an article is
                    # older than the configured maximum age in days.
                    max_days = int(rule_dict.get('period', {}).get('max', 1000))
                    if int(time.time()) - publish_time_stamp > 3600 * 24 * max_days:
                        Common.logger(log_type, crawler).info(f"发布时间超过{max_days}天\n")
                        Common.logging(log_type, crawler, env, f"发布时间超过{max_days}天\n")
                        return

                    if video_dict['article_url'] == 0 or video_dict['video_url'] == 0:
                        Common.logger(log_type, crawler).info("文章涉嫌违反相关法律法规和政策\n")
                        Common.logging(log_type, crawler, env, "文章涉嫌违反相关法律法规和政策\n")
                    # Title filter-word check; empty filter words never match.
                    elif any(str(word) and str(word) in video_dict['video_title']
                             for word in get_config_from_mysql(log_type=log_type,
                                                               source=crawler,
                                                               env=env,
                                                               text="filter",
                                                               action="")):
                        Common.logger(log_type, crawler).info("标题已中过滤词\n")
                        Common.logging(log_type, crawler, env, "标题已中过滤词\n")
                    # Already-downloaded check.
                    elif cls.repeat_video(log_type, crawler, video_dict['video_id'], env) != 0:
                        Common.logger(log_type, crawler).info("视频已下载\n")
                        Common.logging(log_type, crawler, env, "视频已下载\n")
                    # Title-similarity check.
                    elif title_like(log_type, crawler, video_dict['video_title'], cls.platform, env) is True:
                        Common.logger(log_type, crawler).info(f'标题相似度>=80%:{video_dict["video_title"]}\n')
                        Common.logging(log_type, crawler, env, f'标题相似度>=80%:{video_dict["video_title"]}\n')
                    else:
                        # Accepted: hand the video to the ETL queue instead of
                        # calling cls.download_publish directly.
                        video_dict["out_user_id"] = video_dict["user_id"]
                        video_dict["platform"] = crawler
                        video_dict["strategy"] = log_type
                        video_dict["out_video_id"] = video_dict["video_id"]
                        video_dict["width"] = 0
                        video_dict["height"] = 0
                        video_dict["crawler_rule"] = json.dumps(rule_dict)
                        # From here on user_id carries the in-site UID rather
                        # than the account fakeid (kept in out_user_id).
                        video_dict["user_id"] = user_dict["uid"]
                        video_dict["publish_time"] = video_dict["publish_time_str"]
                        mq.send_msg(video_dict)
                except Exception as e:
                    # Best effort per article: log and move on to the next one.
                    Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
                    Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")
            # Pause between pages.
            Common.logger(log_type, crawler).info('休眠 60 秒\n')
            Common.logging(log_type, crawler, env, '休眠 60 秒\n')
            time.sleep(60)

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        """Return how many ``crawler_video`` rows already hold ``video_id``.

        Rows are matched on ``out_video_id`` with ``platform`` equal to either
        ``crawler`` or ``cls.platform``.
        """
        # NOTE(review): video_id comes from the remote API and is interpolated
        # into the SQL text; parameterise if MysqlHelper supports it.
        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(repeat_video)

    @classmethod
    def download_publish(cls, log_type, crawler, video_dict, rule_dict, env):
        """Download, validate, upload and record a single video.

        Downloads the video, discards it if the file is empty or missing or
        if ``download_rule`` rejects it, then downloads the cover, uploads
        through ``Publish.upload_and_publish``, inserts a ``crawler_video``
        row and writes a row to the Feishu sheet. Returns ``None``.
        """
        # Download the video.
        Common.download_method(log_type=log_type, crawler=crawler, text="video", title=video_dict["video_title"], url=video_dict["video_url"])
        md_title = md5(video_dict['video_title'].encode('utf8')).hexdigest()
        video_dir = f"./{crawler}/videos/{md_title}"
        try:
            if os.path.getsize(f"{video_dir}/video.mp4") == 0:
                # Remove the video folder.
                shutil.rmtree(video_dir)
                Common.logger(log_type, crawler).info("视频size=0,删除成功\n")
                Common.logging(log_type, crawler, env, "视频size=0,删除成功\n")
                return
        except FileNotFoundError:
            # The folder itself may be missing too; a plain rmtree would then
            # raise FileNotFoundError again from inside this handler.
            shutil.rmtree(video_dir, ignore_errors=True)
            Common.logger(log_type, crawler).info("视频文件不存在,删除文件夹成功\n")
            Common.logging(log_type, crawler, env, "视频文件不存在,删除文件夹成功\n")
            return
        # Probe the same file that was just size-checked (the md5-named
        # folder, not the raw title) for width, height and duration.
        ffmpeg_dict = Common.ffmpeg(log_type, crawler, f"{video_dir}/video.mp4")
        video_dict["video_width"] = ffmpeg_dict["width"]
        video_dict["video_height"] = ffmpeg_dict["height"]
        video_dict["duration"] = ffmpeg_dict["duration"]
        Common.logger(log_type, crawler).info(f'video_width:{video_dict["video_width"]}')
        Common.logging(log_type, crawler, env, f'video_width:{video_dict["video_width"]}')
        Common.logger(log_type, crawler).info(f'video_height:{video_dict["video_height"]}')
        Common.logging(log_type, crawler, env, f'video_height:{video_dict["video_height"]}')
        Common.logger(log_type, crawler).info(f'duration:{video_dict["duration"]}')
        Common.logging(log_type, crawler, env, f'duration:{video_dict["duration"]}')
        if download_rule(log_type, crawler, video_dict, rule_dict) is False:
            shutil.rmtree(video_dir)
            Common.logger(log_type, crawler).info("不满足抓取规则,删除成功\n")
            Common.logging(log_type, crawler, env, "不满足抓取规则,删除成功\n")
            return
        # Download the cover image.
        Common.download_method(log_type=log_type, crawler=crawler, text="cover", title=video_dict["video_title"], url=video_dict["cover_url"])
        # Save the video metadata next to the downloaded files.
        Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict)

        # Upload the video.
        Common.logger(log_type, crawler).info("开始上传视频...")
        Common.logging(log_type, crawler, env, "开始上传视频...")
        strategy = "定向爬虫策略"
        if env == 'prod':
            oss_endpoint = "inner"
            admin_host = "https://admin.piaoquantv.com"
        else:
            oss_endpoint = "out"
            admin_host = "https://testadmin.piaoquantv.com"
        our_video_id = Publish.upload_and_publish(log_type=log_type,
                                                  crawler=crawler,
                                                  strategy=strategy,
                                                  our_uid="follow",
                                                  oss_endpoint=oss_endpoint,
                                                  env=env)
        our_video_link = f"{admin_host}/cms/post-detail/{str(our_video_id)}/info"

        if our_video_id is None:
            try:
                # Remove the video folder.
                shutil.rmtree(video_dir)
                return
            except FileNotFoundError:
                return

        insert_sql = f""" insert into crawler_video(video_id, out_user_id, platform, strategy, out_video_id, video_title, cover_url, video_url, duration, publish_time, play_cnt, crawler_rule, width, height) values({our_video_id}, "{video_dict['user_id']}", "{cls.platform}", "定向爬虫策略", "{video_dict['video_id']}", "{video_dict['video_title']}", "{video_dict['cover_url']}", "{video_dict['video_url']}", {int(video_dict['duration'])}, "{video_dict['publish_time_str']}", {int(video_dict['play_cnt'])}, '{json.dumps(rule_dict)}', {int(video_dict['video_width'])}, {int(video_dict['video_height'])}) """
        Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}")
        Common.logging(log_type, crawler, env, f"insert_sql:{insert_sql}")
        MysqlHelper.update_values(log_type, crawler, insert_sql, env)
        Common.logger(log_type, crawler).info('视频信息写入数据库成功')
        Common.logging(log_type, crawler, env, '视频信息写入数据库成功')

        # Record the video in the Feishu sheet: insert a row, then fill it.
        Feishu.insert_columns(log_type, crawler, "47e39d", "ROWS", 1, 2)
        upload_time = int(time.time())
        values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)),
                   "用户主页",
                   video_dict['video_title'],
                   video_dict['video_id'],
                   our_video_link,
                   int(video_dict['duration']),
                   f"{video_dict['video_width']}*{video_dict['video_height']}",
                   video_dict['publish_time_str'],
                   video_dict['user_name'],
                   video_dict['user_id'],
                   video_dict['avatar_url'],
                   video_dict['cover_url'],
                   video_dict['article_url'],
                   video_dict['video_url']]]
        time.sleep(0.5)
        Feishu.update_values(log_type, crawler, "47e39d", "F2:Z2", values)
        Common.logger(log_type, crawler).info('视频下载/上传成功\n')
        Common.logging(log_type, crawler, env, '视频下载/上传成功\n')

    @classmethod
    def get_all_videos(cls, log_type, crawler, task_dict, token_index, rule_dict, user_list, env):
        """Crawl every account in ``user_list`` in turn.

        Sleeps 60 seconds after each account that completes; an exception
        raised for one account is logged and the loop moves on to the next.
        """
        for user_dict in user_list:
            Common.logger(log_type, crawler).info(f'抓取公众号:{user_dict["nick_name"]}\n')
            Common.logging(log_type, crawler, env, f'抓取公众号:{user_dict["nick_name"]}\n')
            try:
                cls.get_videoList(log_type=log_type,
                                  crawler=crawler,
                                  task_dict=task_dict,
                                  token_index=token_index,
                                  rule_dict=rule_dict,
                                  user_dict=user_dict,
                                  env=env)
                Common.logger(log_type, crawler).info('休眠 60 秒\n')
                Common.logging(log_type, crawler, env, '休眠 60 秒\n')
                time.sleep(60)
            except Exception as e:
                Common.logger(log_type, crawler).info(f'抓取公众号:{user_dict["nick_name"]}时异常:{e}\n')
                Common.logging(log_type, crawler, env, f'抓取公众号:{user_dict["nick_name"]}时异常:{e}\n')


if __name__ == "__main__":
    # No standalone entry point; the class is driven by an external scheduler
    # script. Ad-hoc debugging calls can be placed here.
    pass