| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- import time
- import traceback
- from datetime import datetime, timedelta
- from app.infra.crawler.wechat import get_gzh_stat_daily
- from app.infra.crawler.wechat import get_access_token
- from app.infra.shared import run_tasks_with_asyncio_task_group
class ArticleDetailStatConst:
    """Constants shared by the article detail statistics task."""

    # Task status codes.
    INIT_STATUS, PROCESSING_STATUS, SUCCESS_STATUS = 0, 1, 2
    FAILED_STATUS = 99

    # Account status flags.
    ACCOUNT_INVALID_STATUS, ACCOUNT_VALID_STATUS = 0, 1

    # Cookie status flags.
    COOKIE_INVALID_STATUS, COOKIE_VALID_STATUS = 0, 1

    # Gap time (300), written as five minutes' worth of seconds.
    GAP_DURATION = 5 * 60

    # Number of decimal places kept when rounding.
    DECIMAL_PRECISION = 7
class ArticleDetailStatMapper(ArticleDetailStatConst):
    """Database access layer for the article detail statistics task.

    Each method builds a single parameterised SQL statement and passes it to
    the injected pool: ``async_fetch`` for reads, ``async_save`` for writes.
    The pool's return value is handed back to the caller unchanged.
    """

    def __init__(self, pool, log_service):
        """Keep references to the database pool and the log service."""
        self.log_service = log_service
        self.pool = pool

    async def fetch_monitor_accounts(self):
        """Read the accounts whose ``status`` equals ``ACCOUNT_VALID_STATUS``."""
        sql = """
            SELECT gh_id, account_name, app_id, app_secret
            FROM gzh_account_info WHERE status = %s;
        """
        sql_args = (self.ACCOUNT_VALID_STATUS,)
        return await self.pool.async_fetch(query=sql, params=sql_args)

    async def set_access_token_for_each_account(
        self, gh_id, access_token, expire_timestamp
    ):
        """Store a new access token and its expiry for ``gh_id``.

        The row's ``access_token_status`` is set to ``COOKIE_VALID_STATUS``
        in the same statement.
        """
        sql = """
            UPDATE gzh_cookie_info
            SET access_token = %s, access_token_status = %s, expire_timestamp = %s
            WHERE gh_id = %s;
        """
        sql_args = (
            access_token,
            self.COOKIE_VALID_STATUS,
            expire_timestamp,
            gh_id,
        )
        return await self.pool.async_save(query=sql, params=sql_args)

    async def get_access_token_from_database(self, gh_id):
        """Read the stored access token and expiry timestamp for ``gh_id``."""
        sql = """
            SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
        """
        sql_args = (gh_id,)
        return await self.pool.async_fetch(query=sql, params=sql_args)

    async def fetch_article_detail(self, gh_id, msg_id):
        """Read article details from ``official_articles_v2``.

        The query runs against the ``piaoquan_crawler`` database and filters
        on ``ghId`` and ``appMsgId``.
        """
        sql = """
            SELECT wx_sn, title, ContentUrl, ItemIndex, title_md5
            FROM official_articles_v2
            WHERE ghId = %s AND appMsgId = %s;
        """
        sql_args = (gh_id, msg_id)
        return await self.pool.async_fetch(
            query=sql, params=sql_args, db_name="piaoquan_crawler"
        )

    async def save_article_performance(self, params: tuple):
        """Upsert one row of per-article performance metrics.

        ``params`` must supply the 19 values in the column order of the
        INSERT list below. On a duplicate key only the metric columns and
        ``updated_at`` are overwritten.
        """
        sql = """
            INSERT INTO article_daily_performance
            (
                account_name, gh_id, wx_sn, app_msg_id, msg_id, position, title, content_url, stat_date,
                read_user, share_user, read_subscribe_user, read_delivery_rate, read_finish_rate,
                read_avg_activetime, zaikan_user, like_user, comment_count, collection_user
            ) VALUES (
                %s, %s, %s, %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
            )
            ON DUPLICATE KEY UPDATE
                read_user = VALUES(read_user),
                share_user = VALUES(share_user),
                read_subscribe_user = VALUES(read_subscribe_user),
                read_delivery_rate = VALUES(read_delivery_rate),
                read_finish_rate = VALUES(read_finish_rate),
                read_avg_activetime = VALUES(read_avg_activetime),
                zaikan_user = VALUES(zaikan_user),
                like_user = VALUES(like_user),
                comment_count = VALUES(comment_count),
                collection_user = VALUES(collection_user),
                updated_at = CURRENT_TIMESTAMP;
        """
        return await self.pool.async_save(params=params, query=sql)

    async def save_article_read_distribution(self, params: tuple):
        """Upsert one row of read-distribution data for an article.

        ``params`` must supply the 14 values in the column order of the
        INSERT list below: ``wx_sn``, five jump-position rates, then eight
        read-source user counts. On a duplicate key every column except
        ``wx_sn`` is overwritten and ``updated_at`` is refreshed.
        """
        sql = """
            INSERT INTO article_read_distribution
            (
                wx_sn, jump_pos_1_rate, jump_pos_2_rate, jump_pos_3_rate, jump_pos_4_rate, jump_pos_5_rate,
                src_mp_message_user, src_mp_home_user, src_chat_user, src_moments_user, src_recommend_user,
                src_search_user, src_other_user, src_total_user
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                jump_pos_1_rate = VALUES(jump_pos_1_rate),
                jump_pos_2_rate = VALUES(jump_pos_2_rate),
                jump_pos_3_rate = VALUES(jump_pos_3_rate),
                jump_pos_4_rate = VALUES(jump_pos_4_rate),
                jump_pos_5_rate = VALUES(jump_pos_5_rate),
                src_mp_message_user = VALUES(src_mp_message_user),
                src_mp_home_user = VALUES(src_mp_home_user),
                src_chat_user = VALUES(src_chat_user),
                src_moments_user = VALUES(src_moments_user),
                src_recommend_user = VALUES(src_recommend_user),
                src_search_user = VALUES(src_search_user),
                src_other_user = VALUES(src_other_user),
                src_total_user = VALUES(src_total_user),
                updated_at = CURRENT_TIMESTAMP;
        """
        return await self.pool.async_save(params=params, query=sql)
