"""
@author: luojunhui
"""
from datetime import datetime, timedelta

from applications.match_algorithm.title_similarity import jcd_title_similarity

# Which field of item['result'] holds the video title, per recall platform.
_TITLE_FIELD_BY_PLATFORM = {
    'dy_search': 'title',
    'baidu_search': 'title',
    'xg_search': 'video_title',
}


def _jaccard(left, right):
    """
    Return |left & right| / |left | right| for two iterables of hashables.

    An empty union yields 0.0 instead of raising ZeroDivisionError.
    """
    left_set, right_set = set(left), set(right)
    union = left_set | right_set
    if not union:
        return 0.0
    return len(left_set & right_set) / len(union)


def jac_score(d1, d2):
    """
    Score two records by the overlap (intersection over union) of their keys.

    :param d1: mapping with "key_words", "search_keys" and "extra_keys" iterables
    :param d2: mapping with the same three entries plus "video_id"
    :return: tuple (weighted score, d2['video_id']); the weights are
        0.4 for key_words, 0.4 for search_keys and 0.2 for extra_keys
    """
    key_words_score = _jaccard(d1["key_words"], d2["key_words"])
    search_keys_score = _jaccard(d1["search_keys"], d2["search_keys"])
    extra_keys_score = _jaccard(d1["extra_keys"], d2["extra_keys"])
    weighted = key_words_score * 0.4 + search_keys_score * 0.4 + extra_keys_score * 0.2
    return weighted, d2['video_id']


def title_similarity_rank(content_title, recall_list):
    """
    Rank recalled videos by the similarity of their title to content_title.

    Items whose 'platform' is not one of dy_search / baidu_search / xg_search
    are dropped. Every kept item is updated in place with two extra entries:
    'title' (the platform-specific title field) and 'score' (the value
    returned by jcd_title_similarity).

    :param content_title: title to compare against
    :param recall_list: list of dicts with 'result' and 'platform' entries
    :return: the kept items, sorted by 'score' from highest to lowest
    """
    scored_items = []
    for item in recall_list:
        title_field = _TITLE_FIELD_BY_PLATFORM.get(item['platform'])
        if title_field is None:
            continue
        title = item['result'][title_field]
        item['title'] = title
        item['score'] = jcd_title_similarity(content_title, title)
        scored_items.append(item)
    return sorted(scored_items, key=lambda x: x['score'], reverse=True)


async def get_content_oss_fission_dict(db_client, config, content_id) -> dict[str, dict]:
    """
    Collect the fission metrics of every oss path linked to content_id.

    Only rows whose dt falls within the last two days are read. The oss data
    is refreshed every day at 2 a.m. (per the original author's note).

    :param db_client: client exposing an awaitable async_select(sql)
    :param config: object exposing fission_detail_table
    :param content_id: content id to look up
    :return: {oss_name: {"fission_rate_0": ..., "fission_0_on_read": ...}};
        empty dict when the query returns nothing
    """
    fission_table = config.fission_detail_table
    two_days_ago_dt = (datetime.now() - timedelta(days=2)).strftime('%Y%m%d')
    # NOTE(review): content_id is interpolated straight into the SQL text. If it
    # can come from untrusted input, switch to a parameterized query; whether
    # db_client.async_select accepts parameters is not visible from this file.
    sql = f"""
        SELECT
            oss_name, fission_rate_0, fission_0_on_read
        FROM
            {fission_table}
        WHERE content_id = '{content_id}' and dt >= '{two_days_ago_dt}'
        ORDER BY dt DESC;
    """
    rows = await db_client.async_select(sql)
    if not rows:
        return {}

    fission_info_dict = {}
    for oss_name, fission_rate_0, fission_0_on_read in rows:
        # Rows arrive newest dt first, so the first row seen per oss_name wins.
        if oss_name in fission_info_dict:
            continue
        fission_info_dict[oss_name] = {
            "fission_rate_0": fission_rate_0,
            "fission_0_on_read": fission_0_on_read,
        }
    return fission_info_dict


async def get_title_oss_fission_list(db_client, config, content_id) -> list[dict]:
    """
    List the videos matching the article title of content_id, best fission first.

    For each oss path only the row with its latest dt is used. The oss data
    is refreshed every day at 2 a.m. (per the original author's note).
    todo: fetch the latest dt that actually has data

    :param db_client: client exposing an awaitable async_select(sql)
    :param config: object exposing fission_detail_table, article_text_table
        and article_crawler_video_table
    :param content_id: content id whose article title is looked up
    :return: list of dicts with platform, video_oss_path, cover_oss_path, uid
        and fission_0_on_read, sorted by fission_0_on_read descending;
        empty list when the query returns nothing
    """
    fission_table = config.fission_detail_table
    text_table = config.article_text_table
    crawler_table = config.article_crawler_video_table
    # NOTE(review): content_id is interpolated straight into the SQL text. If it
    # can come from untrusted input, switch to a parameterized query; whether
    # db_client.async_select accepts parameters is not visible from this file.
    #
    # The statement previously had a ';' right after the title subquery, which
    # terminated it before the "AND lavfi.dt = (...)" filter. The dt subquery
    # now also reads the configured fission table instead of a hard-coded name.
    sql = f"""
        SELECT
            lavfi.oss_name, lavfi.fission_0_on_read, lacv.platform, lacv.cover_oss_path, lacv.user_id
        FROM
            {fission_table} lavfi
        JOIN
            {crawler_table} lacv on lavfi.oss_name = lacv.video_oss_path
        WHERE title = (
            SELECT article_title
            FROM {text_table}
            WHERE content_id = '{content_id}'
        )
        AND lavfi.dt = (
            SELECT MAX(dt)
            FROM {fission_table}
            WHERE oss_name = lavfi.oss_name
        )
    """
    rows = await db_client.async_select(sql)
    if not rows:
        return []

    fission_info_list = [
        {
            "platform": row[2],
            "video_oss_path": row[0],
            "cover_oss_path": row[3],
            "uid": row[4],
            "fission_0_on_read": row[1],
        }
        for row in rows
    ]
    return sorted(fission_info_list, key=lambda x: x['fission_0_on_read'], reverse=True)