"""
@author: Curry Luo
@file: gongzhonghao.py
@time: 2024/01/05
"""
import os
import re
import sys
import html
import json
import time
import uuid
import random
import requests
import datetime

sys.path.append(os.getcwd())

from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.proxies import tunnel_proxies
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper
from application.common.feishu import Feishu
from application.functions.read_mysql_config import get_config_from_mysql


def get_video_url(article_url):
    """Extract the real video URL embedded in a WeChat article page.

    The page inlines a ``mp_video_trans_info`` JS object whose ``url`` field
    is wrapped in ``('...').replace`` and escaped with ``\\xNN`` hex escapes
    plus HTML entities; both layers are decoded here.

    :param article_url: URL of the WeChat official-account article.
    :return: the decoded, playable video URL.
    :raises AttributeError: if the page does not contain the expected
        ``mp_video_trans_info`` pattern (``re.search`` returns ``None``).
    """
    response = requests.get(article_url)
    html_text = response.text
    # Pull the raw (still escaped) URL out of the inlined JS blob.
    w = re.search(
        r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
    ).group(1)
    # First undo the \xNN hex escapes, then the HTML entity escapes.
    url = html.unescape(
        re.sub(
            r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w
        )
    )
    return url


class OfficialAccountAuthor(object):
    """Crawler for WeChat official-account ("公众号") authors.

    Fetches the article list of an account through the mp.weixin.qq.com
    admin API, extracts video metadata, filters it against ``rule_dict``,
    and publishes accepted items to an MQ topic for the ETL stage.
    """

    def __init__(self, platform, mode, user_list, rule_dict, env="prod"):
        """
        :param platform: platform identifier string (used for logging/DB).
        :param mode: crawl mode / log type.
        :param user_list: list of account dicts to crawl.
        :param rule_dict: filtering rules (e.g. publish-period limits).
        :param env: runtime environment, "prod" by default.
        """
        self.platform = platform
        self.mode = mode
        self.user_list = user_list
        self.rule_dict = rule_dict
        self.env = env
        # BUG FIX: the original passed ``platform=self`` (the crawler
        # instance) instead of the platform string.
        self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)
        self.aliyun_log = AliyunLogger(self.platform, self.mode)
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)

    def get_video_list(self, account_name):
        """Page through an account's article list and process each article.

        :param account_name: display name of the official account.
        :return: None; returns when the API reports no more articles.
        todo: 修改一下获取 token 的逻辑,增加 token 的可用性
        """
        # Resolve the account's fake_id (cached in MySQL, fetched if absent).
        fake_id = self.fake_id_manage(account_name)
        begin = 0
        while True:
            token_dict = self.get_token(1)
            # FIX: get_token returns None when no token row is configured;
            # retry instead of crashing on token_dict["token"] below.
            if token_dict is None:
                continue
            url = "https://mp.weixin.qq.com/cgi-bin/appmsg"
            headers = {
                "accept": "*/*",
                "accept-encoding": "gzip, deflate, br",
                "accept-language": "zh-CN,zh;q=0.9",
                "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?"
                "t=media/appmsg_edit_v2&action=edit&isNew=1"
                "&type=77&createType=5&token="
                + str(token_dict["token"])
                + "&lang=zh_CN",
                "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
                "sec-ch-ua-mobile": "?0",
                "sec-ch-ua-platform": '"Windows"',
                "sec-fetch-dest": "empty",
                "sec-fetch-mode": "cors",
                "sec-fetch-site": "same-origin",
                "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36",
                "x-requested-with": "XMLHttpRequest",
                "cookie": token_dict["cookie"],
            }
            params = {
                "action": "list_ex",
                "begin": str(begin),
                "count": "5",
                "fakeid": fake_id,
                "type": "9",
                "query": "",
                "token": str(token_dict["token"]),
                "lang": "zh_CN",
                "f": "json",
                "ajax": "1",
            }
            response = requests.get(url=url, params=params, headers=headers)
            if response.status_code == 200:
                result = response.json()
                if result["base_resp"]["err_msg"] in [
                    "invalid session",
                    "freq control",
                ]:
                    self.aliyun_log.logging(
                        code="2000",
                        message=f"status_code:{response.status_code}, get_fakeid:{response.text}\n",
                    )
                    # Only ping the on-call Feishu group during working hours.
                    if 20 >= datetime.datetime.now().hour >= 10:
                        Feishu.bot(
                            self.mode,
                            self.platform,
                            f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n过期啦,请扫码更换token\nhttps://mp.weixin.qq.com/",
                        )
                    # Back off 15 minutes before retrying with a fresh token.
                    time.sleep(60 * 15)
                    continue
                # FIX: use .get() — "list"/"app_msg_list" are absent on some
                # error payloads and indexing them raised KeyError.
                if result["base_resp"]["err_msg"] == "ok" and len(result.get("list", [])) == 0:
                    print("No more data")
                app_msg_list = result.get("app_msg_list", [])
                if len(app_msg_list) == 0:
                    self.aliyun_log.logging(
                        code="2000",
                        message="没有更多视频了\n",
                    )
                    return
                begin += 5
                for article in app_msg_list:
                    try:
                        self.process_video(article, account_name, fake_id)
                    except Exception as e:
                        # Best-effort per-article handling: log and move on.
                        self.aliyun_log.logging(
                            code="3000",
                            message="代码报错, 报错信息是{}".format(e),
                            data=article,
                            account=account_name,
                        )

    def process_video(self, article, account_name, fake_id):
        """Build a video dict from one article, filter it, and send it to MQ.

        :param fake_id: unique id of the official account.
        :param account_name: display name of the official account.
        :param article: one entry of the appmsg ``app_msg_list`` payload.
        :return: None
        """
        trace_id = self.platform + str(uuid.uuid1())
        create_time = article.get("create_time", 0)
        update_time = article.get("update_time", 0)
        publish_time_stamp = int(create_time)
        update_time_stamp = int(update_time)
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )
        article_url = article.get("link", "")
        video_dict = {
            "video_id": article.get("aid", ""),
            "video_title": article.get("title", "")
            .replace(" ", "")
            .replace('"', "")
            .replace("'", ""),
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "user_name": account_name,
            "play_cnt": 0,
            "comment_cnt": 0,
            "like_cnt": 0,
            "share_cnt": 0,
            "user_id": fake_id,
            "avatar_url": "",
            "cover_url": article.get("cover", ""),
            "article_url": article.get("link", ""),
            "session": f"gongzhonghao-author1-{int(time.time())}",
        }
        self.aliyun_log.logging(
            code="1001", message="扫描到一条视频", data=article, account=account_name
        )
        # Skip articles whose create AND update times are both older than the
        # configured period (days).
        max_period_days = int(self.rule_dict.get("period", {}).get("max", 1000))
        if (
            int(time.time()) - publish_time_stamp > 3600 * 24 * max_period_days
        ) and (
            int(time.time()) - update_time_stamp > 3600 * 24 * max_period_days
        ):
            self.aliyun_log.logging(
                code="2004",
                trace_id=trace_id,
                data=video_dict,
                message="发布时间超过{}天".format(max_period_days),
                account=account_name,
            )
            return
        # Title filter-word check. Equivalent to the original
        # ``any(str(word) if str(word) in title else False) is True``:
        # a word only matches when it is non-empty AND contained in the title.
        elif any(
            str(word) and str(word) in video_dict["video_title"]
            for word in get_config_from_mysql(
                log_type=self.mode,
                source=self.platform,
                env=self.env,
                text="filter",
            )
        ):
            self.aliyun_log.logging(
                code="2003",
                trace_id=trace_id,
                data=video_dict,
                account=account_name,
                message="标题已中过滤词\n",
            )
        # Already-downloaded check.
        elif (
            self.repeat_video(
                video_dict["video_id"],
            )
            != 0
        ):
            self.aliyun_log.logging(
                code="2002",
                trace_id=trace_id,
                data=video_dict,
                account=account_name,
                message="视频已下载",
            )
        else:
            video_dict["out_user_id"] = video_dict["user_id"]
            video_dict["platform"] = self.platform
            video_dict["strategy"] = self.mode
            video_dict["out_video_id"] = video_dict["video_id"]
            video_dict["width"] = 0
            video_dict["height"] = 0
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            # On-site UID is no longer obtainable by the crawler; downstream
            # publishes randomly to the original 5 accounts.
            video_dict["user_id"] = fake_id
            video_dict["publish_time"] = video_dict["publish_time_str"]
            video_dict["video_url"] = get_video_url(article_url)
            self.mq.send_msg(video_dict)
            self.aliyun_log.logging(
                code="1002",
                trace_id=trace_id,
                data=video_dict,
                account=account_name,
                message="成功发送 MQ 至 ETL",
            )
            # Jitter between articles to avoid hammering the endpoint.
            time.sleep(random.randint(1, 8))

    def repeat_video(self, video_id):
        """Count how many times this video already exists in crawler_video.

        :param video_id: out_video_id to look up.
        :return: number of matching rows (0 means not yet downloaded).
        """
        # NOTE(review): SQL is built by string interpolation; video_id comes
        # from the remote API, so switch to parameterized queries if the
        # MysqlHelper API supports it.
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{video_id}" ; """
        repeat_video = self.mysql.select(sql)
        return len(repeat_video)

    def fake_id_manage(self, account_name):
        """Look up (or fetch and cache) the fake_id for an account name.

        If the name exists in the ``accounts`` table, return its cached
        ``name_id``; otherwise resolve it remotely and insert a new row.

        :param account_name: official-account display name (user_dict['link']).
        :return: the fake_id string, or None if it could not be resolved.
        """
        select_sql = f"""select name, name_id from accounts where name = "{account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
        account_info = self.mysql.select(sql=select_sql)
        if account_info:
            name, name_id = account_info[0]
            return name_id
        else:
            # NOTE(review): get_user_fake_id is defined elsewhere in this
            # class/module — presumably it queries the search_biz API.
            user_info = self.get_user_fake_id(account_name)
            if user_info:
                fake_id = user_info["user_id"]
                insert_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{account_name}", "{fake_id}", "{self.platform}", 1 )"""
                self.mysql.update(sql=insert_sql)
                return fake_id

    def get_token(self, token_index):
        """Load the token/cookie pair for one official-account slot.

        :param token_index: which numbered token config to load
            (matched against titles like "公众号_1").
        :return: dict with token_id/title/token/cookie/update_time/operator,
            or None when no config row exists (after alerting via Feishu).
        """
        select_sql = f""" select * from crawler_config where source="{self.platform}" and title LIKE "%公众号_{token_index}%";"""
        configs = self.mysql.select(select_sql)
        if len(configs) == 0:
            Feishu.bot(self.mode, self.platform, f"公众号_{token_index}:未配置token")
            time.sleep(60)
            return None
        # NOTE(review): eval() on a DB-stored string executes arbitrary code;
        # the config should be stored/parsed as JSON instead. Evaluated once
        # here (the original evaluated it twice).
        config = dict(eval(configs[0]["config"]))
        token_dict = {
            "token_id": configs[0]["id"],
            "title": configs[0]["title"].strip(),
            "token": config["token"].strip(),
            "cookie": config["cookie"].strip(),
            "update_time": time.strftime(
                "%Y-%m-%d %H:%M:%S",
                time.localtime(int(configs[0]["update_time"] / 1000)),
            ),
            "operator": configs[0]["operator"].strip(),
        }
        return token_dict