import time
import traceback
from datetime import datetime, timedelta

from app.infra.crawler.wechat import get_gzh_stat_daily
from app.infra.crawler.wechat import get_access_token
from app.infra.shared import run_tasks_with_asyncio_task_group


class ArticleDetailStatConst:
    """Constants shared by the article-detail statistics task."""

    # Task Status
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAILED_STATUS = 99

    # Account Status
    ACCOUNT_VALID_STATUS = 1
    ACCOUNT_INVALID_STATUS = 0

    # Cookie Status
    COOKIE_VALID_STATUS = 1
    COOKIE_INVALID_STATUS = 0

    # Gap Time: margin subtracted from the computed token expiry timestamp
    GAP_DURATION = 300

    # Number of decimal places kept when rounding rate values
    DECIMAL_PRECISION = 7

    # errcode in a stats response on which the access_token is refreshed
    ACCESS_TOKEN_INVALID_ERRCODE = 40001


class ArticleDetailStatMapper(ArticleDetailStatConst):
    """Database access layer for the article-detail statistics task.

    The SQL statements are written as adjacent string literals so that long
    statements can be wrapped without altering the statement text.
    """

    def __init__(self, pool, log_service):
        self.pool = pool
        self.log_service = log_service

    async def fetch_monitor_accounts(self):
        """Fetch the accounts whose status equals ``ACCOUNT_VALID_STATUS``."""
        query = (
            " SELECT gh_id, account_name, app_id, app_secret"
            " FROM gzh_account_info WHERE status = %s; "
        )
        return await self.pool.async_fetch(
            query=query, params=(self.ACCOUNT_VALID_STATUS,)
        )

    async def set_access_token_for_each_account(
        self, gh_id, access_token, expire_timestamp
    ):
        """Store a new access_token and its expiry for ``gh_id``.

        The token status is set to ``COOKIE_VALID_STATUS``.
        """
        query = (
            " UPDATE gzh_cookie_info"
            " SET access_token = %s, access_token_status = %s,"
            " expire_timestamp = %s"
            " WHERE gh_id = %s; "
        )
        return await self.pool.async_save(
            query=query,
            params=(access_token, self.COOKIE_VALID_STATUS, expire_timestamp, gh_id),
        )

    async def get_access_token_from_database(self, gh_id):
        """Read the stored access_token and expiry timestamp for ``gh_id``."""
        query = (
            " SELECT access_token, expire_timestamp"
            " FROM gzh_cookie_info where gh_id = %s; "
        )
        return await self.pool.async_fetch(query=query, params=(gh_id,))

    async def fetch_article_detail(self, gh_id, msg_id):
        """Fetch article rows from ``official_articles_v2`` for one message."""
        query = (
            " SELECT wx_sn, title, ContentUrl, ItemIndex, title_md5"
            " FROM official_articles_v2"
            " WHERE ghId = %s AND appMsgId = %s; "
        )
        return await self.pool.async_fetch(
            query=query, db_name="piaoquan_crawler", params=(gh_id, msg_id)
        )

    async def save_article_performance(self, params: tuple):
        """Upsert one row of ``article_daily_performance``.

        ``params`` must supply the 19 values in the column order of the
        INSERT statement below.
        """
        query = (
            " INSERT INTO article_daily_performance"
            " ( account_name, gh_id, wx_sn, app_msg_id, msg_id, position, title,"
            " content_url, stat_date, read_user, share_user, read_subscribe_user,"
            " read_delivery_rate, read_finish_rate, read_avg_activetime,"
            " zaikan_user, like_user, comment_count, collection_user )"
            " VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,"
            " %s, %s, %s, %s, %s, %s, %s, %s, %s )"
            " ON DUPLICATE KEY UPDATE"
            " read_user = VALUES(read_user),"
            " share_user = VALUES(share_user),"
            " read_subscribe_user = VALUES(read_subscribe_user),"
            " read_delivery_rate = VALUES(read_delivery_rate),"
            " read_finish_rate = VALUES(read_finish_rate),"
            " read_avg_activetime = VALUES(read_avg_activetime),"
            " zaikan_user = VALUES(zaikan_user),"
            " like_user = VALUES(like_user),"
            " comment_count = VALUES(comment_count),"
            " collection_user = VALUES(collection_user),"
            " updated_at = CURRENT_TIMESTAMP; "
        )
        return await self.pool.async_save(query=query, params=params)

    async def save_article_read_distribution(self, params: tuple):
        """Upsert one row of ``article_read_distribution``.

        ``params`` must supply the 14 values in the column order of the
        INSERT statement below.
        """
        query = (
            " INSERT INTO article_read_distribution"
            " ( wx_sn, jump_pos_1_rate, jump_pos_2_rate, jump_pos_3_rate,"
            " jump_pos_4_rate, jump_pos_5_rate, src_mp_message_user,"
            " src_mp_home_user, src_chat_user, src_moments_user,"
            " src_recommend_user, src_search_user, src_other_user,"
            " src_total_user )"
            " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
            " ON DUPLICATE KEY UPDATE"
            " jump_pos_1_rate = VALUES(jump_pos_1_rate),"
            " jump_pos_2_rate = VALUES(jump_pos_2_rate),"
            " jump_pos_3_rate = VALUES(jump_pos_3_rate),"
            " jump_pos_4_rate = VALUES(jump_pos_4_rate),"
            " jump_pos_5_rate = VALUES(jump_pos_5_rate),"
            " src_mp_message_user = VALUES(src_mp_message_user),"
            " src_mp_home_user = VALUES(src_mp_home_user),"
            " src_chat_user = VALUES(src_chat_user),"
            " src_moments_user = VALUES(src_moments_user),"
            " src_recommend_user = VALUES(src_recommend_user),"
            " src_search_user = VALUES(src_search_user),"
            " src_other_user = VALUES(src_other_user),"
            " src_total_user = VALUES(src_total_user),"
            " updated_at = CURRENT_TIMESTAMP; "
        )
        return await self.pool.async_save(query=query, params=params)


