瀏覽代碼

Merge branch '2024-11-22-luojunhui-account-read-avg-improve' of luojunhui/LongArticlesJob into master

luojunhui 11 月之前
父節點
當前提交
0aa616f994
共有 5 個文件被更改,包括 137 次插入230 次删除
  1. 37 0
      applications/const.py
  2. 35 124
      applications/functions.py
  3. 60 94
      cal_account_read_rate_avg_daily.py
  4. 1 1
      coldStartTasks/crawler/weixinCategoryCrawler.py
  5. 4 11
      updateAccountV3.py

+ 37 - 0
applications/const.py

@@ -36,3 +36,40 @@ class updatePublishedMsgTaskConst:
     SUBSCRIBE_TYPE_SET = {0, 1}
     # 服务号
     SERVICE_TYPE = 2
+
+
+class updateAccountReadRateTaskConst:
+    """
+    更新账号阅读率常量配置
+    """
+    # 阅读率统计周期(秒)
+    STATISTICS_PERIOD = 31 * 24 * 60 * 60
+    # 一天的秒数
+    ONE_DAY_IN_SECONDS = 60 * 60 * 24
+    # 相对变化率阈值
+    RELATIVE_VALUE_THRESHOLD = 0.1
+    # 发文类型
+    UNLIMITED_PUBLISH_TYPE = 10002
+    BULK_PUBLISH_TYPE = 9
+    # 文章位置
+    ARTICLE_INDEX_LIST = [1, 2, 3, 4, 5, 6, 7, 8]
+
+
+class updateAccountReadAvgTaskConst:
+    """
+    更新账号阅读均值常量配置
+    """
+    # 投流账号
+    TOULIU_ACCOUNTS = {
+        'gh_93e00e187787',
+        'gh_ac43e43b253b',
+        'gh_68e7fdc09fe4',
+        'gh_77f36c109fb1',
+        'gh_b181786a6c8c',
+        'gh_1ee2e1b39ccf',
+        'gh_d3f039c9db2b'
+    }
+
+    # 发文模式
+    ARTICLES_DAILY = 1
+    TOULIU = 2

+ 35 - 124
applications/functions.py

@@ -2,10 +2,9 @@
 @author: luojunhui
 """
 import threading
-from datetime import datetime, timezone
 import hashlib
-import requests
-import pymysql
+
+from datetime import datetime, timezone
 
 
 class Functions(object):
@@ -13,117 +12,6 @@ class Functions(object):
     functions class
     """
 
