Procházet zdrojové kódy

Merge branch 'feature/luojunhui/20260123-daily-article-stat' of Server/LongArticleTaskServer into master

luojunhui před 1 měsícem
rodič
revize
9dfa275c39

+ 1 - 0
applications/crawler/wechat/__init__.py

@@ -1,2 +1,3 @@
 from .gzh_spider import *
 from .gzh_fans import *
+from .gzh_article_stat import *

+ 21 - 0
applications/crawler/wechat/gzh_article_stat.py

@@ -0,0 +1,21 @@
+from applications.utils import AsyncHttpClient
+
+
+# Fetch daily article statistics for an official account (WeChat datacube getarticletotaldetail API)
+async def get_gzh_stat_daily(access_token: str, date_string: str):
+    url = f"https://api.weixin.qq.com/datacube/getarticletotaldetail?access_token={access_token}"
+    data = {
+        "begin_date": date_string,
+        "end_date": date_string
+    }
+    headers = {
+        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
+        "Content-Type": "application/json"
+    }
+    # 发送请求
+    async with AsyncHttpClient(timeout=10) as http_client:
+        response = await http_client.post(url, headers=headers, json=data)
+
+    return response
+
+

+ 7 - 2
applications/crawler/wechat/gzh_fans.py

@@ -47,9 +47,14 @@ async def get_gzh_fans(token, cookie, cursor_id, cursor_timestamp):
 
 # 获取 access_token
 async def get_access_token(app_id, app_secret):
-    url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={app_id}&secret={app_secret}"
+    url = f"https://api.weixin.qq.com/cgi-bin/stable_token"
+    data = {
+        "grant_type": "client_credential",
+        "appid": app_id,
+        "secret": app_secret
+    }
     async with AsyncHttpClient(timeout=100) as http_client:
-        response = await http_client.get(url)
+        response = await http_client.post(url, json=data)
 
     return response
 

+ 39 - 18
applications/tasks/crawler_tasks/crawler_gzh_fans.py

@@ -1,5 +1,6 @@
 import asyncio
 import json
+import time
 from datetime import datetime
 
 from applications.crawler.wechat import (
@@ -24,6 +25,8 @@ class CrawlerGzhFansConst:
 
     MAX_CONCURRENCY = 5
 
+    GAP_DURATION = 300
+
 
 class CrawlerGzhFansBase(CrawlerGzhFansConst):
     def __init__(self, pool, log_client):
@@ -33,11 +36,9 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
     # 从数据库获取 access_token
     async def get_access_token_from_database(self, gh_id):
         query = """
-            SELECT access_token FROM gzh_cookie_info where gh_id = %s and access_token_status = %s;
+            SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
         """
-        return await self.pool.async_fetch(
-            query=query, params=(gh_id, self.AVAILABLE_STATUS)
-        )
+        return await self.pool.async_fetch(query=query, params=(gh_id, ))
 
     # 从数据库获取粉丝 && token
     async def get_cookie_token_from_database(self, gh_id):
@@ -76,7 +77,8 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
     # 获取账号列表
     async def get_account_list_from_database(self):
         query = """
-            SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp, crawl_history_status
+            SELECT gh_id, account_name, app_id, app_secret, cursor_openid, cursor_timestamp, 
+            crawl_history_status, binding_status
             FROM gzh_account_info WHERE status = %s; 
         """
         return await self.pool.async_fetch(query=query, params=(self.AVAILABLE_STATUS,))
@@ -208,12 +210,15 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
             query=query, params=(cookie, token, self.AVAILABLE_STATUS, gh_id)
         )
 
-    async def set_access_token_for_each_account(self, gh_id, access_token):
+    async def set_access_token_for_each_account(self, gh_id, access_token, expire_timestamp):
         query = """
-            UPDATE gzh_cookie_info SET access_token = %s, access_token_status = %s WHERE gh_id = %s;
+            UPDATE gzh_cookie_info 
+            SET access_token = %s, access_token_status = %s, expire_timestamp = %s 
+            WHERE gh_id = %s;
         """
         return await self.pool.async_save(
-            query=query, params=(access_token, self.AVAILABLE_STATUS, gh_id)
+            query=query,
+            params=(access_token, self.AVAILABLE_STATUS, expire_timestamp, gh_id)
         )
 
     async def get_max_cursor_id(self, gh_id):
@@ -395,24 +400,37 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
         access_token_info = await self.get_access_token_from_database(
             account_info["gh_id"]
         )
+
         if not access_token_info:
