@@ -0,0 +1,203 @@
+import json
+import time
+from datetime import datetime, timedelta
+
+from pandas import DataFrame
+
+from .models import CategoryRegression
+
+
+class AccountCategoryConst:
+    P_VALUE_THRESHOLD = 0.15
+    SAMPLE_MIN_SIZE = 5
+    POSITIVE_STATUS = 1
+    NEGATIVE_STATUS = 0
+    VERSION = 2
+
+    # data-quality thresholds applied when fetching raw samples
+    SIMILARITY_THRESHOLD = 0
+    READ_AVG_THRESHOLD = 500
+    READ_AVG_RATE_MIN = 0.3
+    READ_AVG_RATE_MAX = 3
+    VIEW_COUNT_RATE_THRESHOLD = 0
+    INDEX_MAX = 3
+
+
+class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
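+    """Per-account category preference analysis.
+
+    Fits an OLS regression of article performance on category features for
+    each account and persists the significant category weights to the
+    `account_category` table.
+    """
+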
+    def __init__(self, pool, log_client, trace_id, date_string, data):
+        self.account_negative_categories = None
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+        features = data.get("features")
+        category_map = data.get("category_map")
+        super().__init__(features, category_map)
+        self.date_string = date_string
+        self.view_only = data.get("view_only")
+
+    def reverse_category_map(self):
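+        """Invert category_map so regression parameter names map back to categories."""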
+        return {v: k for k, v in self.category_map.items()}
+
+    def init_execute_date(self):
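+        """Return the dt partition to process: the day before the run date, as YYYYMMDD."""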
+        if self.date_string:
+            run_date = datetime.strptime(self.date_string, "%Y-%m-%d")
+        else:
+            run_date = datetime.today()
+
+        end_dt = (run_date - timedelta(1)).strftime("%Y%m%d")
+        return end_dt
+
+    async def prepare_raw_data(self, end_dt, begin_dt: str = "20250401"):
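+        """Fetch article samples from `datastat_score` within [begin_dt, end_dt],
+        applying the quality thresholds, deduplicated by (dt, gh_id, title)."""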
+        query = """
+            SELECT dt, gh_id, account_name, title, similarity, view_count_rate, category,
+                read_avg, read_avg_rate, first_pub_interval, `index`
+            FROM datastat_score
+            WHERE dt BETWEEN %s AND %s
+                AND similarity > %s
+                AND category IS NOT NULL
+                AND read_avg > %s
+                AND read_avg_rate BETWEEN %s AND %s
+                AND view_count_rate > %s
+                AND `index` < %s;
+        """
+        fetch_response = await self.pool.async_fetch(
+            query=query,
+            params=(
+                begin_dt,
+                end_dt,
+                self.SIMILARITY_THRESHOLD,
+                self.READ_AVG_THRESHOLD,
+                self.READ_AVG_RATE_MIN,
+                self.READ_AVG_RATE_MAX,
+                self.VIEW_COUNT_RATE_THRESHOLD,
+                self.INDEX_MAX,
+            ),
+        )
+        dataframe = DataFrame.from_records(
+            fetch_response,
+            columns=[
+                "dt",
+                "gh_id",
+                "account_name",
+                "title",
+                "similarity",
+                "view_count_rate",
+                "category",
+                "read_avg",
+                "read_avg_rate",
+                "first_pub_interval",
+                "index",
+            ],
+        )
+        dataframe = dataframe.drop_duplicates(["dt", "gh_id", "title"])
+        return dataframe
+
+    async def update_each_account(self, record):
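+        """Insert the account's latest category_map as the active row, then
+        deactivate older active rows for the same account and version."""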
+        now_timestamp = int(time.time())
+        query = """
+            INSERT INTO account_category
+                (dt, gh_id, category_map, status, version, create_timestamp, update_timestamp)
+            VALUES (%s, %s, %s, %s, %s, %s, %s);
+        """
+        insert_rows = await self.pool.async_save(
+            query=query,
+            params=(
+                record["dt"],
+                record["gh_id"],
+                record["category_map"],
+                self.POSITIVE_STATUS,
+                self.VERSION,
+                now_timestamp,
+                now_timestamp,
+            ),
+        )
+        if insert_rows:
+            update_query = """
+                UPDATE account_category SET status = %s, update_timestamp = %s
+                WHERE gh_id = %s AND dt < %s AND status = %s AND version = %s;
+            """
+            await self.pool.async_save(
+                query=update_query,
+                params=(
+                    self.NEGATIVE_STATUS,
+                    now_timestamp,
+                    record["gh_id"],
+                    record["dt"],
+                    self.POSITIVE_STATUS,
+                    self.VERSION,
+                ),
+            )
+
+    async def predict_each_account(
+        self, df, account_id, account_id_map, end_dt, param_to_category_map
+    ):
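+        """Fit a per-account OLS regression and persist significant category weights."""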
+        sub_df = df[df["gh_id"] == account_id]
+        account_name = account_id_map[account_id]
+        sample_count = sub_df.shape[0]
+        if sample_count < self.SAMPLE_MIN_SIZE:
+            return
+
+        params, t_stats, p_values = self.run_ols_linear_regression(
+            sub_df, self.view_only, self.P_VALUE_THRESHOLD
+        )
+        current_record = {"dt": end_dt, "gh_id": account_id, "category_map": {}, "name": account_name}
+        params_names = self.get_param_names()
+        for name, param, p_value in zip(params_names, params, p_values):
+            category_name = param_to_category_map.get(name, None)
+            if (
+                abs(param) > 0.1
+                and p_value < self.P_VALUE_THRESHOLD
+                and category_name is not None
+            ):
+                # shrink weakly significant weights, then clip to [-0.25, 0.25]
+                scale_factor = min(0.1 / p_value, 1)
+                truncate_param = round(max(min(param, 0.25), -0.25) * scale_factor, 6)
+                current_record["category_map"][category_name] = truncate_param
+
+            if (
+                param < -0.1
+                and p_value < self.P_VALUE_THRESHOLD
+                and category_name is not None
+            ):
+                self.account_negative_categories[account_id].append(category_name)
+
+        if not current_record["category_map"]:
+            return
+
+        current_record["category_map"] = json.dumps(
+            current_record["category_map"], ensure_ascii=False
+        )
+        await self.update_each_account(current_record)
+
+    async def deal(self):
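+        """Entry point: fetch and preprocess data, then either print the feature
+        matrix (view_only) or fit and persist per-account category weights."""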
+        end_dt = self.init_execute_date()
+        raw_dataframe = await self.prepare_raw_data(end_dt)
+
+        # prepare data for the regression model
+        pre_processed_dataframe = self.preprocess_data(raw_dataframe)
+
+        if self.view_only:
+            self.build_and_print_matrix(pre_processed_dataframe)
+            return
+
+        param_to_category_map = self.reverse_category_map()
+
+        account_ids = pre_processed_dataframe["gh_id"].unique()
+        account_id_map = (
+            pre_processed_dataframe[["account_name", "gh_id"]]
+            .drop_duplicates()
+            .set_index("gh_id")["account_name"]
+            .to_dict()
+        )
+
+        self.account_negative_categories = {key: [] for key in account_ids}
+
+        for account_id in account_ids:
+            await self.predict_each_account(
+                pre_processed_dataframe,
+                account_id,
+                account_id_map,
+                end_dt,
+                param_to_category_map,
+            )
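+
+
+# Hypothetical usage sketch (not part of this module): `mysql_pool` is assumed
+# to expose async_fetch/async_save, and `data` carries the feature config the
+# CategoryRegression base class expects.
+#
+#   analysis = AccountCategoryAnalysis(
+#       pool=mysql_pool,
+#       log_client=logger,
+#       trace_id="trace-id",
+#       date_string="2025-09-01",
+#       data={"features": features, "category_map": category_map, "view_only": False},
+#   )
+#   await analysis.deal()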