""" @author: luojunhui @desc: 更新文章的阅读详情 """ import json import time import traceback import urllib.parse from datetime import datetime from typing import Dict, List from pymysql.cursors import DictCursor from tqdm import tqdm from applications import log from applications.api import FeishuBotApi from applications.db import DatabaseConnector from applications.utils import str_to_md5, show_desc_to_sta from cold_start.crawler.wechat import get_article_list_from_account from config import denet_config, long_articles_config, piaoquan_crawler_config class UpdatePublishedArticlesTaskConst: """ 更新已发布文章消息常量配置 """ SUCCESS_CODE = 0 # 爬虫详情接口返回code ARTICLE_ILLEGAL_CODE = 25012 ARTICLE_DELETE_CODE = 25005 ARTICLE_UNKNOWN_CODE = 10000 # 账号违规状态码 ACCOUNT_ILLEGAL_CODE = 25013 UNKNOWN_SPIDER_ERROR_CODE = 20000 # 记录默认状态 DEFAULT_STATUS = 0 # 请求接口失败状态 REQUEST_FAIL_STATUS = -1 # 文章被删除状态 DELETE_STATUS = -2 # 未知原因无信息返回状态 UNKNOWN_STATUS = -3 # 文章违规状态 ILLEGAL_STATUS = -4 # 公众号类型(订阅号 or 服务号) # 订阅号 SUBSCRIBE_TYPE_SET = {0, 1} # 服务号 SERVICE_TYPE = 2 # 监测周期(秒) MONITOR_PERIOD = 60 * 60 * 24 * 3 # 新号抓文章周期 NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30 # 订阅号,抓取失败失败率报警阈值 TASK_FAIL_RATE_THRESHOLD = 0.3 # ARTICLE TABLE ARTICLE_TABLE_NAME = "official_articles_v2" class UpdatePublishedArticlesTaskBase(UpdatePublishedArticlesTaskConst): def __init__(self): self.crawler_client = DatabaseConnector(piaoquan_crawler_config) self.long_articles_client = DatabaseConnector(long_articles_config) self.denet_client = DatabaseConnector(denet_config) self.crawler_client.connect() self.long_articles_client.connect() self.denet_client.connect() self.feishu_bot_api = FeishuBotApi() def fetch_published_accounts(self) -> List[Dict]: """ get published articles from aigc """ fetch_query = f""" SELECT DISTINCT t3.`name` as account_name, t3.gh_id, t3.follower_count as fans, t3.create_timestamp as account_init_timestamp, t4.service_type_info as account_type, t4.verify_type_info as account_auth, t3.id as account_id, group_concat(distinct t5.remark) as account_remark FROM publish_plan t1 JOIN publish_plan_account t2 ON t1.id = t2.plan_id JOIN publish_account t3 ON t2.account_id = t3.id LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id LEFT JOIN publish_account_remark t5 on t3.id = t5.publish_account_id WHERE t1.plan_status = 1 AND t3.channel = 5 GROUP BY t3.id; """ fetch_response = self.denet_client.fetch( query=fetch_query, cursor_type=DictCursor ) account_list = [ i for i in fetch_response if "自动回复" not in str(i["account_remark"]) ] return account_list def fetch_account_experiment_status(self) -> Dict[str, str]: fetch_query = f""" SELECT t1.account_id, t2.status FROM wx_statistics_group_source_account t1 JOIN wx_statistics_group_source t2 ON t1.group_source_name = t2.account_source_name; """ account_status_list = self.denet_client.fetch( query=fetch_query, cursor_type=DictCursor ) account_status_dict = { account["account_id"]: account["status"] for account in account_status_list } return account_status_dict def get_account_list(self) -> List[Dict]: account_without_status = self.fetch_published_accounts() account_status_dict = self.fetch_account_experiment_status() account_list = [ { **item, "using_status": ( 0 if account_status_dict.get(item["account_id"]) == "实验" else 1 ), } for item in account_without_status ] return account_list def fetch_account_max_publish_timestamp(self, gh_id: str) -> int: # get max published timestamp for this account fetch_query = f""" SELECT MAX(publish_timestamp) AS max_publish_timestamp FROM 
                {self.ARTICLE_TABLE_NAME}
            WHERE ghId = %s;
        """
        fetch_response = self.crawler_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id,)
        )
        if fetch_response and fetch_response[0]["max_publish_timestamp"]:
            max_publish_timestamp = fetch_response[0]["max_publish_timestamp"]
        else:
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
        return max_publish_timestamp

    def crawl_account_published_articles(self, account: Dict[str, str]):
        """
        Crawl the published message list of one account, page by page,
        until the latest recorded publish timestamp is reached.
        """
        max_publish_timestamp = self.fetch_account_max_publish_timestamp(
            account["gh_id"]
        )
        cursor = None
        while True:
            crawl_response = get_article_list_from_account(
                account["gh_id"], index=cursor
            )
            crawl_response_code = crawl_response["code"]
            match crawl_response_code:
                # request succeeded
                case self.SUCCESS_CODE:
                    msg_list = crawl_response.get("data", {}).get("data", [])
                    if not msg_list:
                        break

                    self.record_each_msg(account, msg_list)
                    earliest_msg = msg_list[-1]
                    earliest_update_timestamp = earliest_msg["AppMsg"]["BaseInfo"][
                        "UpdateTime"
                    ]
                    if earliest_update_timestamp > max_publish_timestamp:
                        # keep paging until already-recorded messages are reached
                        cursor = crawl_response["data"]["next_cursor"]
                    else:
                        break

                # account violates platform rules
                case self.ACCOUNT_ILLEGAL_CODE:
                    log(
                        task="update_published_articles",
                        function="crawl_account_published_articles",
                        message="账号违规",
                        data=account,
                    )
                    self.feishu_bot_api.bot(
                        title="公众号账号违规报警",
                        detail={
                            "账号名称": account["account_name"],
                            "账号ID": account["gh_id"],
                            "违规原因": crawl_response["msg"],
                            "粉丝数": account["fans"],
                            "利用状态": account["using_status"],
                        },
                        env="gzh_monitor_bot",
                        mention=False,
                    )
                    break

                # unknown spider error
                case self.UNKNOWN_SPIDER_ERROR_CODE:
                    log(
                        task="update_published_articles",
                        function="crawl_account_published_articles",
                        message="未知错误",
                        data=account,
                    )
                    self.feishu_bot_api.bot(
                        title="接口请求失败报警",
                        detail={
                            "账号名称": account["account_name"],
                            "账号ID": account["gh_id"],
                            "违规原因": crawl_response["msg"],
                            "粉丝数": account["fans"],
                            "利用状态": account["using_status"],
                        },
                        env="dev",
                        mention=False,
                    )
                    break

                # any other code
                case _:
                    print("unknown code:", crawl_response_code)
                    break

    def record_each_msg(self, account, msg_list):
        """
        Parse each crawled message and persist its articles into the article table.
        """
        for msg in msg_list:
            base_info = msg.get("BaseInfo", {})
            app_msg_id = msg.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
            create_timestamp = (
                msg.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
            )
            update_timestamp = (
                msg.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
            )
            publish_type = msg.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
            detail_article_list = msg.get("AppMsg", {}).get("DetailInfo", [])
            if detail_article_list:
                for article in detail_article_list:
                    title = article.get("Title", None)
                    digest = article.get("Digest", None)
                    item_index = article.get("ItemIndex", None)
                    content_url = article.get("ContentUrl", None)
                    source_url = article.get("SourceUrl", None)
                    cover_img_url = article.get("CoverImgUrl", None)
                    cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
                    cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
                    item_show_type = article.get("ItemShowType", None)
                    is_original = article.get("IsOriginal", None)
                    show_desc = article.get("ShowDesc", None)
                    show_stat = show_desc_to_sta(show_desc)
                    ori_content = article.get("ori_content", None)
                    show_view_count = show_stat.get("show_view_count", 0)
                    show_like_count = show_stat.get("show_like_count", 0)
                    show_zs_count = show_stat.get("show_zs_count", 0)
                    show_pay_count = show_stat.get("show_pay_count", 0)
                    # wx_sn uniquely identifies an article; parsed from its URL
                    wx_sn = (
                        content_url.split("&sn=")[1].split("&")[0]
                        if content_url
                        else None
                    )
                    status = account["using_status"]
                    info_tuple = (
                        account["gh_id"],
                        account["account_name"],
                        app_msg_id,
                        title,
                        publish_type,
                        create_timestamp,
                        update_timestamp,
                        digest,
                        item_index,
                        content_url,
                        source_url,
                        cover_img_url,
                        cover_img_url_1_1,
                        cover_img_url_235_1,
                        item_show_type,
                        is_original,
                        show_desc,
                        ori_content,
                        show_view_count,
                        show_like_count,
                        show_zs_count,
                        show_pay_count,
                        wx_sn,
                        json.dumps(base_info, ensure_ascii=False),
                        str_to_md5(title),
                        status,
                    )
                    try:
                        insert_sql = f"""
                            INSERT INTO {self.ARTICLE_TABLE_NAME}
                                (ghId, accountName, appMsgId, title, Type, createTime,
                                 updateTime, Digest, ItemIndex, ContentUrl, SourceUrl,
                                 CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1,
                                 ItemShowType, IsOriginal, ShowDesc, ori_content,
                                 show_view_count, show_like_count, show_zs_count,
                                 show_pay_count, wx_sn, baseInfo, title_md5, status)
                            VALUES
                                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                                 %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                        """
                        self.crawler_client.save(query=insert_sql, params=info_tuple)
                        log(
                            task="updatePublishedMsgDaily",
                            function="insert_each_msg",
                            message="插入文章数据成功",
                            data={"info": info_tuple},
                        )
                    except Exception as e:
                        # insert failed (typically the article already exists);
                        # refresh the read / like counters for the existing row instead
                        try:
                            update_sql = f"""
                                UPDATE {self.ARTICLE_TABLE_NAME}
                                SET show_view_count = %s, show_like_count = %s
                                WHERE wx_sn = %s;
                            """
                            self.crawler_client.save(
                                query=update_sql,
                                params=(show_view_count, show_like_count, wx_sn),
                            )
                            log(
                                task="updatePublishedMsgDaily",
                                function="insert_each_msg",
                                message="更新文章数据成功",
                                data={
                                    "wxSn": wx_sn,
                                    "likeCount": show_like_count,
                                    "viewCount": show_view_count,
                                },
                            )
                        except Exception as e:
                            log(
                                task="updatePublishedMsgDaily",
                                function="insert_each_msg",
                                message="更新文章失败, 报错原因是: {}".format(e),
                                status="fail",
                            )
                            continue


class UpdatePublishedArticlesTaskCollector(UpdatePublishedArticlesTaskBase):

    def deal(self):
        """
        Crawl and record the published articles of every active account.
        """
        account_list = self.get_account_list()
        for account in tqdm(account_list, desc="抓取每个账号的文章信息"):
            try:
                self.crawl_account_published_articles(account)
            except Exception as e:
                log(
                    task="update_published_articles_collector",
                    function="crawl_account_published_articles",
                    message=f"抓取账号文章信息失败, 报错原因是: {e}",
                    status="fail",
                    data={
                        "account": account,
                        "error": traceback.format_exc(),
                    },
                )

        # self.feishu_bot_api.bot(
        #     title='更新每日发布文章任务完成通知',
        #     detail={
        #         "msg": "账号更新完成",
        #         "finish_time": datetime.today().__str__()
        #     },
        #     mention=False
        # )


class UpdatePublishedArticlesTaskChecker(UpdatePublishedArticlesTaskBase):

    def deal(self):
        pass


class UpdatePublishedArticlesTaskArticlesMonitor(UpdatePublishedArticlesTaskBase):

    pass
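

if __name__ == "__main__":
    # Minimal usage sketch (an assumption, not part of the original module wiring):
    # in production these task classes are presumably driven by an external scheduler.
    # Only names defined in this module are used here.
    collector = UpdatePublishedArticlesTaskCollector()
    collector.deal()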