-            print(f"{account_info['account_name']}: access_token is not available")
-            response = await get_access_token(
-                account_info["app_id"], account_info["app_secret"]
-            )
-            access_token = response.get("access_token")
+            return
+
+        # 更新 token
+        async def update_token(_new_token_info):
+            _access_token = _new_token_info["access_token"]
+            _expires_in = _new_token_info["expires_in"]
+
             await self.set_access_token_for_each_account(
-                account_info["gh_id"], access_token
+                gh_id=account_info["gh_id"],
+                access_token=_access_token,
+                expire_timestamp=_expires_in + int(time.time()) - self.GAP_DURATION,
             )
-            return
+            print(f"{account_info['account_name']} access_token updated to database")
+
+        expire_timestamp = access_token_info[0]["expire_timestamp"] or 0
+        if int(time.time()) >= expire_timestamp:
+            new_token_info = await get_access_token(
+                account_info["app_id"], account_info["app_secret"]
+            )
+            access_token = new_token_info["access_token"]
+            await update_token(_new_token_info=new_token_info)
+        else:
+            access_token = access_token_info[0]["access_token"]
 
-        access_token = access_token_info[0]["access_token"]
         union_info = await get_union_id_batch(
             access_token=access_token, user_list=user_list
         )
         if union_info.get("errcode"):
             await self.set_access_token_as_invalid(gh_id=account_info["gh_id"])
-
             return
 
         # 将查询到的 union_id存储到数据库中
@@ -420,6 +438,8 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
         if not user_info_list:
             return
 
+        print(json.dumps(user_info_list, ensure_ascii=False, indent=4))
+
         semaphore = asyncio.Semaphore(10)
         tasks = [
             self.save_single_union_user(account_info["gh_id"], user_info, semaphore)
@@ -444,8 +464,9 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
                 )
 
             case "get_union_ids":
