- """
- @author: Curry Luo
- @file: gongzhonghao.py
- @time: 2024/01/05
- """
- import os
- import re
- import sys
- import html
- import json
- import time
- import uuid
- import random
- import requests
- import datetime
- sys.path.append(os.getcwd())
- from application.items import VideoItem
- from application.pipeline import PiaoQuanPipeline
- from application.common.messageQueue import MQ
- from application.common.proxies import tunnel_proxies
- from application.common.log import AliyunLogger
- from application.common.mysql import MysqlHelper
- from application.common.feishu import Feishu
- from application.functions.read_mysql_config import get_config_from_mysql
- def get_video_url(article_url):
-     """
-     Extract the direct video URL embedded in a WeChat official-account article page.
-     :param article_url: link to the mp.weixin.qq.com article
-     :return: decoded video URL
-     """
-     # Fetch the target article page
- response = requests.get(article_url)
- html_text = response.text
-     # Extract the escaped video URL from the inline mp_video_trans_info block with a regex;
-     # raises AttributeError if the page contains no video block (logged by the caller)
-     w = re.search(
-         r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
-     ).group(1)
-     # Decode the \xNN escapes (exactly two hex digits each), then unescape HTML entities
-     url = html.unescape(
-         re.sub(
-             r"\\x[0-9a-fA-F]{2}",
-             lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(),
-             w,
-         )
-     )
-     return url
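- # Illustrative call (hypothetical article link):
- #   get_video_url("https://mp.weixin.qq.com/s/<article_id>")
- # would return the playable URL hidden in the page's mp_video_trans_info script block.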
- class OfficialAccountAuthor(object):
-     """
-     Author-mode crawler for WeChat Official Accounts (公众号)
-     """
- def __init__(self, platform, mode, user_list, rule_dict, env="prod"):
- self.platform = platform
- self.mode = mode
- self.user_list = user_list
- self.rule_dict = rule_dict
- self.env = env
-         self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)
- self.aliyun_log = AliyunLogger(self.platform, self.mode)
- self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
- def get_video_list(self, account_name):
-         """
-         Fetch the article list of one official account and hand each entry to process_video.
-         :return:
-         todo: rework the token-fetching logic to improve token availability
-         """
-         # Get the mp.weixin.qq.com token and cookie
- fake_id = self.fake_id_manage(account_name)
- begin = 0
-         while True:
-             token_dict = self.get_token(1)
-             if token_dict is None:
-                 continue  # token not configured; get_token() already alerted and slept
- url = "https://mp.weixin.qq.com/cgi-bin/appmsg"
- headers = {
- "accept": "*/*",
- "accept-encoding": "gzip, deflate, br",
- "accept-language": "zh-CN,zh;q=0.9",
- "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?"
- "t=media/appmsg_edit_v2&action=edit&isNew=1"
- "&type=77&createType=5&token="
- + str(token_dict["token"])
- + "&lang=zh_CN",
- "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
- "sec-ch-ua-mobile": "?0",
- "sec-ch-ua-platform": '"Windows"',
- "sec-fetch-dest": "empty",
- "sec-fetch-mode": "cors",
- "sec-fetch-site": "same-origin",
- "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
- " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36",
- "x-requested-with": "XMLHttpRequest",
- "cookie": token_dict["cookie"],
- }
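-             # list_ex pagination: 'begin' is the article offset (advanced by 5 per page below)
-             # and 'fakeid' identifies the target account resolved by fake_id_manage()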
- params = {
- "action": "list_ex",
- "begin": str(begin),
- "count": "5",
- "fakeid": fake_id,
- "type": "9",
- "query": "",
- "token": str(token_dict["token"]),
- "lang": "zh_CN",
- "f": "json",
- "ajax": "1",
- }
- response = requests.get(url=url, params=params, headers=headers)
- if response.status_code == 200:
- result = response.json()
- if result["base_resp"]["err_msg"] in [
- "invalid session",
- "freq control",
- ]:
- self.aliyun_log.logging(
- code="2000",
- message=f"status_code:{response.status_code}, get_fakeid:{response.text}\n",
- )
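-                     # Token expired or rate-limited: ping Feishu only between 10:00 and 20:00,
-                     # then back off for 15 minutes before retrying with a fresh token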
- if 20 >= datetime.datetime.now().hour >= 10:
- Feishu.bot(
- self.mode,
- self.platform,
- f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n过期啦,请扫码更换token\nhttps://mp.weixin.qq.com/",
- )
- time.sleep(60 * 15)
- continue
-                 if result["base_resp"]["err_msg"] == "ok" and len(result.get("list", [])) == 0:
-                     print("No more data")
-                 if len(result.get("app_msg_list", [])) == 0:
-                     self.aliyun_log.logging(
-                         code="2000",
-                         message="没有更多视频了\n",
-                     )
-                     return
-                 else:
-                     begin += 5
-                 app_msg_list = result["app_msg_list"]
-                 for article in app_msg_list:
-                     try:
-                         self.process_video(article, account_name, fake_id)
-                     except Exception as e:
-                         self.aliyun_log.logging(
-                             code="3000",
-                             message="代码报错, 报错信息是{}".format(e),
-                             data=article,
-                             account=account_name,
-                         )
-             else:
-                 # Non-200 response: log it and back off briefly instead of retrying immediately
-                 self.aliyun_log.logging(
-                     code="2000",
-                     message=f"status_code:{response.status_code}, response:{response.text}\n",
-                 )
-                 time.sleep(60)
- def process_video(self, article, account_name, fake_id):
-         """
-         Process one article entry and, if it passes the rules, push the video to MQ.
-         :param fake_id: unique id (fakeid) of the official account
-         :param account_name: name of the official account
-         :param article: article entry from app_msg_list (contains the article link)
-         :return: None
-         """
- trace_id = self.platform + str(uuid.uuid1())
- create_time = article.get("create_time", 0)
- update_time = article.get("update_time", 0)
- publish_time_stamp = int(create_time)
- update_time_stamp = int(update_time)
- publish_time_str = time.strftime(
- "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
- )
- article_url = article.get("link", "")
- video_dict = {
- "video_id": article.get("aid", ""),
- "video_title": article.get("title", "")
- .replace(" ", "")
- .replace('"', "")
- .replace("'", ""),
- "publish_time_stamp": publish_time_stamp,
- "publish_time_str": publish_time_str,
- "user_name": account_name,
- "play_cnt": 0,
- "comment_cnt": 0,
- "like_cnt": 0,
- "share_cnt": 0,
- "user_id": fake_id,
- "avatar_url": "",
- "cover_url": article.get("cover", ""),
- "article_url": article.get("link", ""),
- "session": f"gongzhonghao-author1-{int(time.time())}",
- }
- self.aliyun_log.logging(
- code="1001", message="扫描到一条视频", data=article, account=account_name
- )
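-         # Filtering pipeline: drop articles outside the configured period (by create and
-         # update time), skip titles that hit a filter word, skip already-downloaded videos,
-         # then resolve the real video URL and send the item to the ETL queue.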
- if (
- int(time.time()) - publish_time_stamp
- > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
- ) and (
- int(time.time()) - update_time_stamp
- > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
- ):
- self.aliyun_log.logging(
- code="2004",
- trace_id=trace_id,
- data=video_dict,
- message="发布时间超过{}天".format(
- int(self.rule_dict.get("period", {}).get("max", 1000))
- ),
- account=account_name,
- )
- return
-         # Title filter-word check
-         elif any(
-             str(word) in video_dict["video_title"]
-             for word in get_config_from_mysql(
-                 log_type=self.mode,
-                 source=self.platform,
-                 env=self.env,
-                 text="filter",
-             )
-         ):
- self.aliyun_log.logging(
- code="2003",
- trace_id=trace_id,
- data=video_dict,
- account=account_name,
- message="标题已中过滤词\n",
- )
-         # Skip videos that have already been downloaded
-         elif self.repeat_video(video_dict["video_id"]) != 0:
- self.aliyun_log.logging(
- code="2002",
- trace_id=trace_id,
- data=video_dict,
- account=account_name,
- message="视频已下载",
- )
- else:
- video_dict["out_user_id"] = video_dict["user_id"]
- video_dict["platform"] = self.platform
- video_dict["strategy"] = self.mode
- video_dict["out_video_id"] = video_dict["video_id"]
- video_dict["width"] = 0
- video_dict["height"] = 0
- video_dict["crawler_rule"] = json.dumps(self.rule_dict)
-             video_dict["user_id"] = fake_id  # in-site UID? the crawler can no longer obtain it (a random one of the original 5 accounts is used at publish time)
- video_dict["publish_time"] = video_dict["publish_time_str"]
- video_dict["video_url"] = get_video_url(article_url)
- self.mq.send_msg(video_dict)
- self.aliyun_log.logging(
- code="1002",
- trace_id=trace_id,
- data=video_dict,
- account=account_name,
- message="成功发送 MQ 至 ETL",
- )
- time.sleep(random.randint(1, 8))
- def repeat_video(self, video_id):
-         """
-         Check whether a video has already been crawled.
-         :param video_id: out_video_id of the video
-         :return: number of matching rows in crawler_video (0 means not crawled yet)
-         """
- sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{video_id}" ; """
- repeat_video = self.mysql.select(sql)
- return len(repeat_video)
- def fake_id_manage(self, account_name):
-         """
-         Look up the fake_id for an official-account name: return it if it already exists
-         in the accounts table, otherwise resolve and insert it.
-         :param account_name: name of the official account (user_dict['link'])
-         :return: fake_id of the account
-         """
- select_sql = f"""select name, name_id from accounts where name = "{account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
- account_info = self.mysql.select(sql=select_sql)
- if account_info:
- name, name_id = account_info[0]
- return name_id
-         else:
-             # get_user_fake_id is expected to resolve the fakeid via the MP platform search
-             # (provided elsewhere in this crawler); if it fails we return None
-             user_info = self.get_user_fake_id(account_name)
-             if user_info:
-                 fake_id = user_info["user_id"]
-                 insert_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{account_name}", "{fake_id}", "{self.platform}", 1 )"""
-                 self.mysql.update(sql=insert_sql)
-                 return fake_id
-             return None
- def get_token(self, token_index):
-         """
-         Fetch the mp.weixin.qq.com token and cookie for this platform from crawler_config.
-         :param token_index: index of the configured publishing account (title contains 公众号_{token_index})
-         :return: token dict, or None if no token is configured
-         """
- select_sql = f""" select * from crawler_config where source="{self.platform}" and title LIKE "%公众号_{token_index}%";"""
- configs = self.mysql.select(select_sql)
- if len(configs) == 0:
- Feishu.bot(self.mode, self.platform, f"公众号_{token_index}:未配置token")
- time.sleep(60)
- return None
-         # The config column stores a dict-like string; evaluate it once and reuse it
-         config = dict(eval(configs[0]["config"]))
-         token_dict = {
-             "token_id": configs[0]["id"],
-             "title": configs[0]["title"].strip(),
-             "token": config["token"].strip(),
-             "cookie": config["cookie"].strip(),
-             "update_time": time.strftime(
-                 "%Y-%m-%d %H:%M:%S",
-                 time.localtime(int(configs[0]["update_time"] / 1000)),
-             ),
-             "operator": configs[0]["operator"].strip(),
-         }
- return token_dict
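- # Minimal usage sketch (illustrative only): the platform/mode values and the shapes of
- # user_list and rule_dict below are assumptions inferred from how this class reads them,
- # not the repo's actual scheduler entry point.
- if __name__ == "__main__":
-     crawler = OfficialAccountAuthor(
-         platform="gongzhonghao",  # assumed platform key, matching this file's name
-         mode="author",  # assumed mode, matching the session string built above
-         user_list=[{"link": "example_official_account"}],  # hypothetical account entry
-         rule_dict={"period": {"max": 30}},  # only keep articles from the last 30 days
-         env="prod",
-     )
-     for user in crawler.user_list:
-         # fake_id_manage() is keyed by the account name stored in user["link"]
-         crawler.get_video_list(account_name=user["link"])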