Prechádzať zdrojové kódy

Merge branch 'feature/luojunhui-20250708-fans-stat' of luojunhui/LongArticlesJob into master

luojunhui 3 mesiacov pred
rodič
commit
80aab5b5d8

+ 41 - 21
applications/utils/common.py

@@ -4,7 +4,7 @@
 
 import hashlib
 
-from datetime import datetime, timezone
+from datetime import datetime, timezone, date, timedelta
 from requests import RequestException
 from urllib.parse import urlparse, parse_qs
 from tenacity import (
@@ -103,7 +103,7 @@ def show_desc_to_sta(show_desc):
         :param show_v:
         :return:
         """
-        foo = show_v.replace('千', 'e3').replace('万', 'e4').replace('亿', 'e8')
+        foo = show_v.replace("千", "e3").replace("万", "e4").replace("亿", "e8")
         foo = eval(foo)
         return int(foo)
 
@@ -114,31 +114,31 @@ def show_desc_to_sta(show_desc):
         :return:
         """
         this_dict = {
-            '阅读': 'show_view_count',  # 文章
-            '看过': 'show_view_count',  # 图文
-            '观看': 'show_view_count',  # 视频
-            '赞': 'show_like_count',
-            '付费': 'show_pay_count',
-            '赞赏': 'show_zs_count',
+            "阅读": "show_view_count",  # 文章
+            "看过": "show_view_count",  # 图文
+            "观看": "show_view_count",  # 视频
+            "赞": "show_like_count",
+            "付费": "show_pay_count",
+            "赞赏": "show_zs_count",
         }
         if show_k not in this_dict:
-            print(f'error from decode_show_k, show_k not found: {show_k}')
-        return this_dict.get(show_k, 'show_unknown')
+            print(f"error from decode_show_k, show_k not found: {show_k}")
+        return this_dict.get(show_k, "show_unknown")
 
-    show_desc = show_desc.replace('+', '')
+    show_desc = show_desc.replace("+", "")
     sta = {}
-    for show_kv in show_desc.split('\u2004\u2005'):
+    for show_kv in show_desc.split("\u2004\u2005"):
         if not show_kv:
             continue
-        show_k, show_v = show_kv.split('\u2006')
+        show_k, show_v = show_kv.split("\u2006")
         k = decode_show_k(show_k)
         v = decode_show_v(show_v)
         sta[k] = v
     res = {
-        'show_view_count': sta.get('show_view_count', 0),
-        'show_like_count': sta.get('show_like_count', 0),
-        'show_pay_count': sta.get('show_pay_count', 0),
-        'show_zs_count': sta.get('show_zs_count', 0),
+        "show_view_count": sta.get("show_view_count", 0),
+        "show_like_count": sta.get("show_like_count", 0),
+        "show_pay_count": sta.get("show_pay_count", 0),
+        "show_zs_count": sta.get("show_zs_count", 0),
     }
     return res
 
@@ -154,12 +154,32 @@ def generate_gzh_id(url):
     return md5_value
 
 
-
-def timestamp_to_str(timestamp, string_format='%Y-%m-%d %H:%M:%S') -> str:
+def timestamp_to_str(timestamp, string_format="%Y-%m-%d %H:%M:%S") -> str:
     """
     :param string_format:
     :param timestamp:
     """
-    dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
+    dt_object = (
+        datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
+    )
     date_string = dt_object.strftime(string_format)
-    return date_string
+    return date_string
+
+
+def days_remaining_in_month():
+    # 获取当前日期
+    today = date.today()
+
+    # 获取下个月的第一天
+    if today.month == 12:
+        next_month = today.replace(year=today.year + 1, month=1, day=1)
+    else:
+        next_month = today.replace(month=today.month + 1, day=1)
+
+    # 计算本月最后一天(下个月第一天减去1天)
+    last_day_of_month = next_month - timedelta(days=1)
+
+    # 计算剩余天数
+    remaining_days = (last_day_of_month - today).days
+
+    return remaining_days

+ 6 - 2
fwh_data_manager.py

@@ -1,13 +1,17 @@
 from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishRecordManager
 from tasks.data_tasks.fwh_data_recycle import SaveFwhDataToDatabase
+from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishMonitor
 
 
 if __name__ == '__main__':
     # 1. 从 aigc 获取数据
     fwh_group_publish_record_manager = FwhGroupPublishRecordManager()
     fwh_group_publish_record_manager.deal()
-    fwh_group_publish_record_manager.monitor()
 
     # 2. 保存数据到数据库
     save_fwh_data_to_database = SaveFwhDataToDatabase()
-    save_fwh_data_to_database.deal()
+    save_fwh_data_to_database.deal()
+
+    # 3. 监测报警
+    fwh_group_publish_monitor = FwhGroupPublishMonitor()
+    fwh_group_publish_monitor.deal()

+ 153 - 53
tasks/data_tasks/fwh_data_recycle.py

@@ -10,7 +10,7 @@ from pymysql.cursors import DictCursor
 
 from applications.api import FeishuBotApi
 from applications.db import DatabaseConnector
-from applications.utils import str_to_md5
+from applications.utils import str_to_md5, days_remaining_in_month
 from cold_start.crawler.wechat import get_article_detail
 from config import denet_config, long_articles_config, piaoquan_crawler_config
 
@@ -59,12 +59,12 @@ class FwhDataRecycle:
         return account_name
 
     def illegal_article_bot(
-            self,
-            account_name:str,
-            gh_id:str,
-            group_id: str,
-            illegal_msg: str,
-            publish_date: str,
+        self,
+        account_name: str,
+        gh_id: str,
+        group_id: str,
+        illegal_msg: str,
+        publish_date: str,
     ):
         self.feishu_robot.bot(
             title="服务号文章违规告警,请前往微信公众平台处理",
@@ -73,9 +73,9 @@ class FwhDataRecycle:
                 "gh_id": gh_id,
                 "group_id": group_id,
                 "illegal_msg": illegal_msg,
-                "publish_date": str(publish_date)
+                "publish_date": str(publish_date),
             },
-            env="server_account_publish_monitor"
+            env="server_account_publish_monitor",
         )
 
 
