123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- import time
- import datetime
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from applications.api import FeishuBotApi
- from applications.db import DatabaseConnector
- from cold_start.crawler.wechat import get_article_detail
- from cold_start.crawler.wechat import get_article_list_from_account
- from config import long_articles_config, denet_config
class Const:
    """Constants shared by the outside-GZH article collector and monitor."""

    # Values of the ``illegal_status`` column.
    INIT_STATUS = 0
    ILLEGAL_STATUS = 1

    # Monitoring window in seconds (5 days).
    MONITOR_CYCLE = 5 * 24 * 60 * 60

    # Values compared against the ``code`` field of an article-detail response.
    SUCCESS_CODE = 0
    ILLEGAL_CODE = 25012
class OutsideGzhArticlesManager(Const):
    """Base class that owns the database and alert clients.

    It also provides the helpers that read and update the
    ``outside_gzh_account_monitor`` table.
    """

    def __init__(self):
        """Connect to both databases and create the Feishu bot client."""
        long_articles_db = DatabaseConnector(long_articles_config)
        long_articles_db.connect()
        self.long_articles_client = long_articles_db

        denet_db = DatabaseConnector(denet_config)
        denet_db.connect()
        self.denet_client = denet_db

        self.feishu_bot_api = FeishuBotApi()

    def update_article_illegal_status(
        self, article_id: int, illegal_reason: str
    ) -> None:
        """Flag one monitored article as illegal and store the reason.

        The WHERE clause also requires ``illegal_status = INIT_STATUS``, so a
        row that is no longer in the initial state is left unchanged.

        Args:
            article_id: ``id`` of the row in ``outside_gzh_account_monitor``.
            illegal_reason: Text written to the ``illegal_reason`` column.
        """
        sql = """
            update outside_gzh_account_monitor
            set illegal_status = %s, illegal_reason = %s
            where id = %s and illegal_status = %s
        """
        bind_values = (
            self.ILLEGAL_STATUS,
            illegal_reason,
            article_id,
            self.INIT_STATUS,
        )
        self.long_articles_client.save(query=sql, params=bind_values)

    def whether_published_in_a_week(self, gh_id: str) -> bool:
        """Tell whether the account already has a recent article stored.

        Looks up the newest ``publish_timestamp`` recorded for ``gh_id`` and
        returns True when it lies within ``MONITOR_CYCLE`` seconds of now,
        in which case the caller does not need to crawl the account again.
        Returns False when no row exists or the timestamp is NULL.
        """
        # NOTE(review): gh_id is interpolated straight into the SQL text;
        # switch to bind parameters if the fetch helper supports them.
        sql = f"""
            select id, publish_timestamp from outside_gzh_account_monitor
            where gh_id = '{gh_id}'
            order by publish_timestamp desc
            limit 1;
        """
        rows = self.long_articles_client.fetch(query=sql, cursor_type=DictCursor)
        if not rows:
            return False

        latest_publish_ts = rows[0]["publish_timestamp"]
        if latest_publish_ts is None:
            return False

        elapsed = int(time.time()) - latest_publish_ts
        return elapsed <= self.MONITOR_CYCLE
