import json
import time

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.api import FeishuBotApi
from applications.db import DatabaseConnector
from cold_start.crawler.wechat import get_article_detail
from cold_start.crawler.wechat import get_article_list_from_account
from config import long_articles_config, denet_config


class OutsideGzhArticlesManager:
    """Shared state and helpers for the outside-account article jobs.

    Opens one connection to the long-articles database and one to the denet
    database, and creates the Feishu bot client used for violation alerts.
    """

    # Response code from ``get_article_detail`` that this module treats as
    # "article flagged as illegal"; the accompanying ``msg`` is the reason.
    ARTICLE_ILLEGAL_CODE = 25012
    # Response code from ``get_article_detail`` for a successful fetch.
    ARTICLE_SUCCESS_CODE = 0

    def __init__(self):
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()
        self.feishu_bot_api = FeishuBotApi()

    def update_article_illegal_status(self, article_id, illegal_reason):
        """Mark a monitored article as illegal and record the reason.

        The update is guarded by ``illegal_status = 0`` so only rows that are
        not yet flagged are touched; this mirrors the filter used when the
        monitor selects articles to check.

        Args:
            article_id: primary key (``id``) of the row in
                ``outside_gzh_account_monitor``.
            illegal_reason: reason text reported by the article detail API.
        """
        # The guard previously compared ``illegal_reason`` with 0, which is
        # the wrong column and only matched through implicit type coercion.
        update_query = """
            update outside_gzh_account_monitor
            set illegal_status = %s, illegal_reason = %s
            where id = %s and illegal_status = %s
        """
        self.long_articles_client.save(
            query=update_query,
            params=(1, illegal_reason, article_id, 0),
        )


class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
    """Crawl recent articles of outside accounts and store them for monitoring."""

    def fetch_outside_account_list(self):
        """Return the accounts whose group source has mode type '代运营服务号'.

        Each row is a dict with ``account_source``, ``account_name``,
        ``gh_id`` and ``status``.
        """
        fetch_query = """
            select
                t2.group_source_name as account_source,
                t3.name as account_name,
                t3.gh_id as gh_id,
                t3.status as status
            from wx_statistics_group_source t1
                join wx_statistics_group_source_account t2
                    on t2.group_source_name = t1.account_source_name
                join publish_account t3 on t3.id = t2.account_id
            where t1.mode_type = '代运营服务号';
        """
        return self.denet_client.fetch(query=fetch_query, cursor_type=DictCursor)

    def fetch_each_account(self, account: dict):
        """Crawl the article list of one account and persist every message.

        Args:
            account: a row returned by ``fetch_outside_account_list``.
        """
        gh_id = account["gh_id"]
        fetch_response = get_article_list_from_account(gh_id)
        msg_list = fetch_response.get("data", {}).get("data", [])
        if not msg_list:
            print(f"crawler failed: {account['account_name']}")
            return

        for msg in tqdm(msg_list, desc=f"insert account {account['account_name']}"):
            self.save_each_msg_to_db(msg, account)

    def save_each_msg_to_db(self, msg: dict, account: dict):
        """Store every article of one published message.

        For each article the detail API is called: an illegal article
        triggers a Feishu alert, a successful response is inserted with
        ``insert ignore``, and any other response code is skipped.

        Args:
            msg: one entry of the crawled message list; ``AppMsg.BaseInfo``
                and ``AppMsg.DetailInfo`` are read from it.
            account: the account row the message belongs to.
        """
        base_info = msg["AppMsg"]["BaseInfo"]
        detail_info = msg["AppMsg"]["DetailInfo"]
        app_msg_id = base_info["AppMsgId"]
        create_timestamp = base_info["CreateTime"]
        publish_type = base_info["Type"]

        insert_query = """
            insert ignore into outside_gzh_account_monitor
                (account_name, gh_id, account_source, account_type,
                 app_msg_id, publish_type, position, title, link,
                 channel_content_id, crawler_timestamp, publish_timestamp)
            values
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """

        # insert each article
        for article in detail_info:
            link = article["ContentUrl"]
            article_detail = get_article_detail(link)
            response_code = article_detail["code"]

            if response_code == self.ARTICLE_ILLEGAL_CODE:
                illegal_reason = article_detail.get("msg")
                # Account fields live on ``account`` and the crawled item
                # exposes its title as ``Title``; reading them from
                # ``article`` with lower-case keys raised KeyError here.
                # NOTE(review): alerts are sent with env="dev" — confirm
                # this is the intended channel.
                self.feishu_bot_api.bot(
                    title="文章违规告警",
                    detail={
                        "account_name": account["account_name"],
                        "title": article["Title"],
                        "reason": illegal_reason,
                        "publish_timestamp": create_timestamp,
                        "account_source": account["account_source"],
                    },
                    env="dev",
                )
            elif response_code == self.ARTICLE_SUCCESS_CODE:
                detail_data = article_detail["data"]["data"]
                self.long_articles_client.save(
                    query=insert_query,
                    params=(
                        account["account_name"],
                        account["gh_id"],
                        account["account_source"],
                        "服务号",
                        app_msg_id,
                        publish_type,
                        article["ItemIndex"],
                        article["Title"],
                        link,
                        detail_data["channel_content_id"],
                        int(time.time()),
                        # the value is divided by 1000 before storing
                        # (presumably milliseconds -> seconds; confirm)
                        int(detail_data["publish_timestamp"] / 1000),
                    ),
                )
            # any other response code: skip this article

    def deal(self):
        """Crawl and store articles for the outside accounts."""
        account_list = self.fetch_outside_account_list()
        # NOTE(review): only the first 10 accounts are processed; this looks
        # like a debugging limit — confirm before relying on full coverage.
        for account in tqdm(account_list[:10]):
            self.fetch_each_account(account)


class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
    """Re-check recently published articles and flag the illegal ones."""

    def fetch_article_list_to_check(self):
        """Return unflagged articles published within the last 7 days."""
        publish_timestamp_threshold = int(time.time()) - 7 * 24 * 3600
        # The interpolated value is an int computed locally, not user input.
        fetch_query = f"""
            select
                id, account_name, gh_id, account_source, account_type,
                title, link, from_unixtime(publish_timestamp) as publish_date
            from outside_gzh_account_monitor
            where illegal_status = 0
                and publish_timestamp > {publish_timestamp_threshold};
        """
        return self.long_articles_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )

    def check_each_article(self, article: dict):
        """Check one stored article; alert and flag it when it is illegal.

        Args:
            article: a row returned by ``fetch_article_list_to_check``.
        """
        link = article["link"]
        article_detail = get_article_detail(link)
        response_code = article_detail["code"]
        if response_code != self.ARTICLE_ILLEGAL_CODE:
            return

        illegal_reason = article_detail.get("msg")
        # NOTE(review): alerts are sent with env="dev" — confirm this is the
        # intended channel.
        self.feishu_bot_api.bot(
            title="文章违规告警",
            detail={
                "account_name": article["account_name"],
                "title": article["title"],
                "reason": illegal_reason,
                "publish_date": article["publish_date"],
                "account_source": article["account_source"],
            },
            env="dev",
        )
        article_id = article["id"]
        self.update_article_illegal_status(article_id, illegal_reason)

    def deal(self):
        """Check every article returned by ``fetch_article_list_to_check``."""
        article_list = self.fetch_article_list_to_check()
        for article in tqdm(article_list):
            self.check_each_article(article)


if __name__ == "__main__":
    # The collector run is disabled here; only the monitor is executed.
    # collector = OutsideGzhArticlesCollector()
    # collector.deal()
    monitor = OutsideGzhArticlesMonitor()
    monitor.deal()