|
@@ -4,12 +4,12 @@ import os.path
|
|
|
from collections import OrderedDict
|
|
|
|
|
|
import pandas as pd
|
|
|
-import numpy as np
|
|
|
from hdfs import InsecureClient
|
|
|
|
|
|
client = InsecureClient("http://master-1-1.c-7f31a3eea195cb73.cn-hangzhou.emr.aliyuncs.com:9870", user="spark")
|
|
|
|
|
|
-SEGMENT_BASE_PATH = "/dw/recommend/model/36_score_calibration"
|
|
|
+SEGMENT_BASE_PATH = "/dw/recommend/model/36_score_calibration_file"
|
|
|
+
|
|
|
|
|
|
def read_predict_from_local_txt(txt_file) -> list:
|
|
|
result = []
|
|
@@ -38,15 +38,16 @@ def read_predict_from_hdfs(hdfs_path: str) -> list:
|
|
|
for line in gz_file.read().decode("utf-8").split("\n"):
|
|
|
split = line.split("\t")
|
|
|
if len(split) == 4:
|
|
|
- cid = split[3].split("_")[0]
|
|
|
- label = int(split[0])
|
|
|
- score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
|
|
|
+ continue
|
|
|
+ cid = split[3].split("_")[0]
|
|
|
+ label = int(split[0])
|
|
|
+ score = float(split[2].replace("[", "").replace("]", "").split(",")[1])
|
|
|
|
|
|
- result.append({
|
|
|
- "cid": cid,
|
|
|
- "label": label,
|
|
|
- "score": score
|
|
|
- })
|
|
|
+ result.append({
|
|
|
+ "cid": cid,
|
|
|
+ "label": label,
|
|
|
+ "score": score
|
|
|
+ })
|
|
|
|
|
|
return result
|
|
|
|
|
@@ -85,7 +86,9 @@ def segment_calc_diff_rate_by_score(df: pd.DataFrame, segment_file_path: str, st
|
|
|
group_df['segment_diff_rate'] = (group_df['segment_score_avg'] / group_df['segment_true_score'] - 1).mask(group_df['segment_true_score'] == 0, 0)
|
|
|
|
|
|
# 完整的分段文件保存
|
|
|
- group_df.to_csv(segment_file_path, sep="\t", index=False)
|
|
|
+ csv_data = group_df.to_csv(sep="\t", index=False)
|
|
|
+ with client.write(segment_file_path, encoding='utf08') as writer:
|
|
|
+ writer.write(csv_data)
|
|
|
|
|
|
filtered_df = group_df[(abs(group_df['segment_diff_rate']) >= 0.2) & (group_df['segment_label_cnt'] >= 1000)]
|
|
|
filtered_df = filtered_df[['score_segment', 'segment_diff_rate']]
|
|
@@ -129,8 +132,8 @@ def read_and_calibration_predict(predict_path: str, is_hdfs=True, step=100) -> [
|
|
|
|
|
|
|
|
|
def _main(old_predict_path: str, new_predict_path: str, calibration_file: str, analyse_file: str):
|
|
|
- old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path, is_hdfs=True)
|
|
|
- new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path, is_hdfs=True)
|
|
|
+ old_group_df, old_segment_df = read_and_calibration_predict(old_predict_path, is_hdfs=False)
|
|
|
+ new_group_df, new_segment_df = read_and_calibration_predict(new_predict_path, is_hdfs=False)
|
|
|
|
|
|
# 分段文件保存, 此处保留的最后使用的分段文件,不是所有的分段
|
|
|
new_segment_df.to_csv(calibration_file, sep='\t', index=False, header=False)
|