""" @author: Curry Luo @file: gongzhonghao.py @time: 2024/01/05 """ import os import re import sys import html import json import time import uuid import random import requests import datetime sys.path.append(os.getcwd()) from application.items import VideoItem from application.pipeline import PiaoQuanPipeline from application.common.messageQueue import MQ from application.common.proxies import tunnel_proxies from application.common.log import AliyunLogger from application.common.mysql import MysqlHelper from application.common.feishu import Feishu from application.functions.read_mysql_config import get_config_from_mysql def get_video_url(article_url): """ :param article_url: :return: """ # 替换为目标网页的 URL response = requests.get(article_url) html_text = response.text # 正则表达式提取 w = re.search( r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M ).group(1) url = html.unescape( re.sub( r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w ) ) return url class OfficialAccountAuthor(object): """ 公众号账号爬虫, """ def __init__(self, platform, mode, user_list, rule_dict, env="prod"): self.platform = platform self.mode = mode self.user_list = user_list self.rule_dict = rule_dict self.env = env self.mysql = MysqlHelper(mode=self.mode, platform=self) self.aliyun_log = AliyunLogger(self.platform, self.mode) self.mq = MQ(topic_name="topic_crawler_etl_" + self.env) def get_video_list(self, account_name): """ 获取视频列表 :return: todo: 修改一下获取 token 的逻辑,增加 token 的可用性 """ # 获取 token and cookie fake_id = self.fake_id_manage(account_name) begin = 0 while True: token_dict = self.get_token(1) url = "https://mp.weixin.qq.com/cgi-bin/appmsg" headers = { "accept": "*/*", "accept-encoding": "gzip, deflate, br", "accept-language": "zh-CN,zh;q=0.9", "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?" "t=media/appmsg_edit_v2&action=edit&isNew=1" "&type=77&createType=5&token=" + str(token_dict["token"]) + "&lang=zh_CN", "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36", "x-requested-with": "XMLHttpRequest", "cookie": token_dict["cookie"], } params = { "action": "list_ex", "begin": str(begin), "count": "5", "fakeid": fake_id, "type": "9", "query": "", "token": str(token_dict["token"]), "lang": "zh_CN", "f": "json", "ajax": "1", } response = requests.get(url=url, params=params, headers=headers) if response.status_code == 200: result = response.json() if result["base_resp"]["err_msg"] in [ "invalid session", "freq control", ]: self.aliyun_log.logging( code="2000", message=f"status_code:{response.status_code}, get_fakeid:{response.text}\n", ) if 20 >= datetime.datetime.now().hour >= 10: Feishu.bot( self.mode, self.platform, f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n过期啦,请扫码更换token\nhttps://mp.weixin.qq.com/", ) time.sleep(60 * 15) continue if result["base_resp"]["err_msg"] == "ok" and len(result["list"]) == 0: print("No more data") if len(result["app_msg_list"]) == 0: self.aliyun_log.logging( code="2000", message="没有更多视频了\n", ) return else: begin += 5 app_msg_list = result["app_msg_list"] for article in app_msg_list: try: self.process_video(article, account_name, fake_id) except Exception as e: self.aliyun_log.logging( code="3000", message="代码报错, 报错信息是{}".format(e), data=article, account=account_name, ) def process_video(self, article, account_name, fake_id): """ 处理视频信息 :param fake_id: 公众号唯一 id :param account_name: 公众号的名称 :param article: 微信公众号的链接 :return: None """ trace_id = self.platform + str(uuid.uuid1()) create_time = article.get("create_time", 0) update_time = article.get("update_time", 0) publish_time_stamp = int(create_time) update_time_stamp = int(update_time) publish_time_str = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp) ) article_url = article.get("link", "") video_dict = { "video_id": article.get("aid", ""), "video_title": article.get("title", "") .replace(" ", "") .replace('"', "") .replace("'", ""), "publish_time_stamp": publish_time_stamp, "publish_time_str": publish_time_str, "user_name": account_name, "play_cnt": 0, "comment_cnt": 0, "like_cnt": 0, "share_cnt": 0, "user_id": fake_id, "avatar_url": "", "cover_url": article.get("cover", ""), "article_url": article.get("link", ""), "session": f"gongzhonghao-author1-{int(time.time())}", } self.aliyun_log.logging( code="1001", message="扫描到一条视频", data=article, account=account_name ) if ( int(time.time()) - publish_time_stamp > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000)) ) and ( int(time.time()) - update_time_stamp > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000)) ): self.aliyun_log.logging( code="2004", trace_id=trace_id, data=video_dict, message="发布时间超过{}天".format( int(self.rule_dict.get("period", {}).get("max", 1000)) ), account=account_name, ) return # 标题敏感词过滤 elif ( any( str(word) if str(word) in video_dict["video_title"] else False for word in get_config_from_mysql( log_type=self.mode, source=self.platform, env=self.env, text="filter", ) ) is True ): self.aliyun_log.logging( code="2003", trace_id=trace_id, data=video_dict, account=account_name, message="标题已中过滤词\n", ) # 已下载判断 elif ( self.repeat_video( video_dict["video_id"], ) != 0 ): self.aliyun_log.logging( code="2002", trace_id=trace_id, data=video_dict, account=account_name, message="视频已下载", ) else: video_dict["out_user_id"] = video_dict["user_id"] video_dict["platform"] = self.platform video_dict["strategy"] = self.mode video_dict["out_video_id"] = video_dict["video_id"] video_dict["width"] = 0 video_dict["height"] = 0 video_dict["crawler_rule"] = json.dumps(self.rule_dict) video_dict["user_id"] = fake_id # 站内 UID?爬虫获取不到了(随机发布到原 5 个账号中) video_dict["publish_time"] = video_dict["publish_time_str"] video_dict["video_url"] = get_video_url(article_url) self.mq.send_msg(video_dict) self.aliyun_log.logging( code="1002", trace_id=trace_id, data=video_dict, account=account_name, message="成功发送 MQ 至 ETL", ) time.sleep(random.randint(1, 8)) def repeat_video(self, video_id): """ :param video_id: video_id :return: """ sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{video_id}" ; """ repeat_video = self.mysql.select(sql) return len(repeat_video) def fake_id_manage(self, account_name): """ 根据公众号的名字去查询 fake_id, 若 fake_id 存在,则返回,若不存在则插入 account_name: 公众号的名字,user_dict['link'] 获取fake_id :return: """ select_sql = f"""select name, name_id from accounts where name = "{account_name}" and platform = "{self.platform}" and useful = 1 limit 1""" account_info = self.mysql.select(sql=select_sql) if account_info: name, name_id = account_info[0] return name_id else: user_info = self.get_user_fake_id(account_name) if user_info: fake_id = user_info["user_id"] insert_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{account_name}", "{fake_id}", "{self.platform}", 1 )""" self.mysql.update(sql=insert_sql) return fake_id def get_token(self, token_index): """ 获取 公众号的 token :param token_index: :return: """ select_sql = f""" select * from crawler_config where source="{self.platform}" and title LIKE "%公众号_{token_index}%";""" configs = self.mysql.select(select_sql) if len(configs) == 0: Feishu.bot(self.mode, self.platform, f"公众号_{token_index}:未配置token") time.sleep(60) return None token_dict = { "token_id": configs[0]["id"], "title": configs[0]["title"].strip(), "token": dict(eval(configs[0]["config"]))["token"].strip(), "cookie": dict(eval(configs[0]["config"]))["cookie"].strip(), "update_time": time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(int(configs[0]["update_time"] / 1000)), ), "operator": configs[0]["operator"].strip(), } return token_dict