|
@@ -114,7 +114,7 @@ def fetch_feature_data_dt(dt: str, index):
|
|
|
:return:
|
|
|
"""
|
|
|
|
|
|
- logger.info(f"开始处理 videoid_vov_base_data -- {dt} 的数据")
|
|
|
+ logger.info(f"fetch_feature_data_dt.dt -- {dt} 的数据")
|
|
|
|
|
|
df = get_partition_df("videoid_vov_base_data", dt).fillna(0)
|
|
|
|
|
@@ -164,23 +164,34 @@ def fetch_feature_data(t_1_datetime: datetime):
|
|
|
:return:
|
|
|
"""
|
|
|
|
|
|
- logger.info(f"fetch_feature_data.label_datetime: {t_1_datetime.strftime('%Y%m%d')}")
|
|
|
-
|
|
|
with concurrent.futures.ThreadPoolExecutor(5) as executor:
|
|
|
t_1_feature_task = executor.submit(
|
|
|
fetch_feature_data_dt, t_1_datetime.strftime("%Y%m%d"), 1
|
|
|
)
|
|
|
+ t_2_datetime = t_1_datetime - timedelta(days=1)
|
|
|
t_2_feature_task = executor.submit(
|
|
|
- fetch_feature_data_dt, (t_1_datetime - timedelta(days=1)).strftime("%Y%m%d"), 2
|
|
|
+ fetch_feature_data_dt, t_2_datetime.strftime("%Y%m%d"), 2
|
|
|
)
|
|
|
+ t_3_datetime = t_1_datetime - timedelta(days=2)
|
|
|
t_3_feature_task = executor.submit(
|
|
|
- fetch_feature_data_dt, (t_1_datetime - timedelta(days=2)).strftime("%Y%m%d"), 3
|
|
|
+ fetch_feature_data_dt, t_3_datetime.strftime("%Y%m%d"), 3
|
|
|
)
|
|
|
+ t_4_datetime = t_1_datetime - timedelta(days=3)
|
|
|
t_4_feature_task = executor.submit(
|
|
|
- fetch_feature_data_dt, (t_1_datetime - timedelta(days=3)).strftime("%Y%m%d"), 4
|
|
|
+ fetch_feature_data_dt, t_4_datetime.strftime("%Y%m%d"), 4
|
|
|
)
|
|
|
+ t_5_datetime = t_1_datetime - timedelta(days=4)
|
|
|
t_5_feature_task = executor.submit(
|
|
|
- fetch_feature_data_dt, (t_1_datetime - timedelta(days=4)).strftime("%Y%m%d"), 5
|
|
|
+ fetch_feature_data_dt, t_5_datetime.strftime("%Y%m%d"), 5
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ f"fetch_feature_data:"
|
|
|
+ f"\n\t t_1_feature_task.datetime: {t_1_datetime.strftime('%Y%m%d')}"
|
|
|
+ f"\n\t t_2_feature_task.datetime: {t_2_datetime.strftime('%Y%m%d')}"
|
|
|
+ f"\n\t t_3_feature_task.datetime: {t_3_datetime.strftime('%Y%m%d')}"
|
|
|
+ f"\n\t t_4_feature_task.datetime: {t_4_datetime.strftime('%Y%m%d')}"
|
|
|
+ f"\n\t t_5_feature_task.datetime: {t_5_datetime.strftime('%Y%m%d')}"
|
|
|
)
|
|
|
|
|
|
t_1_feature = t_1_feature_task.result()
|
|
@@ -239,9 +250,9 @@ def xgb_train_multi_dt_data(t_1_label_dt: datetime):
|
|
|
t_1_feature_dt = t_1_label_dt - timedelta(1)
|
|
|
logger.info(
|
|
|
f"VOV模型特征数据处理 --- t_1_label_future:"
|
|
|
- f"label_datetime: {t_1_label_dt.strftime('%Y%m%d')}"
|
|
|
- f"feature_datetime: {t_1_feature_dt.strftime('%Y%m%d')}"
|
|
|
- f"view_rate_datetime: {t_1_label_dt.strftime('%Y%m%d')}"
|
|
|
+ f"\n\t label_datetime: {t_1_label_dt.strftime('%Y%m%d')} "
|
|
|
+ f"\n\t feature_datetime: {t_1_feature_dt.strftime('%Y%m%d')} "
|
|
|
+ f"\n\t view_rate_datetime: {t_1_label_dt.strftime('%Y%m%d')} "
|
|
|
)
|
|
|
t_1_label_future = executor.submit(fetch_data, t_1_label_dt, t_1_feature_dt, t_1_label_dt)
|
|
|
|
|
@@ -249,9 +260,9 @@ def xgb_train_multi_dt_data(t_1_label_dt: datetime):
|
|
|
t_2_feature_dt = t_2_label_dt - timedelta(1)
|
|
|
logger.info(
|
|
|
f"VOV模型特征数据处理 --- t_2_label_future:"
|
|
|
- f"label_datetime: {t_2_label_dt.strftime('%Y%m%d')}"
|
|
|
- f"feature_datetime: {t_2_feature_dt.strftime('%Y%m%d')}"
|
|
|
- f"view_rate_datetime: {t_2_label_dt.strftime('%Y%m%d')}"
|
|
|
+ f"\n\t label_datetime: {t_2_label_dt.strftime('%Y%m%d')} "
|
|
|
+ f"\n\t feature_datetime: {t_2_feature_dt.strftime('%Y%m%d')} "
|
|
|
+ f"\n\t view_rate_datetime: {t_2_label_dt.strftime('%Y%m%d')} "
|
|
|
)
|
|
|
t_2_label_future = executor.submit(fetch_data, t_2_label_dt, t_2_feature_dt, t_2_label_dt)
|
|
|
|
|
@@ -259,9 +270,9 @@ def xgb_train_multi_dt_data(t_1_label_dt: datetime):
|
|
|
t_3_feature_dt = t_3_label_dt - timedelta(1)
|
|
|
logger.info(
|
|
|
f"VOV模型特征数据处理 --- t_3_label_future:"
|
|
|
- f"label_datetime: {t_3_label_dt.strftime('%Y%m%d')}"
|
|
|
- f"feature_datetime: {t_3_feature_dt.strftime('%Y%m%d')}"
|
|
|
- f"view_rate_datetime: {t_3_label_dt.strftime('%Y%m%d')}"
|
|
|
+ f"\n\t label_datetime: {t_3_label_dt.strftime('%Y%m%d')} "
|
|
|
+ f"\n\t feature_datetime: {t_3_feature_dt.strftime('%Y%m%d')} "
|
|
|
+ f"\n\t view_rate_datetime: {t_3_label_dt.strftime('%Y%m%d')} "
|
|
|
)
|
|
|
t_3_label_future = executor.submit(fetch_data, t_3_label_dt, t_3_feature_dt, t_3_label_dt)
|
|
|
|
|
@@ -281,9 +292,9 @@ def xgb_predict_dt_data(label_datetime: datetime):
|
|
|
feature_start_datetime = label_datetime
|
|
|
logger.info(
|
|
|
f"VOV模型预测数据处理 --- predict_df: "
|
|
|
- f"label_datetime: {label_datetime.strftime('%Y%m%d')}"
|
|
|
- f"feature_datetime: {feature_start_datetime.strftime('%Y%m%d')}"
|
|
|
- f"view_rate_datetime: {label_datetime.strftime('%Y%m%d')}"
|
|
|
+ f"label_datetime: {label_datetime.strftime('%Y%m%d')} "
|
|
|
+ f"feature_datetime: {feature_start_datetime.strftime('%Y%m%d')} "
|
|
|
+ f"view_rate_datetime: {label_datetime.strftime('%Y%m%d')} "
|
|
|
)
|
|
|
return fetch_data(label_datetime, feature_start_datetime, label_datetime)
|
|
|
|