@@ -169,48 +169,6 @@ class FwhGroupPublishRecordManager(FwhDataRecycle):
                     record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
                 )
 
-    def monitor(self, date_string: str = datetime.today().strftime("%Y-%m-%d")):
-        """
-        monitor the publish record
-        """
-        now = datetime.now()
-        if now.hour < 12:
-            account_list = self.get_group_server_accounts()
-            do_not_publish_account = []
-            sql = f"""
-                select account_name as '账号名称', gh_id, count(distinct user_group_id) as '发文组数'
-                from long_articles_group_send_result
-                where publish_date = %s
-                group by account_name, gh_id; 
-            """
-            publish_records = self.long_articles_client.fetch(
-                query=sql, cursor_type=DictCursor, params=(date_string,)
-            )
-            self.feishu_robot.bot(
-                title=f"{date_string}服务号发文记录",
-                mention=False,
-                detail=publish_records,
-                env="server_account_publish_monitor",
-            )
-
-            publish_account_id_set = set([i["gh_id"] for i in publish_records])
-            for account_id in account_list:
-                if account_id not in publish_account_id_set:
-                    account_name = self.get_server_account_name(account_id)
-                    do_not_publish_account.append(
-                        {
-                            "account_name": account_name,
-                            "gh_id": account_id,
-                        }
-                    )
-
-            if do_not_publish_account:
-                self.feishu_robot.bot(
-                    title=f"{date_string}发现服务号存在未发文情况",
-                    detail=do_not_publish_account,
-                    env="server_account_publish_monitor",
-                )
-
 
 class SaveFwhDataToDatabase(FwhDataRecycle):
 
