Przeglądaj źródła

新增文章详情抓取

luojunhui 1 miesiąc temu
rodzic
commit
e78d01883e

+ 7 - 2
applications/crawler/wechat/gzh_fans.py

@@ -47,9 +47,14 @@ async def get_gzh_fans(token, cookie, cursor_id, cursor_timestamp):
 
 # 获取 access_token
 async def get_access_token(app_id, app_secret):
-    url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={app_id}&secret={app_secret}"
+    url = f"https://api.weixin.qq.com/cgi-bin/stable_token"
+    data = {
+        "grant_type": "client_credential",
+        "appid": app_id,
+        "secret": app_secret
+    }
     async with AsyncHttpClient(timeout=100) as http_client:
-        response = await http_client.get(url)
+        response = await http_client.post(url, json=data)
 
     return response
 

+ 237 - 12
applications/tasks/data_recycle_tasks/article_detail_stat.py

@@ -1,12 +1,16 @@
 import json
 import time
+from datetime import datetime, timedelta
+
+from applications.api import feishu_robot
 
 from applications.crawler.wechat import get_gzh_stat_daily
 from applications.crawler.wechat import get_access_token
 
+from applications.utils import run_tasks_with_asyncio_task_group
 
-class ArticleDetailStatConst:
 
+class ArticleDetailStatConst:
     # Task Status
     INIT_STATUS = 0
     PROCESSING_STATUS = 1
@@ -21,9 +25,14 @@ class ArticleDetailStatConst:
     COOKIE_VALID_STATUS = 1
     COOKIE_INVALID_STATUS = 0
 
+    # Safety margin (seconds) subtracted from a token's expiry time so it is refreshed early
+    GAP_DURATION = 300
+
+    # Number of decimal places kept when rounding rate / duration metrics
+    DECIMAL_PRECISION = 7
 
-class ArticleDetailStatMapper(ArticleDetailStatConst):
 
+class ArticleDetailStatMapper(ArticleDetailStatConst):
     def __init__(self, pool, log_client):
         self.pool = pool
         self.log_client = log_client
@@ -34,17 +43,22 @@ class ArticleDetailStatMapper(ArticleDetailStatConst):
             SELECT gh_id, account_name, app_id, app_secret 
             FROM gzh_account_info WHERE status = %s;
         """
-        return await self.pool.async_fetch(query=query, params=(self.ACCOUNT_VALID_STATUS, ))
+        return await self.pool.async_fetch(
+            query=query, params=(self.ACCOUNT_VALID_STATUS,)
+        )
 
     # 更新 access_token
-    async def set_access_token_for_each_account(self, gh_id, access_token, expire_timestamp):
+    async def set_access_token_for_each_account(
+        self, gh_id, access_token, expire_timestamp
+    ):
         query = """
             UPDATE gzh_cookie_info 
             SET access_token = %s, access_token_status = %s, expire_timestamp = %s
             WHERE gh_id = %s;
         """
         return await self.pool.async_save(
-            query=query, params=(access_token, self.COOKIE_VALID_STATUS, expire_timestamp, gh_id)
+            query=query,
+            params=(access_token, self.COOKIE_VALID_STATUS, expire_timestamp, gh_id),
         )
 
     # 从数据库获取 access_token
@@ -52,13 +66,188 @@ class ArticleDetailStatMapper(ArticleDetailStatConst):
         query = """
             SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
         """
-        return await self.pool.async_fetch(query=query, params=(gh_id, ))
+        return await self.pool.async_fetch(query=query, params=(gh_id,))
+
+    # Fetch article details (wx_sn, title, URL, item index) from official_articles_v2
+    async def fetch_article_detail(self, gh_id, msg_id):
+        query = """
+            SELECT wx_sn, title, ContentUrl, ItemIndex, title_md5
+            FROM official_articles_v2
+            WHERE ghId = %s AND appMsgId = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, db_name="piaoquan_crawler", params=(gh_id, msg_id)
+        )
+
+    # Upsert one article's daily performance metrics into article_daily_performance
+    async def save_article_performance(self, params: tuple):
+        query = """
+            INSERT INTO article_daily_performance 
+            (
+                account_name, gh_id, wx_sn, app_msg_id, msg_id, position, title, content_url, stat_date,
+                read_user, share_user, read_subscribe_user, read_delivery_rate, read_finish_rate, 
+                read_avg_activetime, zaikan_user, like_user, comment_count, collection_user
+            ) VALUES (
+                %s, %s, %s, %s, %s, %s, %s, %s, %s,
+                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
+            )
+            ON DUPLICATE KEY UPDATE
+                read_user = VALUES(read_user),
+                share_user = VALUES(share_user),
+                read_subscribe_user = VALUES(read_subscribe_user),
+                read_delivery_rate = VALUES(read_delivery_rate),
+                read_finish_rate = VALUES(read_finish_rate),
+                read_avg_activetime = VALUES(read_avg_activetime),
+                zaikan_user = VALUES(zaikan_user),
+                like_user = VALUES(like_user),
+                comment_count = VALUES(comment_count),
+                collection_user = VALUES(collection_user),
+                updated_at = CURRENT_TIMESTAMP;
+        """
+        return await self.pool.async_save(query=query, params=params)
+
+    # Upsert one article's read distribution (jump-position rates, reader-source counts)
+    async def save_article_read_distribution(self, params: tuple):
+        query = """
+            INSERT INTO article_read_distribution 
+            (
+                wx_sn, jump_pos_1_rate, jump_pos_2_rate, jump_pos_3_rate, jump_pos_4_rate, jump_pos_5_rate,
+                src_mp_message_user, src_mp_home_user, src_chat_user, src_moments_user, src_recommend_user,
+                src_search_user, src_other_user, src_total_user
+            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+            ON DUPLICATE KEY UPDATE
+                jump_pos_1_rate = VALUES(jump_pos_1_rate),
+                jump_pos_2_rate = VALUES(jump_pos_2_rate),
+                jump_pos_3_rate = VALUES(jump_pos_3_rate),
+                jump_pos_4_rate = VALUES(jump_pos_4_rate),
+                jump_pos_5_rate = VALUES(jump_pos_5_rate),
+                src_mp_message_user = VALUES(src_mp_message_user),
+                src_mp_home_user = VALUES(src_mp_home_user),
+                src_chat_user = VALUES(src_chat_user),
+                src_moments_user = VALUES(src_moments_user),
+                src_recommend_user = VALUES(src_recommend_user),
+                src_search_user = VALUES(src_search_user),
+                src_other_user = VALUES(src_other_user),
+                src_total_user = VALUES(src_total_user),
+                updated_at = CURRENT_TIMESTAMP;
+        """
+        return await self.pool.async_save(query=query, params=params)
 
 
 class ArticleDetailStat(ArticleDetailStatMapper):
     def __init__(self, pool, log_client):
         super().__init__(pool, log_client)
 
+    # Persist article performance and read-distribution rows from one account's daily stat response
+    async def save_account_details(self, account, fetch_response):
+        article_list = fetch_response.get("list")
+        if not article_list:
+            return 0
+        is_delay = fetch_response["is_delay"]
+        if is_delay:
+            print("数据延迟,重新处理")
+            return 0
+
+        async def _process_article_detail(_article_list):
+            _msg_id_set = set()
+            for _article in _article_list:
+                _msg_id = _article["msgid"].split("_")[0]
+                _msg_id_set.add(_msg_id)
+
+            _article_mapper = {}
+            for _msg_id in _msg_id_set:
+                _publish_articles = await self.fetch_article_detail(
+                    gh_id=account["gh_id"], msg_id=_msg_id
+                )
+                for _publish_article in _publish_articles:
+                    _key = f"{_msg_id}_{_publish_article['ItemIndex']}"
+                    _value = {
+                        "wx_sn": _publish_article["wx_sn"].decode("utf-8"),
+                        "title": _publish_article["title"],
+                        "content_url": _publish_article["ContentUrl"],
+                    }
+                    _article_mapper[_key] = _value
+
+            return _article_mapper
+
+        account_published_articles = await _process_article_detail(article_list)
+
+        async def _update_single_article(_article):
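+            # Basic article identifiers. NOTE(review): if app_msg_id is absent from account_published_articles, the .get(...)[...] lookups below raise TypeError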
+            app_msg_id = _article["msgid"]
+            msg_id = app_msg_id.split("_")[0]
+            position = app_msg_id.split("_")[1]
+            wx_sn = account_published_articles.get(app_msg_id)["wx_sn"]
+
+            # Article performance metrics; only the first detail_list entry is used
+            article_performance = _article["detail_list"][0]
+            await self.save_article_performance(
+                params=(
+                    account["account_name"],
+                    account["gh_id"],
+                    wx_sn,
+                    app_msg_id,
+                    msg_id,
+                    position,
+                    account_published_articles.get(app_msg_id)["title"],
+                    account_published_articles.get(app_msg_id)["content_url"],
+                    article_performance["stat_date"],
+                    article_performance["read_user"],
+                    article_performance["share_user"],
+                    article_performance["read_subscribe_user"],
+                    round(
+                        article_performance["read_delivery_rate"],
+                        self.DECIMAL_PRECISION,
+                    ),
+                    round(
+                        article_performance["read_finish_rate"], self.DECIMAL_PRECISION
+                    ),
+                    round(
+                        article_performance["read_avg_activetime"],
+                        self.DECIMAL_PRECISION,
+                    ),
+                    article_performance["zaikan_user"],
+                    article_performance["like_user"],
+                    article_performance["comment_count"],
+                    article_performance["collection_user"],
+                )
+            )
+
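+            # Read distribution: jump-position rates and reader-source counts (lookups below raise KeyError if a position 1-5 or a scene label is missing)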
+            jump_rate_map = {
+                i["position"]: round(i["rate"], self.DECIMAL_PRECISION)
+                for i in article_performance["read_jump_position"]
+            }
+            source_map = {
+                i["scene_desc"]: i["user_count"]
+                for i in article_performance["read_user_source"]
+            }
+            await self.save_article_read_distribution(
+                params=(
+                    wx_sn,
+                    jump_rate_map[1],
+                    jump_rate_map[2],
+                    jump_rate_map[3],
+                    jump_rate_map[4],
+                    jump_rate_map[5],
+                    source_map["公众号消息"],
+                    source_map["公众号主页"],
+                    source_map["聊天会话"],
+                    source_map["朋友圈"],
+                    source_map["推荐"],
+                    source_map["搜一搜"],
+                    source_map["其他"],
+                    source_map["全部"],
+                )
+            )
+
+        return await run_tasks_with_asyncio_task_group(
+            task_list=article_list,
+            handler=_update_single_article,
+            description="批量更新文章阅读信息",
+            unit="article",
+        )
+
     # 处理单个账号
     async def process_single_account(self, account: dict):
         gh_id = account["gh_id"]
@@ -66,18 +255,54 @@ class ArticleDetailStat(ArticleDetailStatMapper):
         if not token_info:
             return
 
+        # Persist a freshly fetched token, storing its expiry GAP_DURATION earlier than reported
+        async def update_token(_new_token_info):
+            _access_token = _new_token_info["access_token"]
+            _expires_in = _new_token_info["expires_in"]
+
+            await self.set_access_token_for_each_account(
+                gh_id=account["gh_id"],
+                access_token=_access_token,
+                expire_timestamp=_expires_in + int(time.time()) - self.GAP_DURATION,
+            )
+            print(f"{account['account_name']} access_token updated to database")
+
         expire_timestamp = token_info[0]["expire_timestamp"] or 0
         if int(time.time()) >= expire_timestamp:
             print(f"{account['account_name']} access_token expired")
-            new_token_info = await get_access_token(account["app_id"], account["app_secret"])
-            print(json.dumps(new_token_info, indent=4))
+            new_token_info = await get_access_token(
+                account["app_id"], account["app_secret"]
+            )
+            access_token = new_token_info["access_token"]
+            await update_token(_new_token_info=new_token_info)
+
+        else:
+            access_token = token_info[0]["access_token"]
+
+        # Crawl each of the previous 30 days, from yesterday back to 30 days ago
+        dt_list = [
+            (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
+            for i in range(1, 31)
+        ]
+        for dt in dt_list:
+            print(f"{account['account_name']} crawl {dt} read_data")
+            fetch_response = await get_gzh_stat_daily(
+                access_token=access_token, date_string=dt
+            )
 
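+            # errcode 40001: refresh and store a new token, then stop this account's run (remaining dates are not retried here)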
+            if fetch_response.get("errcode") == 40001:
+                fetch_new_token_info = await get_access_token(
+                    account["app_id"], account["app_secret"]
+                )
+                await update_token(_new_token_info=fetch_new_token_info)
+                break
+
+            # Process the response and write the results to the database
+            await self.save_account_details(account, fetch_response)
 
     # 入口函数
     async def deal(self):
         accounts = await self.fetch_monitor_accounts()
-        for account in accounts[:1]:
+        for account in accounts:
             await self.process_single_account(account)
-
-
-