-    @classmethod
-    def getTitleScore(cls, title_list, account_name):
-        """
-        标题打分
-        :param title_list:
-        :param account_name:
-        :return:
-        """
-        url = "http://192.168.100.31:6060/score_list"
-        body = {
-            "account_nickname_list": [account_name],
-            "text_list": title_list,
-            "max_time": None,
-            "min_time": None,
-            "interest_type": "avg",
-            "sim_type": "mean",
-            "rate": 0.1
-        }
-        response = requests.post(url=url, headers={}, json=body).json()
-        return response
-
-    @classmethod
-    def getTitleAccountScore(cls, title, account_list):
-        """
-        标题打分
-        :param title:
-        :param account_list:
-        :return:
-        """
-        url = "http://192.168.100.31:6060/score_list"
-        body = {
-            "account_nickname_list": account_list,
-            "text_list": [title],
-            "max_time": None,
-            "min_time": None,
-            "interest_type": "avg",
-            "sim_type": "mean",
-            "rate": 0.1
-        }
-        response = requests.post(url=url, headers={}, json=body).json()
-        L = []
-        for account in account_list:
-            account_score = response[account]['score_list'][0]
-            L.append([account, account_score])
-        return L
-
-    @classmethod
-    def matchLinkByIdTuple(cls, channel_id_tuple):
-        """
-        Use channelContentId to match articleUrl
-        :param channel_id_tuple:
-        :return:
-        """
-        connection = pymysql.connect(
-            host='rm-bp12k5fuh5zyx31d28o.mysql.rds.aliyuncs.com',
-            port=3306,
-            user='wx2023_ad',
-            password='wx2023_adP@assword1234',
-            db='adplatform',
-            charset='utf8mb4'
-        )
-        sql = f"""select id, account_id, link, item_index, title from changwen_article where id in {channel_id_tuple};"""
-        cursor = connection.cursor()
-        cursor.execute(sql)
-        article_link = cursor.fetchall()
-        L = {}
-        for line in article_link:
-            key = line[0]
-            value = {
-                "gh_key": "{}_{}".format(line[1], line[3]),
-                "url": line[2],
-                "title": line[4]
-            }
-            L[key] = value
-        return L
-
-    @classmethod
-    def TitleSimilarity(cls, title_list, target_title):
-        """
-        计算标题相似度
-        :return:
-        """
-
-        def title_sim_v2(title_a, title_b, thredhold=0.8):
-            """
-            :param title_a:
-            :param title_b:
-            :param thredhold:
-            :return:
-            """
-            if len(title_a) < 1 or len(title_b) < 1:
-                return False
-            set_a = set(title_a)
-            set_b = set(title_b)
-            set_cross = set_a & set_b
-            set_union = set_a | set_b
-            if not set_union:
-                return False
-            min_len = max(min(len(set_a), len(set_b)), 1)
-            rate = len(set_cross) / min_len
-            if rate >= thredhold:
-                return True
-            else:
-                return False
-
-        for title in title_list:
-            sim_score = title_sim_v2(target_title, title)
-            if sim_score:
-                return True
-        return False
-
     @classmethod
     def show_desc_to_sta(cls, show_desc):
         """
@@ -192,15 +80,6 @@ class Functions(object):
         md5_value = md5_hash.hexdigest()
         return md5_value
 
-    @classmethod
-    def time_stamp_to_str(cls, timestamp):
-        """
-        :param timestamp:
-        """
-        dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
-        date_string = dt_object.strftime('%Y-%m-%d %H:%M:%S')
-        return date_string
-
     @classmethod
     def job_with_thread(cls, job_func):
         """
@@ -226,4 +105,36 @@ class Functions(object):
         md5_hash.update(original_bytes)
         # 获取16进制形式的MD5哈希值
         md5_value = md5_hash.hexdigest()
-        return md5_value
+        return md5_value
+
+    @classmethod
+    def float_to_percentage(cls, value, decimals=3) -> str:
+        """
+        把小数转化为百分数
+        :param value:
+        :param decimals:
+        :return:
+        """
+        percentage_value = round(value * 100, decimals)
+        return "{}%".format(percentage_value)
+
+    @classmethod
+    def str_to_timestamp(cls, date_string, string_format='%Y-%m-%d') -> int:
+        """
+        :param string_format:
+        :param date_string:
+        :return:
+        """
+        date_obj = datetime.strptime(date_string, string_format)
+        timestamp = date_obj.timestamp()
+        return int(timestamp)
+
+    @classmethod
+    def timestamp_to_str(cls, timestamp, string_format='%Y-%m-%d %H:%M:%S') -> str:
+        """
+        :param string_format:
+        :param timestamp:
+        """
+        dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
+        date_string = dt_object.strftime(string_format)
+        return date_string

+ 60 - 94
cal_account_read_rate_avg_daily.py