class ArticleDetailStat(ArticleDetailStatMapper):
    """Crawl per-article daily statistics for each account and persist them."""

    def __init__(self, pool, log_service):
        super().__init__(pool, log_service)

    async def save_account_details(self, account, fetch_response):
        """Persist the article statistics contained in one stats response.

        Args:
            account: Account row; ``gh_id`` and ``account_name`` are read.
            fetch_response: Response dict from ``get_gzh_stat_daily``; the
                ``list`` and ``is_delay`` entries are read.

        Returns:
            ``0`` when the response has no articles or is flagged as delayed,
            otherwise whatever ``run_tasks_with_asyncio_task_group`` returns.
        """
        article_list = fetch_response.get("list")
        if not article_list:
            return 0

        # A missing ``is_delay`` entry is treated the same as a falsy one
        # instead of raising KeyError.
        if fetch_response.get("is_delay"):
            print("数据延迟,重新处理")
            return 0

        async def _process_article_detail(_article_list):
            """Map ``"<msg_id>_<ItemIndex>"`` to the stored article info."""
            # msgid has the form "<msg_id>_<position>"; query each msg_id once.
            _msg_id_set = {
                _article["msgid"].split("_")[0] for _article in _article_list
            }
            _article_mapper = {}
            for _msg_id in _msg_id_set:
                _publish_articles = (
                    await self.fetch_article_detail(
                        gh_id=account["gh_id"], msg_id=_msg_id
                    )
                ) or []
                for _publish_article in _publish_articles:
                    _key = f"{_msg_id}_{_publish_article['ItemIndex']}"
                    _article_mapper[_key] = {
                        "wx_sn": _publish_article["wx_sn"].decode("utf-8"),
                        "title": _publish_article["title"],
                        "content_url": _publish_article["ContentUrl"],
                    }
            return _article_mapper

        account_published_articles = await _process_article_detail(article_list)

        async def _update_single_article(_article):
            """Save the performance and read-distribution rows of one article."""
            # Basic article info
            app_msg_id = _article["msgid"]
            article_info = account_published_articles.get(app_msg_id)
            if article_info is None:
                # No matching row in official_articles_v2: skip this article
                # rather than failing on a None subscript.
                print(
                    f"{account['account_name']} article {app_msg_id} "
                    "not found in official_articles_v2, skipped"
                )
                return

            # Every mapper key contains "_", so both parts are present here.
            msg_id, position = app_msg_id.split("_")[:2]
            wx_sn = article_info["wx_sn"]

            # Article performance
            article_performance = _article["detail_list"][0]
            await self.save_article_performance(
                params=(
                    account["account_name"],
                    account["gh_id"],
                    wx_sn,
                    app_msg_id,
                    msg_id,
                    position,
                    article_info["title"],
                    article_info["content_url"],
                    article_performance["stat_date"],
                    article_performance["read_user"],
                    article_performance["share_user"],
                    article_performance["read_subscribe_user"],
                    round(
                        article_performance["read_delivery_rate"],
                        self.DECIMAL_PRECISION,
                    ),
                    round(
                        article_performance["read_finish_rate"],
                        self.DECIMAL_PRECISION,
                    ),
                    round(
                        article_performance["read_avg_activetime"],
                        self.DECIMAL_PRECISION,
                    ),
                    article_performance["zaikan_user"],
                    article_performance["like_user"],
                    article_performance["comment_count"],
                    article_performance["collection_user"],
                )
            )

            # Read distribution info
            jump_rate_map = {
                i["position"]: round(i["rate"], self.DECIMAL_PRECISION)
                for i in article_performance["read_jump_position"]
            }
            source_map = {
                i["scene_desc"]: i["user_count"]
                for i in article_performance["read_user_source"]
            }
            await self.save_article_read_distribution(
                params=(
                    wx_sn,
                    jump_rate_map[1],
                    jump_rate_map[2],
                    jump_rate_map[3],
                    jump_rate_map[4],
                    jump_rate_map[5],
                    source_map["公众号消息"],
                    source_map["公众号主页"],
                    source_map["聊天会话"],
                    source_map["朋友圈"],
                    source_map["推荐"],
                    source_map["搜一搜"],
                    source_map["其他"],
                    source_map["全部"],
                )
            )

        return await run_tasks_with_asyncio_task_group(
            task_list=article_list,
            handler=_update_single_article,
            description="批量更新文章阅读信息",
            unit="article",
        )

    async def process_single_account(self, account: dict):
        """Crawl and store yesterday's article statistics for one account.

        Returns early when no token row exists for the account. A stored
        token whose expiry has passed is refreshed before the request. If
        the stats request answers with ``ACCESS_TOKEN_INVALID_ERRCODE`` the
        token is refreshed and the request is retried once.
        """
        gh_id = account["gh_id"]
        token_info = await self.get_access_token_from_database(gh_id)
        if not token_info:
            return

        async def update_token(_new_token_info):
            """Write a freshly fetched token and its expiry to the database."""
            _access_token = _new_token_info["access_token"]
            _expires_in = _new_token_info["expires_in"]
            await self.set_access_token_for_each_account(
                gh_id=account["gh_id"],
                access_token=_access_token,
                expire_timestamp=_expires_in + int(time.time()) - self.GAP_DURATION,
            )
            print(f"{account['account_name']} access_token updated to database")

        expire_timestamp = token_info[0]["expire_timestamp"] or 0
        if int(time.time()) >= expire_timestamp:
            print(f"{account['account_name']} access_token expired")
            new_token_info = await get_access_token(
                account["app_id"], account["app_secret"]
            )
            access_token = new_token_info["access_token"]
            await update_token(_new_token_info=new_token_info)
        else:
            access_token = token_info[0]["access_token"]

        # range(1, 2) yields only i == 1, i.e. yesterday.
        dt_list = [
            (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
            for i in range(1, 2)
        ]
        for dt in dt_list:
            print(f"{account['account_name']} crawl {dt} read_data")
            fetch_response = await get_gzh_stat_daily(
                access_token=access_token, date_string=dt
            )

            # Request failed because the token was rejected
            if fetch_response.get("errcode") == self.ACCESS_TOKEN_INVALID_ERRCODE:
                fetch_new_token_info = await get_access_token(
                    account["app_id"], account["app_secret"]
                )
                await update_token(_new_token_info=fetch_new_token_info)

                # Retry once with the fresh token so this date is not dropped.
                access_token = fetch_new_token_info["access_token"]
                fetch_response = await get_gzh_stat_daily(
                    access_token=access_token, date_string=dt
                )
                if fetch_response.get("errcode") == self.ACCESS_TOKEN_INVALID_ERRCODE:
                    break

            # Process and persist
            await self.save_account_details(account, fetch_response)

    async def deal(self):
        """Entry point: process every monitored account and log the outcome.

        A failure in one account is logged with its traceback and does not
        stop the remaining accounts from being processed.
        """
        accounts = await self.fetch_monitor_accounts()
        if not accounts:
            return

        for account in accounts:
            try:
                await self.process_single_account(account)
                await self.log_service.log(
                    contents={
                        "task": "article_detail_stat",
                        "account_name": account["account_name"],
                        "status": "success",
                    }
                )
            except Exception as e:
                await self.log_service.log(
                    contents={
                        "task": "article_detail_stat",
                        "account_name": account["account_name"],
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "status": "fail",
                    }
                )