import datetime
import os
from collections import Counter

import jieba
import numpy as np
import pandas as pd
from odps import ODPS

from my_utils import filter_video_status
from db_helper import RedisHelper
from my_config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()


def get_data_from_odps(project, sql):
    """
    Execute a SQL statement on ODPS and load the result into a DataFrame.

    :param project: name of the ODPS project to run the statement in
    :param sql: SQL statement to execute
    :return: pandas.DataFrame with the query result, or None when executing
             the statement or reading its result raised an exception
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        with odps.execute_sql(sql=sql).open_reader() as reader:
            return reader.to_pandas()
    except Exception as e:
        # Best-effort contract is kept (callers receive None), but the failure
        # is no longer dropped silently.
        # NOTE(review): only `info` is known to exist on the project logger
        # from this file; switch to an error-level method if one is available.
        log_.info(f"get_data_from_odps failed, project = {project}, sql = {sql}, error = {e}")
        return None


def get_word_vector(s1, s2):
    """
    Build term-frequency vectors for two sentences over their shared vocabulary.

    Both sentences are segmented with jieba; the vocabulary is the union of
    the tokens of both sentences, and each returned vector holds, per
    vocabulary entry, how many times that token occurs in the sentence.

    :param s1: sentence 1
    :param s2: sentence 2
    :return: tuple (word_vector1, word_vector2) of equal-length float arrays
    """
    # Word segmentation (accurate mode)
    cut1 = jieba.lcut(s1, cut_all=False)
    cut2 = jieba.lcut(s2, cut_all=False)
    # The join/split round trip is kept on purpose so the vectors stay exactly
    # what they were before: a token containing ',' is broken into pieces
    # (a lone ',' token becomes two empty-string tokens).
    list_word1 = (','.join(cut1)).split(',')
    list_word2 = (','.join(cut2)).split(',')

    # Vocabulary: union of all tokens of both sentences
    key_word = list(set(list_word1 + list_word2))

    # Term frequencies; a Counter returns 0 for tokens absent from a sentence
    counts1 = Counter(list_word1)
    counts2 = Counter(list_word2)
    word_vector1 = np.array([counts1[word] for word in key_word], dtype=float)
    word_vector2 = np.array([counts2[word] for word in key_word], dtype=float)
    return word_vector1, word_vector2


def cos_dist(vec1, vec2):
    """
    Compute the cosine similarity of two vectors.

    :param vec1: vector 1
    :param vec2: vector 2
    :return: cosine similarity as a float; 0.0 when either vector has zero
             norm (the similarity is undefined there and would be NaN)
    """
    denominator = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    if denominator == 0:
        return 0.0
    return float(np.dot(vec1, vec2) / denominator)


def get_movie_video_top_list():
    """
    Load the top movie videos (rows with returns > 5) from ODPS.

    :return: dict mapping video id (int) to title; empty when the query failed
    """
    sql = "select videoid, title from videoods.movie_store_video_top_list where returns > 5;"
    data_df = get_data_from_odps(project='videoods', sql=sql)
    if data_df is None:
        log_.info("get_movie_video_top_list: no data returned from odps")
        return dict()
    # NOTE(review): fillna(0) also turns a missing title into the integer 0,
    # which is later handed to jieba - confirm titles are never null.
    data_df = data_df.fillna(0)
    data_df['videoid'] = data_df['videoid'].astype(int)
    movie_videos = dict()
    for video_id, title in zip(data_df['videoid'], data_df['title']):
        # A videoid of 0 is the fillna placeholder for a missing id
        if video_id == 0:
            continue
        movie_videos[int(video_id)] = title
    return movie_videos


def get_sim_videos():
    """
    Load candidate videos created within the last 30 days (but never earlier
    than 2022-04-22 16:40:00) and keep those accepted by filter_video_status.

    :return: dict mapping video id (int) to title; empty when the query failed
    """
    now = datetime.datetime.now()
    log_.info(f"now = {datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S')}")
    # Lower bound of the creation-time window, clamped to a fixed floor
    earliest_create_time = datetime.datetime(2022, 4, 22, 16, 40, 0)
    window_start = max(now - datetime.timedelta(days=30), earliest_create_time)
    sql_create_time = datetime.datetime.strftime(window_start, '%Y-%m-%d %H:%M:%S')
    sql = f"SELECT video_id, create_time, title FROM videoods.movie_store_video_allow_list_final " \
          f"WHERE create_time>='{sql_create_time}';"
    data_df = get_data_from_odps(project='videoods', sql=sql)
    if data_df is None:
        log_.info("get_sim_videos: no data returned from odps")
        return dict()
    video_ids = [int(video_id) for video_id in data_df['video_id'].to_list()]
    # Filter by video status; a set makes the per-row membership test O(1)
    filtered_videos = set(filter_video_status(list(video_ids)))
    sim_videos = dict()
    for raw_video_id, title in zip(data_df['video_id'], data_df['title']):
        video_id = int(raw_video_id)
        if video_id in filtered_videos:
            sim_videos[video_id] = title
    return sim_videos


def similarity_rank(movie_videos, sim_videos):
    """
    Rank candidate videos by title similarity to every top movie video.

    For each top video the candidates with a positive cosine similarity are
    sorted in descending order and the first config_.SIM_N_19 are stored in a
    Redis zset (vid -> similarity) under
    config_.MOVIE_RELEVANT_LIST_KEY_NAME_PREFIX + video id, replacing any
    existing key. All kept pairs are also written to ./data/videos_dist.csv.

    :param movie_videos: dict of top video id -> title
    :param sim_videos: dict of candidate video id -> title
    :return: None; nothing is written when either input is empty
    """
    if not movie_videos or not sim_videos:
        return
    redis_helper = RedisHelper()
    sim_result = []
    for video_id, title in movie_videos.items():
        item_sim_list = []
        for vid, title1 in sim_videos.items():
            # A video is never listed as relevant to itself
            if vid == video_id:
                continue
            vec1, vec2 = get_word_vector(title, title1)
            dist = cos_dist(vec1, vec2)
            if dist > 0:
                item_sim_list.append({
                    'top_video_id': video_id,
                    'title': title,
                    'vid': vid,
                    'title1': title1,
                    'dist': dist,
                })
        item_sim_list.sort(key=lambda x: x['dist'], reverse=True)
        top_items = item_sim_list[:config_.SIM_N_19]
        sim_result.extend(top_items)

        # Write to redis: drop the stale key first, then store the new ranking
        key_name = f"{config_.MOVIE_RELEVANT_LIST_KEY_NAME_PREFIX}{video_id}"
        relevant_data = {item['vid']: item['dist'] for item in top_items}
        if redis_helper.key_exists(key_name=key_name):
            redis_helper.del_keys(key_name=key_name)
        if relevant_data:
            log_.info(f"video_id = {video_id}")
            redis_helper.add_data_with_zset(key_name=key_name, data=relevant_data, expire_time=10*60)

    dist_df = pd.DataFrame(sim_result, columns=['top_video_id', 'title', 'vid', 'title1', 'dist'])
    # to_csv does not create missing directories
    os.makedirs('./data', exist_ok=True)
    dist_df.to_csv('./data/videos_dist.csv', index=False)


def main():
    """Load both video lists from ODPS and publish the similarity ranking."""
    movie_videos = get_movie_video_top_list()
    sim_videos = get_sim_videos()
    log_.info(f"movie_videos count = {len(movie_videos)}, sim_videos count = {len(sim_videos)}")
    similarity_rank(movie_videos=movie_videos, sim_videos=sim_videos)


if __name__ == '__main__':
    main()