@@ -2,27 +2,17 @@
 @author: luojunhui
 cal each account && position reading rate
 """
-import json
 from tqdm import tqdm
 from pandas import DataFrame
 from argparse import ArgumentParser
-from datetime import datetime, timezone, timedelta
+from datetime import datetime
 
-from applications import DeNetMysql, PQMySQL, longArticlesMySQL, bot
+from applications import DeNetMysql, PQMySQL, longArticlesMySQL, bot, Functions
+from applications.const import updateAccountReadRateTaskConst
 
-STATISTICS_PERIOD = 31 * 24 * 60 * 60
-ONE_DAY_IN_SECONDS = 60 * 60 * 24
-
-
-def float_to_percentage(value, decimals=3) -> str:
-    """
-    把小数转化为百分数
-    :param value:
-    :param decimals:
-    :return:
-    """
-    percentage_value = round(value * 100, decimals)
-    return "{}%".format(percentage_value)
+const = updateAccountReadRateTaskConst()
+functions = Functions()
+read_rate_table = "long_articles_read_rate"
 
 
 def filter_outlier_data(group, key='show_view_count'):
@@ -43,27 +33,6 @@ def filter_outlier_data(group, key='show_view_count'):
     return filtered_group
 
 
-def timestamp_to_str(timestamp) -> str:
-    """
-    :param timestamp:
-    """
-    dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
-    date_string = dt_object.strftime('%Y-%m-%d')
-    return date_string
-
-
-def str_to_timestamp(date_string) -> int:
-    """
-    :param date_string:
-    :return:
-    """
-    date_obj = datetime.strptime(date_string, '%Y-%m-%d')
-
-    # 使用timestamp()方法将datetime对象转换为时间戳
-    timestamp = date_obj.timestamp()
-    return int(timestamp)
-
-
 def get_account_fans_by_dt(db_client) -> dict:
     """
     获取每个账号发粉丝,通过日期来区分
@@ -139,20 +108,20 @@ def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
     """
     sql = f"""
             SELECT 
-                ghId, accountName, updateTime, ItemIndex, show_view_count 
+                ghId, accountName, ItemIndex, show_view_count, publish_timestamp
             FROM 
                 official_articles_v2
             WHERE 
-                ghId IN {gh_id_tuple} and Type = '9';
+                ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}';
             """
     result = db_client.select(sql)
     response_list = [
         {
             "ghId": i[0],
             "accountName": i[1],
-            "updateTime": i[2],
-            "ItemIndex": i[3],
-            "show_view_count": i[4]
+            "ItemIndex": i[2],
+            "show_view_count": i[3],
+            "publish_timestamp": i[4]
         }
         for i in result
     ]
@@ -174,42 +143,41 @@ def cal_account_read_rate(gh_id_tuple) -> DataFrame:
     )
     for line in account_article_detail:
         gh_id = line['ghId']
-        dt = timestamp_to_str(line['updateTime'])
+        dt = functions.timestamp_to_str(timestamp=line['publish_timestamp'], string_format='%Y-%m-%d')
         fans = fans_dict_each_day.get(gh_id, {}).get(dt, 0)
         line['fans'] = fans
-        if fans:
+        if fans > 1000:
             line['readRate'] = line['show_view_count'] / fans if fans else 0
             response.append(line)
     return DataFrame(response,
-                     columns=['ghId', 'accountName', 'updateTime', 'ItemIndex', 'show_view_count', 'readRate'])
+                     columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])
 
 
-def cal_avg_account_read_rate(df, gh_id, index, dt) -> tuple:
+def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
     """
     计算账号的阅读率均值
     :return:
     """
-    max_time = str_to_timestamp(dt)
-    min_time = max_time - STATISTICS_PERIOD
+    max_time = functions.str_to_timestamp(date_string=dt)
+    min_time = max_time - const.STATISTICS_PERIOD
 
+    # 通过
     filterDataFrame = df[
         (df["ghId"] == gh_id)
-        & (min_time <= df["updateTime"])
-        & (df["updateTime"] <= max_time)
+        & (min_time <= df["publish_timestamp"])
+        & (df["publish_timestamp"] <= max_time)
         & (df['ItemIndex'] == index)
         ]
-    # print("位置", index)
+
+    # 用二倍标准差过滤
     finalDF = filter_outlier_data(filterDataFrame)
-    # finalDF = finalDF.sort_values(by=['updateTime'], ascending=False)
-    # if index == 1:
-    #     for i in finalDF.values.tolist():
-    #         print(datetime.fromtimestamp(i[2]).strftime('%Y-%m-%d'), i)
-    return (
-        finalDF['readRate'].mean(),
-        finalDF['updateTime'].max(),
-        finalDF['updateTime'].min(),
-        len(finalDF)
-    )
+
+    return {
+        "read_rate_avg": finalDF['readRate'].mean(),
+        "max_publish_time": finalDF['publish_timestamp'].max(),
+        "min_publish_time": finalDF['publish_timestamp'].min(),
+        "records": len(finalDF)
+    }
 
 
 def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
@@ -222,11 +190,11 @@ def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
     :param dt:
     :return:
     """
