@@ -16,7 +16,7 @@ from log import Log
 import os
 from argparse import ArgumentParser
 from constants import AutoReplyAccountType
-from alg_growth_common import check_unsafe_video, filter_unsafe_video
+from alg_growth_common import check_unsafe_video, filter_unsafe_video, filter_audit_failed_video

 CONFIG, _ = set_config()
 LOGGER = Log()
@@ -41,6 +41,7 @@ SEND_N = 2
 pd.set_option('display.max_rows', None)

+
 def get_and_update_gh_ids(run_dt):
     gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': run_dt})
     gh = gh.to_pandas()
@@ -73,7 +74,7 @@ def check_data_partition(project, table, data_dt, data_hr=None):

 def get_last_strategy_result(project, rank_table, dt_version, key):
     strategy_df = get_odps_df_of_max_partition(
-        project, rank_table, { 'ctime': dt_version }
+        project, rank_table, {'ctime': dt_version}
     ).to_pandas()
     sub_df = strategy_df.query(f'strategy_key == "{key}"')
     sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
@@ -165,6 +166,7 @@ def rank_for_layer1(run_dt, run_hour, project, table, gh_df):
     result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return result_df

+
 def rank_for_layer2(run_dt, run_hour, rank_table):
     stats_df = process_reply_stats(
         ODS_PROJECT, GH_REPLY_STATS_TABLE, GH_REPLY_STATS_HOUR_TABLE,
@@ -172,7 +174,7 @@ def rank_for_layer2(run_dt, run_hour, rank_table):

     # Ensure reruns produce consistent results
     dt_version = f'{run_dt}{run_hour}'
-    np.random.seed(int(dt_version)+1)
+    np.random.seed(int(dt_version) + 1)
     # TODO: compute correlation between accounts
     ## Pair accounts two by two; take the intersection of videos that have RoVn values; within each account the (smoothed) RoVn values form a vector
     ## Compute the vectors' correlation coefficient or cosine similarity
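
Note: seeding with `int(dt_version) + 1` ties the RNG to the run's date-hour, so a rerun of the same `dt_version` shuffles identically. The TODO above describes a pairwise account-similarity computation that is not implemented in this patch; a minimal standalone sketch of that idea (an assumption of intent — the helper name and dict-shaped inputs are hypothetical):

    import numpy as np

    def account_cosine_similarity(rovn_a: dict, rovn_b: dict) -> float:
        # Intersect the videos that have RoVn values in both accounts
        shared = sorted(set(rovn_a) & set(rovn_b))
        if not shared:
            return 0.0
        va = np.array([rovn_a[v] for v in shared], dtype=float)
        vb = np.array([rovn_b[v] for v in shared], dtype=float)
        denom = np.linalg.norm(va) * np.linalg.norm(vb)
        return float(va @ vb / denom) if denom else 0.0
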
@@ -188,10 +190,12 @@ def rank_for_layer2(run_dt, run_hour, rank_table):

     # Basic filtering per account
     df = stats_df.query('day0_return > 100')
-
+    # Filter out audit-failed videos for the target accounts
+    df = filter_audit_failed_video(df)
     # fallback to base if necessary
     base_strategy_df = get_last_strategy_result(
         ODS_PROJECT, rank_table, dt_version, BASE_GROUP_NAME)
+    base_strategy_df = filter_audit_failed_video(base_strategy_df)
     for gh_id in GH_IDS:
         if gh_id == 'default':
             continue
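
Note: `filter_audit_failed_video` comes from `alg_growth_common` and its body is not part of this diff; the real helper takes only the DataFrame and presumably resolves the failed-ID set itself. A hypothetical sketch of the assumed behavior (the `audit_failed_ids` parameter is an illustration, not the actual signature):

    def filter_audit_failed_video(df, audit_failed_ids=frozenset()):
        # Assumed behavior: drop rows whose video_id is on an audit-failed list
        return df[~df['video_id'].isin(audit_failed_ids)]
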
@@ -212,12 +216,13 @@ def rank_for_layer2(run_dt, run_hour, rank_table):
     result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return result_df

+
 def rank_for_base(run_dt, run_hour, rank_table):
     stats_df = process_reply_stats(
         ODS_PROJECT, GH_REPLY_STATS_TABLE, GH_REPLY_STATS_HOUR_TABLE,
         STATS_PERIOD_DAYS, run_dt, run_hour)

-    #TODO: support to set base manually
+    # TODO: support to set base manually
     dt_version = f'{run_dt}{run_hour}'

     # Fetch the current base info; the strategy table's dt_version (ctime partition) uses the current time
@@ -234,22 +239,23 @@ def rank_for_base(run_dt, run_hour, rank_table):

     stats_with_strategy_df = stats_df \
         .merge(
-        base_strategy_df,
-        on=['gh_id', 'video_id'],
-        how='left') \
+            base_strategy_df,
+            on=['gh_id', 'video_id'],
+            how='left') \
         .query('strategy_key.notna() or score > 0.1')

     # Merge the default and per-account data
     grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()
-
+    grouped_stats_df = filter_audit_failed_video(grouped_stats_df)
     def set_top_n(group, n=2):
         group_sorted = group.sort_values(by='score', ascending=False)
         top_n = group_sorted.head(n)
         top_n['sort'] = range(1, n + 1)
         return top_n
+
     ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
     ranked_df = ranked_df.reset_index(drop=True)
-    #ranked_df['sort'] = grouped_stats_df.groupby('gh_id')['score'].rank(ascending=False)
+    # ranked_df['sort'] = grouped_stats_df.groupby('gh_id')['score'].rank(ascending=False)
     ranked_df['strategy_key'] = BASE_GROUP_NAME
     ranked_df['dt_version'] = dt_version
     ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
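
Note: `set_top_n` assigns `sort` = 1..n after sorting by score. Two caveats worth flagging: a group with fewer than `n` rows makes `range(1, n + 1)` raise a length-mismatch ValueError, and assigning into the result of `head(n)` can trigger SettingWithCopyWarning. A self-contained illustration of the groupby pattern on hypothetical data, with the defensive variants commented:

    import pandas as pd

    df = pd.DataFrame({
        'gh_id': ['a', 'a', 'a', 'b', 'b'],
        'video_id': [1, 2, 3, 4, 5],
        'score': [0.5, 0.9, 0.7, 0.2, 0.8],
    })

    def set_top_n(group, n=2):
        group_sorted = group.sort_values(by='score', ascending=False)
        top_n = group_sorted.head(n).copy()  # .copy() avoids SettingWithCopyWarning
        top_n['sort'] = range(1, len(top_n) + 1)  # len() guards groups smaller than n
        return top_n

    ranked = df.groupby('gh_id').apply(set_top_n, 2).reset_index(drop=True)
    # ranked holds at most two rows per gh_id, with sort in {1, 2}
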
@@ -273,7 +279,7 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     gh_df = get_and_update_gh_ids(next_dt)

     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
-    layer2_rank = rank_for_layer2( run_dt, run_hour, ODPS_RANK_RESULT_TABLE)
+    layer2_rank = rank_for_layer2(run_dt, run_hour, ODPS_RANK_RESULT_TABLE)
     base_rank = rank_for_base(run_dt, run_hour, ODPS_RANK_RESULT_TABLE)
     final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
     check_result_data(final_rank_df)
@@ -314,7 +320,7 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
 def main_loop():
     argparser = ArgumentParser()
     argparser.add_argument('-n', '--dry-run', action='store_true')
-    argparser.add_argument('--run-at',help='assume to run at date and hour, yyyyMMddHH')
+    argparser.add_argument('--run-at', help='assume to run at date and hour, yyyyMMddHH')
     args = argparser.parse_args()

     run_date = datetime.today()
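
Note: `--run-at` expects a combined date-hour string in `yyyyMMddHH` form, matching the `dt_version` used above. A hypothetical invocation (the script's filename is not shown in this diff):

    python growth_rank_job.py --dry-run --run-at 2024060112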