Browse Source

计算阅读率表定时任务上线

luojunhui 1 năm trước cách đây
mục cha
commit
8a65573625
2 tập tin đã thay đổi với 120 bổ sung46 xóa
  1. 1 0
      applications/longArticlesMysql.py
  2. 119 46
      cal_account_read_rate_avg_daily.py

+ 1 - 0
applications/longArticlesMysql.py

@@ -53,4 +53,5 @@ class longArticlesMySQL(object):
         except Exception as e:
             print("Insert Many Defeat--{}".format(e))
             cls.connection.rollback()
+            raise e
 

+ 119 - 46
not_used_tasks/cal_account_read_rate_avg_daily.py → cal_account_read_rate_avg_daily.py

@@ -2,11 +2,13 @@
 @author: luojunhui
 cal each account && position reading rate
 """
+import json
 from tqdm import tqdm
-from datetime import datetime, timezone, timedelta
 from pandas import DataFrame
+from argparse import ArgumentParser
+from datetime import datetime, timezone, timedelta
 
-from applications import DeNetMysql, PQMySQL, longArticlesMySQL
+from applications import DeNetMysql, PQMySQL, longArticlesMySQL, bot
 
 STATISTICS_PERIOD = 31 * 24 * 60 * 60
 
@@ -24,6 +26,7 @@ def filter_outlier_data(group, key='show_view_count'):
     filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
     # 过滤均值倍数大于5的数据
     new_mean = filtered_group[key].mean()
+    # print("阅读均值", new_mean)
     filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
     return filtered_group
 
@@ -46,7 +49,7 @@ def str_to_timestamp(date_string) -> int:
 
     # 使用timestamp()方法将datetime对象转换为时间戳
     timestamp = date_obj.timestamp()
-    return timestamp
+    return int(timestamp)
 
 
 def get_account_fans_by_dt(db_client) -> dict:
@@ -183,7 +186,12 @@ def cal_avg_account_read_rate(df, gh_id, index, dt) -> tuple:
         & (df["updateTime"] <= max_time)
         & (df['ItemIndex'] == index)
         ]
+    # print("位置", index)
     finalDF = filter_outlier_data(filterDataFrame)
+    finalDF = finalDF.sort_values(by=['updateTime'], ascending=False)
+    # if index == 1:
+    #     for i in finalDF.values.tolist():
+    #         print(datetime.fromtimestamp(i[2]).strftime('%Y-%m-%d'), i)
     return (
         finalDF['readRate'].mean(),
         finalDF['updateTime'].max(),
@@ -192,6 +200,41 @@ def cal_avg_account_read_rate(df, gh_id, index, dt) -> tuple:
     )
 
 
+def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
+    """
+    检验某个具体账号的具体文章的阅读率均值和前段日子的比较
+    :param avg_rate: 当天计算出的阅读率均值
+    :param db_client: 数据库连接
+    :param gh_id: 账号 id
+    :param index: 账号  index
+    :param dt:
+    :return:
+    """
+    dt = int(dt.replace("-", ""))
+    select_sql = f"""
+        SELECT account_name, read_rate_avg
+        FROM long_articles_read_rate
+        WHERE gh_id = '{gh_id}' and position = {index} and dt_version < {dt}
+        ORDER BY dt_version DESC limit 1;
+    """
+    result = db_client.select(select_sql)
+    if result:
+        account_name = result[0][0]
+        previous_read_rate_avg = result[0][1]
+        relative_value = (avg_rate - previous_read_rate_avg) / previous_read_rate_avg
+        if -0.05 <= relative_value <= 0.05:
+            return {}
+        else:
+            response = {
+                "账号名称": account_name,
+                "位置": index,
+                "当天阅读率均值": avg_rate,
+                "前一天阅读率均值": previous_read_rate_avg,
+                "相对变化率": relative_value
+            }
+            return response
+
+
 def update_single_day(dt, account_list, article_df, lam):
     """
     更新单天数据