-    RELATIVE_VALUE_THRESHOLD = 0.1
+
     dt = int(dt.replace("-", ""))
     select_sql = f"""
         SELECT account_name, read_rate_avg
-        FROM long_articles_read_rate
+        FROM {read_rate_table}
         WHERE gh_id = '{gh_id}' and position = {index} and dt_version < {dt}
         ORDER BY dt_version DESC limit 1;
     """
@@ -235,15 +203,15 @@ def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
         account_name = result[0][0]
         previous_read_rate_avg = result[0][1]
         relative_value = (avg_rate - previous_read_rate_avg) / previous_read_rate_avg
-        if -RELATIVE_VALUE_THRESHOLD <= relative_value <= RELATIVE_VALUE_THRESHOLD:
+        if -const.RELATIVE_VALUE_THRESHOLD <= relative_value <= const.RELATIVE_VALUE_THRESHOLD:
             return {}
         else:
             response = {
                 "账号名称": account_name,
                 "位置": index,
-                "当天阅读率均值": float_to_percentage(avg_rate),
-                "前一天阅读率均值": float_to_percentage(previous_read_rate_avg),
-                "相对变化率": float_to_percentage(relative_value)
+                "当天阅读率均值": Functions().float_to_percentage(avg_rate),
+                "前一天阅读率均值": Functions().float_to_percentage(previous_read_rate_avg),
+                "相对变化率": Functions().float_to_percentage(relative_value)
             }
             return response
 
@@ -257,32 +225,45 @@ def update_single_day(dt, account_list, article_df, lam):
     :param dt:
     :return:
     """
-    index_list = [1, 2, 3, 4, 5, 6, 7, 8]
     error_list = []
     insert_error_list = []
-    update_timestamp = str_to_timestamp(dt)
+    update_timestamp = functions.str_to_timestamp(date_string=dt)
+
     # 因为计算均值的时候是第二天,所以需要把时间前移一天
-    avg_date = timestamp_to_str(update_timestamp - ONE_DAY_IN_SECONDS)
+    avg_date = functions.timestamp_to_str(
+        timestamp=update_timestamp - const.ONE_DAY_IN_SECONDS,
+        string_format='%Y-%m-%d'
+    )
+
     for account in tqdm(account_list):
-        for index in index_list:
-            avg_rate, max_time, min_time, articles_count = cal_avg_account_read_rate(article_df, account['gh_id'], index, dt)
-            if articles_count > 0:
+        for index in const.ARTICLE_INDEX_LIST:
+            read_rate_detail = cal_avg_account_read_rate(
+                df=article_df,
+                gh_id=account['gh_id'],
+                index=index,
+                dt=dt
+            )
+            read_rate_avg = read_rate_detail['read_rate_avg']
+            max_publish_time = read_rate_detail['max_publish_time']
+            min_publish_time = read_rate_detail['min_publish_time']
+            articles_count = read_rate_detail['records']
+            if articles_count:
                 if index in {1, 2}:
                     error_obj = check_each_position(
                         db_client=lam,
                         gh_id=account['gh_id'],
                         index=index,
                         dt=dt,
-                        avg_rate=avg_rate
+                        avg_rate=read_rate_avg
                     )
                     if error_obj:
                         error_list.append(error_obj)
                 # continue
                 try:
-                    if avg_rate == 0:
+                    if not read_rate_avg:
                         continue
                     insert_sql = f"""