@@ -247,7 +205,9 @@ class SaveFwhDataToDatabase(FwhDataRecycle):
             from long_articles_group_send_result
             where gh_id = %s and recycle_status = %s and create_time > %s;
         """
-        earliest_time = datetime.fromtimestamp(earliest_timestamp).strftime("%Y-%m-%d %H:%M:%S")
+        earliest_time = datetime.fromtimestamp(earliest_timestamp).strftime(
+            "%Y-%m-%d %H:%M:%S"
+        )
         return self.long_articles_client.fetch(
             fetch_query,
             DictCursor,
@@ -404,4 +364,144 @@ class FwhDataExportTemp(FwhDataRecycle):
                 except Exception as e:
                     print(f"article {article['ContentUrl']} is not available, skip it")
         df = pd.DataFrame(L)
-        df.to_csv("temp2.csv", index=False)
+        df.to_csv("temp2.csv", index=False)
+
+
+class FwhGroupPublishMonitor(FwhDataRecycle):
+
+    def get_sent_fans(self, date_string: str, gh_id: str) -> int:
+        """
+        get the number of fans sent on the specified date
+        """
+        fetch_query = f"""
+            select push_id, avg(sent_count) as 'total_sent_fans'
+            from long_articles_group_send_result
+            where publish_date = %s and gh_id = %s and status = %s
+            group by push_id;
+        """
+        fetch_response = self.long_articles_client.fetch(
+            fetch_query,
+            cursor_type=DictCursor,
+            params=(date_string, gh_id, self.PUBLISH_SUCCESS_STATUS),
+        )
+        fans_list = [i["total_sent_fans"] for i in fetch_response]
+        return sum(fans_list) if fans_list else 0
+
+    def get_remain_fans(self, gh_id: str):
+        """
+        get the number of fans remain
+        """
+        fetch_query = f"""
+            select count(1) as 'remain_fans'
+            from article_user_group 
+            where gzh_id = %s and is_delete = %s and remaining_count > %s;
+        """
+        fetch_response = self.piaoquan_client.fetch(
+            fetch_query, cursor_type=DictCursor, params=(gh_id, 0, 0)
+        )
+        response = fetch_response[0]["remain_fans"]
+        return response if response else 0
+
+    def get_remain_publish_times(self, gh_id: str):
+        """
+        获取剩余可发布次数
+        """
+        fetch_query = f"""
+            select sum(remaining_count) as 'remain_publish_times'
+            from article_user_group
+            where gzh_id = %s and is_delete = %s;
+        """
+        fetch_response = self.piaoquan_client.fetch(
+            fetch_query, cursor_type=DictCursor, params=(gh_id, 0)
+        )
+        response = fetch_response[0]["remain_publish_times"]
+        return response if response else 0
+
+    def get_remain_fans_and_publish_times(self, gh_id: str, date_string: str):
+        """
+        获取发布前,该账号剩余的发布次数和粉丝数
+        """
+        fetch_query = f"""
+            select fans_before_publish, publish_times_before_publish
+            from fwh_daily_publish_detail
+            where gh_id = %s and publish_date = %s;
+        """
+        fetch_response = self.piaoquan_client.fetch(
+            fetch_query, cursor_type=DictCursor, params=(gh_id, date_string)
+        )
+        return fetch_response[0] if fetch_response else None
+
+    def deal(self, date_string: str = datetime.today().strftime("%Y-%m-%d")):
+        """
+        monitor the publish record
+        """
+        now = datetime.now()
+        if now.hour > 12:
+            return
+
+        gh_id_list = self.get_group_server_accounts()
+
+        # get rest publish days
+        remain_days = days_remaining_in_month()
+
+        # get table columns
+        columns = [
+            self.feishu_robot.create_feishu_columns_sheet(
+                sheet_type="plain_text",
+                sheet_name="account_name",
+                display_name="公众号名称",
+            ),
+            self.feishu_robot.create_feishu_columns_sheet(
+                sheet_type="number", sheet_name="sent_fans", display_name="今日发布次数"
+            ),
+            self.feishu_robot.create_feishu_columns_sheet(
+                sheet_type="number",
+                sheet_name="rest_publish_times",
+                display_name="发文前剩余发文次数",
+            ),
+            self.feishu_robot.create_feishu_columns_sheet(
+                sheet_type="number",
+                sheet_name="rest_publish_fans",
+                display_name="发文前剩余发文粉丝数",
+            ),
+            self.feishu_robot.create_feishu_columns_sheet(
+                sheet_type="number",
+                sheet_name="remain_days",
+                display_name="本月剩余天数",
+            ),
+        ]
+
+        monitor_table = []
+
+        for gh_id in gh_id_list:
+            account_name = self.get_server_account_name(gh_id)
+            # 获取今日发布粉丝数量
+            sent_fans = self.get_sent_fans(date_string, gh_id)
+            # # 获取剩余可发文人数
+            # remain_fans = self.get_remain_fans(gh_id)
+            #
+            # # 获取剩余可发文次数
+            # remain_publish_times = self.get_remain_publish_times(gh_id)
+
+            remain_fans, remain_publish_times = self.get_remain_fans_and_publish_times(
+                gh_id, date_string
+            )
+            temp = {
+                "account_name": account_name,
+                "rest_publish_times": int(remain_publish_times + sent_fans),
+                "rest_publish_fans": int(remain_fans),
+                "remain_days": int(remain_days),
+                "sent_fans": int(sent_fans),
+            }
+            monitor_table.append(temp)
+
+        # print(monitor_table)
+        # print(json.dumps(monitor_table, ensure_ascii=False, indent=4))
+        # feishu bot
+        self.feishu_robot.bot(
+            title=f"{date_string}服务号发文详情",
+            detail={"columns": columns, "rows": monitor_table},
+            table=True,
+            mention=False,
+            env="server_account_publish_monitor",
+        )