class ArticleDetailStat(ArticleDetailStatMapper):
    """Collect daily per-article statistics for every monitored account.

    ``deal`` is the entry point. For each account it resolves an access
    token (refreshing it when expired or rejected), fetches the daily
    statistics and stores them through the mapper's save methods.
    """

    # errcode in a statistics response that triggers an access-token refresh.
    INVALID_TOKEN_ERRCODE = 40001

    # Name used for this task in every log record.
    TASK_NAME = "article_detail_stat"

    def __init__(self, pool, log_service):
        super().__init__(pool, log_service)

    async def _load_published_articles(self, gh_id, article_list):
        """Build a ``"<msg_id>_<ItemIndex>"`` -> article-info lookup.

        Message ids are the part of each ``msgid`` before the first
        underscore; each distinct id is looked up once.
        """
        msg_ids = {item["msgid"].split("_")[0] for item in article_list}
        published = {}
        for msg_id in msg_ids:
            rows = await self.fetch_article_detail(gh_id=gh_id, msg_id=msg_id)
            for row in rows or ():
                wx_sn = row["wx_sn"]
                # The original code always decoded; tolerate a driver that
                # already returns text instead of bytes.
                if isinstance(wx_sn, (bytes, bytearray)):
                    wx_sn = wx_sn.decode("utf-8")
                published[f"{msg_id}_{row['ItemIndex']}"] = {
                    "wx_sn": wx_sn,
                    "title": row["title"],
                    "content_url": row["ContentUrl"],
                }
        return published

    async def _store_single_article(self, account, published_articles, article):
        """Persist performance and read-distribution rows for one article.

        An article with no matching published record, or with an empty
        ``detail_list``, is logged and skipped so that one bad entry does
        not abort the rest of the account's batch.
        """
        app_msg_id = article["msgid"]
        published = published_articles.get(app_msg_id)
        detail_list = article.get("detail_list")
        if published is None or not detail_list:
            await self.log_service.log(
                contents={
                    "task": self.TASK_NAME,
                    "account_name": account["account_name"],
                    "app_msg_id": app_msg_id,
                    "status": "skip",
                    "message": "no published article record or empty detail_list",
                }
            )
            return

        # Basic article information.
        id_parts = app_msg_id.split("_")
        msg_id, position = id_parts[0], id_parts[1]
        wx_sn = published["wx_sn"]

        # Article performance: only the first detail entry is stored.
        performance = detail_list[0]
        precision = self.DECIMAL_PRECISION
        await self.save_article_performance(
            params=(
                account["account_name"],
                account["gh_id"],
                wx_sn,
                app_msg_id,
                msg_id,
                position,
                published["title"],
                published["content_url"],
                performance["stat_date"],
                performance["read_user"],
                performance["share_user"],
                performance["read_subscribe_user"],
                round(performance["read_delivery_rate"], precision),
                round(performance["read_finish_rate"], precision),
                round(performance["read_avg_activetime"], precision),
                performance["zaikan_user"],
                performance["like_user"],
                performance["comment_count"],
                performance["collection_user"],
            )
        )

        # Read distribution: jump rate by position, users by source scene.
        jump_rate_map = {
            item["position"]: round(item["rate"], precision)
            for item in performance["read_jump_position"]
        }
        source_map = {
            item["scene_desc"]: item["user_count"]
            for item in performance["read_user_source"]
        }
        await self.save_article_read_distribution(
            params=(
                wx_sn,
                jump_rate_map[1],
                jump_rate_map[2],
                jump_rate_map[3],
                jump_rate_map[4],
                jump_rate_map[5],
                source_map["公众号消息"],
                source_map["公众号主页"],
                source_map["聊天会话"],
                source_map["朋友圈"],
                source_map["推荐"],
                source_map["搜一搜"],
                source_map["其他"],
                source_map["全部"],
            )
        )

    async def save_account_details(self, account, fetch_response):
        """Store every article contained in one statistics response.

        Args:
            account: account row providing ``gh_id`` and ``account_name``.
            fetch_response: response dict; ``list`` holds the articles and
                ``is_delay`` flags data that is not ready yet.

        Returns:
            ``0`` when there is nothing to store or the data is delayed,
            otherwise the result of ``run_tasks_with_asyncio_task_group``.
        """
        article_list = fetch_response.get("list")
        if not article_list:
            return 0

        # A missing flag is treated as "not delayed" instead of raising.
        if fetch_response.get("is_delay"):
            print("数据延迟,重新处理")
            return 0

        account_published_articles = await self._load_published_articles(
            account["gh_id"], article_list
        )

        async def _update_single_article(_article):
            await self._store_single_article(
                account, account_published_articles, _article
            )

        return await run_tasks_with_asyncio_task_group(
            task_list=article_list,
            handler=_update_single_article,
            description="批量更新文章阅读信息",
            unit="article",
        )

    async def process_single_account(self, account: dict):
        """Fetch and store yesterday's statistics for one account.

        Returns early when the account has no token row. The token is
        refreshed when its stored expiry has passed, and once more when the
        statistics request is rejected with ``INVALID_TOKEN_ERRCODE``; in
        that case the request is retried once with the new token.
        """
        gh_id = account["gh_id"]
        token_info = await self.get_access_token_from_database(gh_id)
        if not token_info:
            return

        async def refresh_token():
            """Request a new token, persist it and return it."""
            new_token_info = await get_access_token(
                account["app_id"], account["app_secret"]
            )
            new_token = new_token_info["access_token"]
            await self.set_access_token_for_each_account(
                gh_id=gh_id,
                access_token=new_token,
                # Store an expiry GAP_DURATION seconds early so the token is
                # refreshed before it actually lapses.
                expire_timestamp=new_token_info["expires_in"]
                + int(time.time())
                - self.GAP_DURATION,
            )
            print(f"{account['account_name']} access_token updated to database")
            return new_token

        expire_timestamp = token_info[0]["expire_timestamp"] or 0
        if int(time.time()) >= expire_timestamp:
            print(f"{account['account_name']} access_token expired")
            access_token = await refresh_token()
        else:
            access_token = token_info[0]["access_token"]

        # Only yesterday is crawled.
        dt_list = [
            (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
            for i in range(1, 2)
        ]
        for dt in dt_list:
            print(f"{account['account_name']} crawl {dt} read_data")
            fetch_response = await get_gzh_stat_daily(
                access_token=access_token, date_string=dt
            )

            # Token rejected: refresh it and retry this date once, so the
            # day's data is not lost.
            if fetch_response.get("errcode") == self.INVALID_TOKEN_ERRCODE:
                access_token = await refresh_token()
                fetch_response = await get_gzh_stat_daily(
                    access_token=access_token, date_string=dt
                )
                if fetch_response.get("errcode") == self.INVALID_TOKEN_ERRCODE:
                    # Still rejected with a fresh token: give up on this
                    # account for this run.
                    break

            # Process the response and write it to the tables.
            await self.save_account_details(account, fetch_response)

    async def deal(self):
        """Entry point: process every monitored account in turn.

        A failure in one account is logged with its traceback and does not
        stop the remaining accounts.
        """
        accounts = await self.fetch_monitor_accounts()
        if not accounts:
            return

        for account in accounts:
            try:
                await self.process_single_account(account)
                await self.log_service.log(
                    contents={
                        "task": self.TASK_NAME,
                        "account_name": account["account_name"],
                        "status": "success",
                    }
                )
            except Exception as e:
                # Per-account boundary: record the failure and move on.
                await self.log_service.log(
                    contents={
                        "task": self.TASK_NAME,
                        "account_name": account["account_name"],
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "status": "fail",
                    }
                )
|