@@ -202,37 +245,56 @@ def update_single_day(dt, account_list, article_df, lam):
     :return:
     """
     index_list = [1, 2, 3, 4, 5, 6, 7, 8]
+    error_list = []
     for account in tqdm(account_list):
         for index in index_list:
             avg_rate, max_time, min_time, a_count = cal_avg_account_read_rate(article_df, account['gh_id'], index, dt)
-            # print(account['account_name'], "\t", index, "\t", avg_rate, "\t", max_time, "\t", min_time, "\t", a_count,
-            #       "\t", account['gh_id'])
-            try:
-                if avg_rate == 0:
-                    continue
-                insert_sql = f"""
-                    INSERT INTO long_articles_read_rate_dev
-                    (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
-                    values
-                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-                """
-                lam.update(
-                    sql=insert_sql,
-                    params=(
-                        account['account_name'],
-                        account['gh_id'],
-                        index,
-                        avg_rate,
-                        "从 {} 开始往前计算 31  天".format(dt),
-                        a_count,
-                        timestamp_to_str(min_time),
-                        timestamp_to_str(max_time),
-                        dt.replace("-", ""),
-                        0
+            if a_count > 0:
+                if index in {1, 2}:
+                    error_obj = check_each_position(
+                        db_client=lam,
+                        gh_id=account['gh_id'],
+                        index=index,
+                        dt=dt,
+                        avg_rate=avg_rate
+                    )
+                    if error_obj:
+                        error_list.append(error_obj)
+                # continue
+                try:
+                    if avg_rate == 0:
+                        continue
+                    insert_sql = f"""
+                        INSERT INTO long_articles_read_rate
+                        (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
+                        values
+                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                    """
+                    lam.update(
+                        sql=insert_sql,
+                        params=(
+                            account['account_name'],
+                            account['gh_id'],
+                            index,
+                            avg_rate,
+                            "从 {} 开始往前计算 31  天".format(dt),
+                            a_count,
+                            timestamp_to_str(min_time),
+                            timestamp_to_str(max_time),
+                            dt.replace("-", ""),
+                            0
+                        )
                     )
-                )
-            except Exception as e:
-                print(e)
+                except Exception as e:
+                    print(e)
+    if error_list:
+        bot(
+            title="更新阅读率均值,头次出现异常值通知",
+            detail={
+                "时间": dt,
+                "异常列表": error_list
+            }
+        )
 
 
 def main() -> None:
@@ -240,26 +302,37 @@ def main() -> None:
     main function
     :return:
     """
+    parser = ArgumentParser()
+    parser.add_argument("--run-date",
+                        help="Run only once for date in format of %Y-%m-%d. \
+                                    If no specified, run as daily jobs.")
+    args = parser.parse_args()
+    if args.run_date:
+        dt = args.run_date
+    else:
+        dt = datetime.today().strftime('%Y-%m-%d')
+
     lam = longArticlesMySQL()
     de = DeNetMysql()
-    # dt = '2024-10-22'
     account_list = get_publishing_accounts(db_client=de)
-    # 获取这些账号所有的文章
     df = cal_account_read_rate(tuple([i['gh_id'] for i in account_list]))
-    start_dt = start_date = datetime(2024, 8, 1)
-    end_date = datetime(2024, 10, 22)
-    # 计算日期差
-    delta = end_date - start_date
-    # 生成日期字符串列表
-    date_strings = []
-    for i in range(delta.days + 1):
-        date_strings.append((start_date + timedelta(days=i)).strftime('%Y-%m-%d'))
-
-    # 打印结果
-    date_str = '2024-09-11'
-    date_strings = [date_str,]
-    for date_str in tqdm(date_strings):
-        update_single_day(date_str, account_list, df, lam)
+
+    update_single_day(dt, account_list, df, lam)
+
+    # start_dt = start_date = datetime(2024, 8, 1)
+    # end_date = datetime(2024, 10, 22)
+    # # 计算日期差
+    # delta = end_date - start_date
+    # # 生成日期字符串列表
+    # date_strings = []
+    # for i in range(delta.days + 1):
+    #     date_strings.append((start_date + timedelta(days=i)).strftime('%Y-%m-%d'))
+    #
+    # # 打印结果
+    # date_str = '2024-09-11'
+    # date_strings = [date_str,]
+    # for date_str in tqdm(date_strings):
+    #     update_single_day(date_str, account_list, df, lam)
 
 
 if __name__ == '__main__':