-                        INSERT INTO long_articles_read_rate
+                        INSERT INTO {read_rate_table}
                         (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
                         values
                         (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
@@ -293,11 +274,11 @@ def update_single_day(dt, account_list, article_df, lam):
                             account['account_name'],
                             account['gh_id'],
                             index,
-                            avg_rate,
+                            read_rate_avg,
                             "从 {} 开始往前计算 31  天".format(dt),
                             articles_count,
-                            timestamp_to_str(min_time),
-                            timestamp_to_str(max_time),
+                            functions.timestamp_to_str(timestamp=min_publish_time, string_format='%Y-%m-%d'),
+                            functions.timestamp_to_str(timestamp=max_publish_time, string_format='%Y-%m-%d'),
                             avg_date.replace("-", ""),
                             0
                         )
@@ -351,21 +332,6 @@ def main() -> None:
 
     update_single_day(dt, account_list, df, lam)
 
-    # start_dt = start_date = datetime(2024, 8, 1)
-    # end_date = datetime(2024, 10, 22)
-    # # 计算日期差
-    # delta = end_date - start_date
-    # # 生成日期字符串列表
-    # date_strings = []
-    # for i in range(delta.days + 1):
-    #     date_strings.append((start_date + timedelta(days=i)).strftime('%Y-%m-%d'))
-    #
-    # # 打印结果
-    # date_str = '2024-09-11'
-    # date_strings = [date_str,]
-    # for date_str in tqdm(date_strings):
-    #     update_single_day(date_str, account_list, df, lam)
-
 
 if __name__ == '__main__':
     main()

+ 1 - 1
coldStartTasks/crawler/weixinCategoryCrawler.py

@@ -104,7 +104,7 @@ class weixinCategory(object):
         """
         result = self.db_client_lam.select(select_sql)
         time_stamp = result[0][0]
-        dt_str = self.function.time_stamp_to_str(time_stamp)
+        dt_str = self.function.timestamp_to_str(time_stamp)
         update_sql = f"""
             update long_articles_accounts
             set latest_update_time = %s

+ 4 - 11
updateAccountV3.py

@@ -9,17 +9,8 @@ from datetime import datetime, timedelta
 from argparse import ArgumentParser
 
 from applications import PQMySQL, DeNetMysql, longArticlesMySQL
+from applications.const import updateAccountReadAvgTaskConst
 
-TOULIU_ACCOUNTS = {
-    'gh_93e00e187787',
-    'gh_ac43e43b253b',
-    'gh_68e7fdc09fe4',
-    'gh_77f36c109fb1',
-    'gh_b181786a6c8c',
-    'gh_1ee2e1b39ccf'
-}
-ARTICLES_DAILY = 1
-TOULIU = 2
 
 def get_account_fans_by_dt(db_client) -> dict:
     """
@@ -58,6 +49,7 @@ class UpdateAccountInfoVersion3(object):
     """
 
     def __init__(self):
+        self.const = updateAccountReadAvgTaskConst()
         self.pq = PQMySQL()
         self.de = DeNetMysql()
         self.lam = longArticlesMySQL()
@@ -134,7 +126,8 @@ class UpdateAccountInfoVersion3(object):
         account_list = self.get_publishing_accounts()
         rate_dict = self.get_account_position_read_rate(dt)
         for account in tqdm(account_list):
-            business_type = TOULIU if account['gh_id'] in TOULIU_ACCOUNTS else ARTICLES_DAILY
+            business_type = self.const.TOULIU if account[
+                                                     'gh_id'] in self.const.TOULIU_ACCOUNTS else self.const.ARTICLES_DAILY
             fans = fans_dict.get(account['gh_id'], {}).get(dt, 0)
             if fans:
                 for index in range(1, 9):