+                binding_accounts = [i for i in account_list if i['binding_status'] == self.AVAILABLE_STATUS]
                 return await run_tasks_with_asyncio_task_group(
-                    task_list=account_list,
+                    task_list=binding_accounts,
                     handler=self.get_union_ids_for_each_account,
                     max_concurrency=self.MAX_CONCURRENCY,
                     fail_fast=False,

+ 2 - 0
applications/tasks/data_recycle_tasks/__init__.py

@@ -1,3 +1,4 @@
+from .article_detail_stat import ArticleDetailStat
 from .recycle_daily_publish_articles import RecycleDailyPublishArticlesTask
 from .recycle_daily_publish_articles import CheckDailyPublishArticlesTask
 from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask
@@ -15,4 +16,5 @@ __all__ = [
     "RecycleMiniProgramDetailTask",
     "RecycleOutsideAccountArticlesTask",
     "UpdateOutsideRootSourceIdAndUpdateTimeTask",
+    "ArticleDetailStat",
 ]

+ 308 - 0
applications/tasks/data_recycle_tasks/article_detail_stat.py

@@ -0,0 +1,308 @@
+import json
+import time
+from datetime import datetime, timedelta
+
+from applications.api import feishu_robot
+
+from applications.crawler.wechat import get_gzh_stat_daily
+from applications.crawler.wechat import get_access_token
+
+from applications.utils import run_tasks_with_asyncio_task_group
+
+
+class ArticleDetailStatConst:
+    # Task Status
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAILED_STATUS = 99
+
+    # Account Status
+    ACCOUNT_VALID_STATUS = 1
+    ACCOUNT_INVALID_STATUS = 0
+
+    # Cookie Status
+    COOKIE_VALID_STATUS = 1
+    COOKIE_INVALID_STATUS = 0
+
+    # Gap Time
+    GAP_DURATION = 300
+
+    # 小数点保留位数
+    DECIMAL_PRECISION = 7
+
+
+class ArticleDetailStatMapper(ArticleDetailStatConst):
+    def __init__(self, pool, log_client):
+        self.pool = pool
+        self.log_client = log_client
+
+    # 获取账号信息
+    async def fetch_monitor_accounts(self):
+        query = """
+            SELECT gh_id, account_name, app_id, app_secret 
+            FROM gzh_account_info WHERE status = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(self.ACCOUNT_VALID_STATUS,)
+        )
+
+    # 更新 access_token
+    async def set_access_token_for_each_account(
+        self, gh_id, access_token, expire_timestamp
+    ):
+        query = """
+            UPDATE gzh_cookie_info 
+            SET access_token = %s, access_token_status = %s, expire_timestamp = %s
+            WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(access_token, self.COOKIE_VALID_STATUS, expire_timestamp, gh_id),
+        )
+
+    # 从数据库获取 access_token
+    async def get_access_token_from_database(self, gh_id):
+        query = """
+            SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(gh_id,))
+
+    # 从official_articles_v2 获取文章详情
+    async def fetch_article_detail(self, gh_id, msg_id):
+        query = """
+            SELECT wx_sn, title, ContentUrl, ItemIndex, title_md5
+            FROM official_articles_v2
+            WHERE ghId = %s AND appMsgId = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, db_name="piaoquan_crawler", params=(gh_id, msg_id)
+        )
+
+    # 存储文章的各个表现
+    async def save_article_performance(self, params: tuple):
+        query = """
+            INSERT INTO article_daily_performance 
+            (
+                account_name, gh_id, wx_sn, app_msg_id, msg_id, position, title, content_url, stat_date,
+                read_user, share_user, read_subscribe_user, read_delivery_rate, read_finish_rate, 
+                read_avg_activetime, zaikan_user, like_user, comment_count, collection_user
+            ) VALUES (
+                %s, %s, %s, %s, %s, %s, %s, %s, %s,
+                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
+            )
+            ON DUPLICATE KEY UPDATE
+                read_user = VALUES(read_user),
+                share_user = VALUES(share_user),
+                read_subscribe_user = VALUES(read_subscribe_user),
+                read_delivery_rate = VALUES(read_delivery_rate),
+                read_finish_rate = VALUES(read_finish_rate),
+                read_avg_activetime = VALUES(read_avg_activetime),
+                zaikan_user = VALUES(zaikan_user),
+                like_user = VALUES(like_user),
+                comment_count = VALUES(comment_count),
+                collection_user = VALUES(collection_user),
+                updated_at = CURRENT_TIMESTAMP;
+        """
+        return await self.pool.async_save(query=query, params=params)
+
+    # 存储文章的阅读分布信息
+    async def save_article_read_distribution(self, params: tuple):
+        query = """
+            INSERT INTO article_read_distribution 
+            (
+                wx_sn, jump_pos_1_rate, jump_pos_2_rate, jump_pos_3_rate, jump_pos_4_rate, jump_pos_5_rate,
+                src_mp_message_user, src_mp_home_user, src_chat_user, src_moments_user, src_recommend_user,
+                src_search_user, src_other_user, src_total_user
+            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+            ON DUPLICATE KEY UPDATE
+                jump_pos_1_rate = VALUES(jump_pos_1_rate),
+                jump_pos_2_rate = VALUES(jump_pos_2_rate),
+                jump_pos_3_rate = VALUES(jump_pos_3_rate),
+                jump_pos_4_rate = VALUES(jump_pos_4_rate),
+                jump_pos_5_rate = VALUES(jump_pos_5_rate),
+                src_mp_message_user = VALUES(src_mp_message_user),
+                src_mp_home_user = VALUES(src_mp_home_user),
+                src_chat_user = VALUES(src_chat_user),
+                src_moments_user = VALUES(src_moments_user),
+                src_recommend_user = VALUES(src_recommend_user),
+                src_search_user = VALUES(src_search_user),
+                src_other_user = VALUES(src_other_user),
+                src_total_user = VALUES(src_total_user),
+                updated_at = CURRENT_TIMESTAMP;
+        """
+        return await self.pool.async_save(query=query, params=params)
+
+
+class ArticleDetailStat(ArticleDetailStatMapper):
+    def __init__(self, pool, log_client):
+        super().__init__(pool, log_client)
+
+    # Parse the stat response and persist per-article daily performance for this account
+    async def save_account_details(self, account, fetch_response):
+        article_list = fetch_response.get("list")
+        if not article_list:
+            return 0
+        is_delay = fetch_response["is_delay"]
+        if is_delay:
+            print("数据延迟,重新处理")
+            return 0
+
+        async def _process_article_detail(_article_list):
+            _msg_id_set = set()
+            for _article in _article_list:
+                _msg_id = _article["msgid"].split("_")[0]
+                _msg_id_set.add(_msg_id)
+
+            _article_mapper = {}
+            for _msg_id in _msg_id_set:
+                _publish_articles = await self.fetch_article_detail(
+                    gh_id=account["gh_id"], msg_id=_msg_id
+                )
+                for _publish_article in _publish_articles:
+                    _key = f"{_msg_id}_{_publish_article['ItemIndex']}"
+                    _value = {
+                        "wx_sn": _publish_article["wx_sn"].decode("utf-8"),
+                        "title": _publish_article["title"],
+                        "content_url": _publish_article["ContentUrl"],
+                    }
+                    _article_mapper[_key] = _value
+
+            return _article_mapper
+
+        account_published_articles = await _process_article_detail(article_list)
+
+        async def _update_single_article(_article):
+            # 文章基本信息
+            app_msg_id = _article["msgid"]
+            msg_id = app_msg_id.split("_")[0]
+            position = app_msg_id.split("_")[1]
+            wx_sn = account_published_articles.get(app_msg_id)["wx_sn"]
+
+            # 文章表现
+            article_performance = _article["detail_list"][0]
+            await self.save_article_performance(
+                params=(
+                    account["account_name"],
+                    account["gh_id"],
+                    wx_sn,
+                    app_msg_id,
+                    msg_id,
+                    position,
+                    account_published_articles.get(app_msg_id)["title"],
+                    account_published_articles.get(app_msg_id)["content_url"],
+                    article_performance["stat_date"],
+                    article_performance["read_user"],
+                    article_performance["share_user"],
+                    article_performance["read_subscribe_user"],
+                    round(
+                        article_performance["read_delivery_rate"],
+                        self.DECIMAL_PRECISION,
+                    ),
+                    round(
+                        article_performance["read_finish_rate"], self.DECIMAL_PRECISION
+                    ),
+                    round(
+                        article_performance["read_avg_activetime"],
+                        self.DECIMAL_PRECISION,
+                    ),
+                    article_performance["zaikan_user"],
+                    article_performance["like_user"],
+                    article_performance["comment_count"],
+                    article_performance["collection_user"],
+                )
+            )
+
+            # 表现分布信息
+            jump_rate_map = {
+                i["position"]: round(i["rate"], self.DECIMAL_PRECISION)
+                for i in article_performance["read_jump_position"]
+            }
+            source_map = {
+                i["scene_desc"]: i["user_count"]
+                for i in article_performance["read_user_source"]
+            }
+            await self.save_article_read_distribution(
+                params=(
+                    wx_sn,
+                    jump_rate_map[1],
+                    jump_rate_map[2],
+                    jump_rate_map[3],
+                    jump_rate_map[4],
+                    jump_rate_map[5],
+                    source_map["公众号消息"],
+                    source_map["公众号主页"],
+                    source_map["聊天会话"],
+                    source_map["朋友圈"],
+                    source_map["推荐"],
+                    source_map["搜一搜"],
+                    source_map["其他"],
+                    source_map["全部"],
+                )
+            )
+
+        return await run_tasks_with_asyncio_task_group(
+            task_list=article_list,
+            handler=_update_single_article,
+            description="批量更新文章阅读信息",
+            unit="article",
+        )
+
+    # 处理单个账号
+    async def process_single_account(self, account: dict):
+        gh_id = account["gh_id"]
+        token_info = await self.get_access_token_from_database(gh_id)
+        if not token_info:
+            return
+
+        # 更新 token
+        async def update_token(_new_token_info):
+            _access_token = _new_token_info["access_token"]
+            _expires_in = _new_token_info["expires_in"]
+
+            await self.set_access_token_for_each_account(
+                gh_id=account["gh_id"],
+                access_token=_access_token,
+                expire_timestamp=_expires_in + int(time.time()) - self.GAP_DURATION,
+            )
+            print(f"{account['account_name']} access_token updated to database")
+
+        expire_timestamp = token_info[0]["expire_timestamp"] or 0
+        if int(time.time()) >= expire_timestamp:
+            print(f"{account['account_name']} access_token expired")
+            new_token_info = await get_access_token(
+                account["app_id"], account["app_secret"]
+            )
+            access_token = new_token_info["access_token"]
+            await update_token(_new_token_info=new_token_info)
+
+        else:
+            access_token = token_info[0]["access_token"]
+
+        # yesterday_string = datetime.strftime(datetime.now() - timedelta(days=5), "%Y-%m-%d")
+        dt_list = [
+            (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
+            for i in range(1, 31)
+        ]
+        for dt in dt_list:
+            print(f"{account['account_name']} crawl {dt} read_data")
+            fetch_response = await get_gzh_stat_daily(
+                access_token=access_token, date_string=dt
+            )
+
+            # 请求失败的情况
+            if fetch_response.get("errcode") == 40001:
+                fetch_new_token_info = await get_access_token(
+                    account["app_id"], account["app_secret"]
+                )
+                await update_token(_new_token_info=fetch_new_token_info)
+                break
+
+            # 处理并且落表
+            await self.save_account_details(account, fetch_response)
+
+    # 入口函数
+    async def deal(self):
+        accounts = await self.fetch_monitor_accounts()
+        for account in accounts:
+            await self.process_single_account(account)

+ 1 - 0
applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -47,6 +47,7 @@ class Const:
         "gh_72bace6b3059",
         "gh_dd4c857bbb36",
         "gh_ff487cb5dab3",
+        "gh_ac43eb24376d"
     ]
 
     # NOT USED SERVER ACCOUNT