class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
    """Crawl the article lists of outside accounts and store each article."""

    def fetch_outside_account_list(self):
        """Return the accounts to crawl as a list of dict rows.

        Each row carries ``account_source``, ``account_name``, ``gh_id`` and
        ``status`` for accounts whose group source has
        ``mode_type = '代运营服务号'``.
        """
        fetch_query = """
            select
                t2.group_source_name as account_source,
                t3.name as account_name,
                t3.gh_id as gh_id,
                t3.status as status
            from wx_statistics_group_source t1
                join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
                join publish_account t3 on t3.id = t2.account_id
            where
                t1.mode_type = '代运营服务号';
        """
        return self.denet_client.fetch(query=fetch_query, cursor_type=DictCursor)

    def fetch_each_account(self, account: dict):
        """Crawl one account's article list and persist every message.

        The account is skipped when it already has an article stored within
        the monitoring window. Any error raised while processing the crawled
        messages is printed and swallowed so one account cannot stop the run.

        Args:
            account: A row produced by ``fetch_outside_account_list``.
        """
        gh_id = account["gh_id"]
        # Skip accounts that already have a recent article recorded.
        if self.whether_published_in_a_week(gh_id):
            return

        fetch_response = get_article_list_from_account(gh_id)
        try:
            msg_list = fetch_response.get("data", {}).get("data", [])
            if msg_list:
                for msg in tqdm(
                    msg_list, desc=f"insert account {account['account_name']}"
                ):
                    self.save_each_msg_to_db(msg, account)
            else:
                print(f"crawler failed: {account['account_name']}")
        except Exception as e:
            print(
                f"crawler failed: account_name: {account['account_name']}\n"
                f"error: {e}\n"
            )

    def save_each_msg_to_db(self, msg: dict, account: dict):
        """Fetch the detail of every article in one message and act on it.

        For each entry of ``msg["AppMsg"]["DetailInfo"]`` the article detail
        is requested by its ``ContentUrl``:

        * ``ILLEGAL_CODE``: a Feishu alert is sent; nothing is inserted.
        * ``SUCCESS_CODE``: the article is inserted (``insert ignore``) into
          ``outside_gzh_account_monitor``.
        * any other code: the article is skipped.

        Args:
            msg: One crawled message; ``AppMsg.BaseInfo`` must provide
                ``AppMsgId``, ``CreateTime`` and ``Type``.
            account: The account row the message belongs to.
        """
        base_info = msg["AppMsg"]["BaseInfo"]
        detail_info = msg["AppMsg"]["DetailInfo"]
        app_msg_id = base_info["AppMsgId"]
        create_timestamp = base_info["CreateTime"]
        publish_type = base_info["Type"]

        for article in detail_info:
            link = article["ContentUrl"]
            article_detail = get_article_detail(link)
            response_code = article_detail["code"]

            if response_code == self.ILLEGAL_CODE:
                illegal_reason = article_detail.get("msg")
                # Alert only, then move on to the next article.
                # The account fields come from ``account`` and the title from
                # the crawler item's ``Title`` key; the crawler item has no
                # ``account_name`` / ``title`` / ``account_source`` keys.
                self.feishu_bot_api.bot(
                    title="文章违规告警",
                    detail={
                        "账号名称": account["account_name"],
                        "标题": article["Title"],
                        "违规理由": illegal_reason,
                        "发布日期": datetime.datetime.fromtimestamp(
                            create_timestamp
                        ).strftime('%Y-%m-%d %H:%M:%S'),
                        "账号合作商": account["account_source"],
                    },
                    env="outside_gzh_monitor",
                    mention=False,
                )
            elif response_code == self.SUCCESS_CODE:
                insert_query = """
                    insert ignore into outside_gzh_account_monitor
                    (account_name, gh_id, account_source, account_type, app_msg_id, publish_type, position, title, link,
                    channel_content_id, crawler_timestamp, publish_timestamp)
                    values
                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                """
                detail_data = article_detail["data"]["data"]
                self.long_articles_client.save(
                    query=insert_query,
                    params=(
                        account["account_name"],
                        account["gh_id"],
                        account["account_source"],
                        "服务号",
                        app_msg_id,
                        publish_type,
                        article["ItemIndex"],
                        article["Title"],
                        link,
                        detail_data["channel_content_id"],
                        int(time.time()),
                        # The value is divided by 1000 before storing.
                        int(detail_data["publish_timestamp"] / 1000),
                    ),
                )

    def deal(self):
        """Entry point: crawl every outside account, isolating failures."""
        account_list = self.fetch_outside_account_list()
        for account in tqdm(account_list):
            try:
                self.fetch_each_account(account)
            except Exception as e:
                print(f"crawler failed: {account['account_name']}, error: {e}")
class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
    """Re-check stored articles and raise an alert for illegal ones."""

    def fetch_article_list_to_check(self):
        """Return the stored articles that still need checking.

        Selects rows whose ``illegal_status`` is ``INIT_STATUS`` and whose
        ``publish_timestamp`` is newer than now minus ``MONITOR_CYCLE``.
        """
        cutoff = int(time.time()) - self.MONITOR_CYCLE
        sql = f"""
            select id, account_name, gh_id, account_source, account_type,
                title, link, from_unixtime(publish_timestamp) as publish_date
            from outside_gzh_account_monitor
            where illegal_status = {self.INIT_STATUS} and publish_timestamp > {cutoff};
        """
        return self.long_articles_client.fetch(query=sql, cursor_type=DictCursor)

    def check_each_article(self, article: dict):
        """Check one stored article and handle an illegal result.

        When the detail response code equals ``ILLEGAL_CODE`` a Feishu alert
        is sent and the row is flagged as illegal; any other code is ignored.

        Args:
            article: A row produced by ``fetch_article_list_to_check``.
        """
        detail = get_article_detail(article["link"])
        if detail["code"] != self.ILLEGAL_CODE:
            return

        reason = detail.get("msg")
        alert_detail = {
            "账号名称": article["account_name"],
            "标题": article["title"],
            "违规理由": reason,
            "发布日期": str(article["publish_date"]),
            "账号合作商": article["account_source"],
        }
        self.feishu_bot_api.bot(
            title="文章违规告警",
            detail=alert_detail,
            env="outside_gzh_monitor",
            mention=False,
        )
        # Persist the result only after the alert has been sent.
        self.update_article_illegal_status(article["id"], reason)

    def deal(self):
        """Entry point: check every pending article, isolating failures."""
        pending = self.fetch_article_list_to_check()
        for row in tqdm(pending):
            try:
                self.check_each_article(row)
            except Exception as e:
                print(
                    f"crawler failed: account_name: {row['account_name']}\n"
                    f"link: {row['link']}\n"
                    f"title: {row['title']}\n"
                    f"error: {e}\n"
                )
|