@@ -28,6 +28,7 @@ EXPLORE1_GROUP_NAME = '3rd-party-explore1'
 EXPLORE2_GROUP_NAME = '3rd-party-explore2'
 # GH_IDS will be updated by get_and_update_gh_ids
 GH_IDS = ('default',)
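+# Per-account info keyed by gh_id; populated by get_and_update_gh_ids()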
+account_map = {}
 
 pd.set_option('display.max_rows', None)
 
@@ -40,14 +41,22 @@ ODPS_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
 GH_DETAIL = 'gh_detail'
 RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 STATS_PERIOD_DAYS = 5
-SEND_N = 1
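+# The single global SEND_N splits in two: DEFAULT_SEND_N for the synthetic
+# "default" account, MAX_SEND_N as the candidate cap before per-account truncation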
+DEFAULT_SEND_N = 1
+MAX_SEND_N = 3
+
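+# Per-category fallback videos (previously local to delta mode), used to pad
+# accounts that have fewer than send_n candidates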
+default_video = {
+    '泛生活': [20463342, 14095344, 13737337],
+    '泛历史': [13586800, 12794884, 12117356],
+}
+
 
 def get_and_update_gh_ids(run_dt):
     db = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
     gh_type = AutoReplyAccountType.EXTERNAL_GZH.value
     sqlstr = f"""
         SELECT gh_id, gh_name, category1, category2, channel,
-               video_ids, strategy_status
+               video_ids, strategy_status,
+               autoreply_send_minigram_num as send_n
         FROM {GH_DETAIL}
         WHERE is_delete = 0 AND `type` = {gh_type}
         """
@@ -63,6 +72,11 @@ def get_and_update_gh_ids(run_dt):
     account_df = account_df.drop_duplicates(subset=['gh_id'])
     global GH_IDS
    GH_IDS = tuple(account_df['gh_id'])
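+    # Cache account rows for global lookup; treat a missing send_n as 1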
+    global account_map
+    account_map = { x['gh_id']: x for x in account_df.to_dict(orient='records') }
+    for gh_id in account_map:
+        account_info = account_map[gh_id]
+        account_info['send_n'] = account_info.get('send_n', 1)
     return account_df
 
 
@@ -134,13 +148,23 @@ def rank_for_layer1(run_dt, run_hour, project, table, gh):
     df['score'] = 1.0
     # Weighted random sampling within each category1 group
     sampled_df = df.groupby('category1').apply(
-        lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
+        lambda x: x.sample(n=MAX_SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
     sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int)
     # Sort by score
     sampled_df = sampled_df.sort_values(by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True)
     sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
     sampled_df['dt_version'] = dt_version
-    extend_df = sampled_df.merge(gh, on='category1')
+
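+    # Keep only each account's top send_n rows, then flip sort so rank 1 is
+    # sent last (replaces the global SEND_N reversal removed from base mode)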
+    merged_dfs = []
+    for gh_id in GH_IDS:
+        sub_gh_df = gh.query(f'gh_id == "{gh_id}"')
+        account_info = account_map[gh_id]
+        send_n = account_info['send_n']
+        sub_video_df = sampled_df.query(f'sort <= {send_n}').copy()
+        merged_df = sub_video_df.merge(sub_gh_df, on='category1')
+        merged_df['sort'] = send_n + 1 - merged_df['sort']
+        merged_dfs.append(merged_df)
+    extend_df = pd.concat(merged_dfs)
     result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return result_df
 
 
@@ -160,7 +184,7 @@ def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
     sampled_dfs = []
     # Handle the "default" account (default-explore2)
     default_stats_df = stats_df.query('gh_id == "default"')
-    sampled_df = default_stats_df.sample(n=SEND_N, weights=default_stats_df['score'])
+    sampled_df = default_stats_df.sample(n=DEFAULT_SEND_N, weights=default_stats_df['score'])
     sampled_df['sort'] = range(1, len(sampled_df) + 1)
     sampled_dfs.append(sampled_df)
 
@@ -174,15 +198,37 @@ def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
     for gh_id in GH_IDS:
         if gh_id == 'default':
             continue
+        account_info = account_map[gh_id]
+        send_n = account_info['send_n']
         sub_df = df.query(f'gh_id == "{gh_id}"')
-        if len(sub_df) < SEND_N:
+        if len(sub_df) < send_n:
             LOGGER.warning(
                 "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
                 .format(gh_id, len(sub_df)))
             sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"').copy()
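+        # If even the base strategy falls short, pad with category fallback videos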
+        if len(sub_df) < send_n:
+            LOGGER.warning(
+                "gh_id[{}] rows[{}] still not enough for layer2, add backup"
+                .format(gh_id, len(sub_df)))
+            rows = []
+            idx = len(sub_df)
+            exist_video_ids = sub_df['video_id'].unique()
+            for video_id in default_video[account_info['category1']]:
+                if video_id in exist_video_ids:
+                    continue
+                row = {
+                    'gh_id': gh_id,
+                    'sort': idx + 1,
+                    'video_id': video_id,
+                    'strategy_key': ''  # this is not important
+                }
+                rows.append(row)
+            appx_df = pd.DataFrame(rows)
+            sub_df = pd.concat([sub_df, appx_df])
         sub_df['score'] = sub_df['sort']
-        sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score'])
-        sampled_df['sort'] = range(1, len(sampled_df) + 1)
+
+        sampled_df = sub_df.sample(n=send_n, weights=sub_df['score'])
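+        # Descending sort values: rank 1 goes out last, per account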
+        sampled_df['sort'] = range(send_n, 0, -1)
         sampled_dfs.append(sampled_df)
 
     extend_df = pd.concat(sampled_dfs)
@@ -227,7 +273,37 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
         top_n['sort'] = range(1, len(top_n) + 1)
         return top_n
 
-    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
+    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, MAX_SEND_N)
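+    # Rank MAX_SEND_N candidates per account, then cut each account down to its
+    # own send_n, padding from default_video when rows are missing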
+    sampled_dfs = []
+    for gh_id in GH_IDS:
+        account_info = account_map[gh_id]
+        send_n = account_info['send_n']
+        sub_df = ranked_df.query(f'gh_id == "{gh_id}" and sort <= {send_n}').copy()
+        if len(sub_df) < send_n:
+            LOGGER.warning(
+                "gh_id[{}] rows[{}] still not enough for base, add backup"
+                .format(gh_id, len(sub_df)))
+            rows = []
+            idx = len(sub_df)
+            exist_video_ids = sub_df['video_id'].unique()
+            for video_id in default_video[account_info['category1']]:
+                if video_id in exist_video_ids:
+                    continue
+                row = {
+                    'gh_id': gh_id,
+                    'sort': idx + 1,
+                    'video_id': video_id,
+                    'score': 0.0,
+                    'strategy_key': ''  # this is not important
+                }
+                rows.append(row)
+                if len(sub_df) + len(rows) >= send_n:
+                    break
+            appx_df = pd.DataFrame(rows)
+            sub_df = pd.concat([sub_df, appx_df])
+        sub_df['sort'] = send_n + 1 - sub_df['sort']
+        sampled_dfs.append(sub_df)
+    ranked_df = pd.concat(sampled_dfs)
     ranked_df = ranked_df.reset_index(drop=True)
     ranked_df['strategy_key'] = stg_key
     ranked_df['dt_version'] = dt_version
@@ -238,10 +314,11 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
 def check_result_data(df):
     check_unsafe_video(df)
     for gh_id in GH_IDS:
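+        # Each account must yield exactly its own send_n records per strategy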
+        account_info = account_map[gh_id]
         for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
             sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
             n_records = len(sub_df)
-            if n_records != SEND_N:
+            if n_records != account_info['send_n']:
                 raise Exception(f"Unexpected record count: {gh_id},{key},{n_records}")
 
 
@@ -256,11 +333,12 @@ def postprocess_override_by_config(df, gh_df, dt_version):
 
     for row in override_config:
         gh_id = row['gh_id']
+        account_info = account_map[gh_id]
         try:
             video_ids = json.loads(row['video_ids'])
             if not isinstance(video_ids, list):
                 raise Exception("video_ids is not list")
-            video_ids = video_ids[:SEND_N]
+            video_ids = video_ids[:account_info['send_n']]
         except Exception as e:
             LOGGER.error(f"json parse error: {e}. content: {row['video_ids']}")
             continue
@@ -294,9 +372,6 @@ def build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run):
 
     final_df = join_video_info(final_rank_df)
 
-    # reverse sending order
-    final_df['sort'] = SEND_N + 1 - final_df['sort']
-
     if dry_run:
         print("==== ALL DATA ====")
         print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]
@@ -346,7 +421,6 @@ def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
     # Fetch the latest strategy; the strategy table's dt_version (ctime partition) uses the current time
     last_strategy, last_dt_version = get_last_strategy_result(
         ODS_PROJECT, ODPS_RANK_RESULT_TABLE, dt_version, BASE_GROUP_NAME)
-    account_map = { x['gh_id']: x for x in account_df.to_dict(orient='records') }
     all_accounts = account_df['gh_id'].unique()
     accounts_in_strategy = last_strategy['gh_id'].unique()
     delta_accounts = [x for x in set(all_accounts) - set(accounts_in_strategy)]
@@ -357,11 +431,6 @@ def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
         LOGGER.info('Found 0 new account, do nothing.')
         return
 
-    default_video = {
-        '泛生活': [20463342],
-        '泛历史': [13586800],
-    }
-
     # New accounts have no history, so the strategy_status field can be ignored
     # TODO: set default by history stats
     groups = (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME)
@@ -375,7 +444,7 @@ def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
         else:
             video_ids = default_video[account_info['category1']]
         for group_key in groups:
-            for idx in range(SEND_N):
+            for idx in range(account_info['send_n']):
                 row = {
                     'strategy_key': group_key,
                     'gh_id': gh_id,