"""
@author: luojunhui
"""
import json
import os

from applications.log import logging
from applications.functions.date import generate_daily_strings, five_days_before


def read_single_file(filename):
    """
    Load one JSON file from disk.

    :param filename: path of a UTF-8 encoded JSON file
    :return: the parsed JSON value, or an empty dict when the parsed value
        is falsy (``null``, ``{}``, ``[]``, ``0``, ``""``)
    :raises OSError: if the file cannot be opened
    :raises json.JSONDecodeError: if the file does not contain valid JSON
    """
    with open(filename, encoding="utf-8") as f:
        data = json.load(f)
    return data if data else {}


def _jaccard(items_1, items_2):
    """
    Return |A & B| / |A | B| for the two collections taken as sets.

    An empty union yields 0.0 instead of raising ZeroDivisionError, so one
    empty field does not wipe out the other components of a weighted score.
    """
    set_1 = set(items_1)
    set_2 = set(items_2)
    union = set_1 | set_2
    if not union:
        return 0.0
    return len(set_1 & set_2) / len(union)


def _calculate_v1(d1, d2):
    """
    Score two records by the set overlap of their keyword fields.

    The result is 0.4 * overlap("key_words") + 0.4 * overlap("search_keys")
    + 0.2 * overlap("extra_keys").

    :param d1: first record; must provide the three keyword fields
    :param d2: second record; must provide the three keyword fields
    :return: weighted overlap score in the range [0, 1]
    :raises KeyError: if a keyword field is missing
    :raises TypeError: if a field is not iterable or holds unhashable items
    """
    score_1 = _jaccard(d1["key_words"], d2["key_words"])
    score_2 = _jaccard(d1["search_keys"], d2["search_keys"])
    score_3 = _jaccard(d1["extra_keys"], d2["extra_keys"])
    return score_1 * 0.4 + score_2 * 0.4 + score_3 * 0.2


def _calculate_v2(d1, d2):
    """
    Score two records by exact equality of their categorical fields.

    Not called by compute_similarity at the moment; kept as the second
    scoring method.

    :param d1: first record
    :param d2: second record
    :return: sum of the weights of all fields whose values are equal
    :raises KeyError: if one of the compared fields is missing
    """
    # NOTE(review): the weights add up to 1.2, so this score is not bounded
    # by 1 -- confirm whether that is intended before enabling it.
    weighted_fields = (
        ("tone", 0.1),
        ("target_audience", 0.2),
        ("target_age", 0.2),
        ("address", 0.2),
        ("theme", 0.5),
    )
    score = 0
    for field, weight in weighted_fields:
        if d1[field] == d2[field]:
            score += weight
    return score


def compute_similarity(file_1, file_2):
    """
    Compute the keyword-overlap similarity of two JSON description files.

    :param file_1: path of the first JSON file
    :param file_2: path of the second JSON file
    :return: the weighted overlap score of the two records, or 0 when
        either file holds no data or a record is malformed
    :raises OSError: if a file cannot be opened
    :raises json.JSONDecodeError: if a file does not contain valid JSON
    """
    data_1 = read_single_file(file_1)
    data_2 = read_single_file(file_2)
    if not (data_1 and data_2):
        return 0
    try:
        return _calculate_v1(data_1, data_2)
    except (KeyError, TypeError):
        # Best effort: a record with missing or badly typed keyword fields
        # is treated as "not similar" rather than aborting the caller.
        return 0


def title_mix(title_p, dt, trace_id):
    """
    Find the stored video whose description is most similar to a title file.

    Candidate files are the ``*.json`` files found under
    ``<cwd>/applications/static/<day>`` for every day string produced by
    ``generate_daily_strings(five_days_before(ori_dt=dt), dt)``. Candidate
    file names are parsed as ``<uid>_<vid>...json``.

    :param title_p: path of the JSON file describing the title to match
    :param dt: reference date, passed to the date helpers
    :param trace_id: unique id of the request, forwarded to the log
    :return: dict with keys ``title``, ``s1_vid``, ``s1_score`` and
        ``s1_uid`` describing the best match; when no candidate exists the
        vid and uid are None and the score is 0
    """
    five_days_ago = five_days_before(ori_dt=dt)
    days_list = generate_daily_strings(five_days_ago, dt)

    # Collect (full path, bare file name) for every candidate JSON file.
    candidates = []
    for day_str in days_list:
        json_path = os.path.join(os.getcwd(), 'applications', 'static', day_str)
        if not os.path.isdir(json_path):
            # A day without stored data must not abort the whole request.
            continue
        for file in os.listdir(json_path):
            if file.endswith(".json"):
                candidates.append((os.path.join(json_path, file), file))
    print("召回的视频量", len(candidates))

    score_list_1 = []
    for file_path, base_name in candidates:
        name_parts = base_name.replace(".json", "").split('_')
        if len(name_parts) < 2:
            # The name does not follow the <uid>_<vid> layout; skip it.
            continue
        uid, v_id = name_parts[0], name_parts[1]
        # Compare the requested title against the candidate. The previous
        # code compared each candidate with itself, so title_p never
        # influenced the ranking.
        score1 = compute_similarity(title_p, file_path)
        score_list_1.append([score1, v_id, uid])

    title = os.path.basename(title_p).replace(".json", "")
    if score_list_1:
        # max() returns the first of several equal maxima, which is the same
        # element a stable descending sort would place first.
        best_score, best_vid, best_uid = max(score_list_1, key=lambda x: x[0])
    else:
        best_score, best_vid, best_uid = 0, None, None

    obj = {
        "title": title,
        "s1_vid": best_vid,
        "s1_score": best_score,
        "s1_uid": best_uid,
    }
    logging(
        code="1003",
        info="计算结果得分",
        data=obj,
        function="title_mix",
        trace_id=trace_id
    )
    return obj