import datetime

import jieba
import numpy as np
import pandas as pd
from odps import ODPS

from my_utils import filter_video_status
from db_helper import RedisHelper
from my_config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()

def get_data_from_odps(project, sql):
    """Execute a SQL query on ODPS and return the result as a DataFrame, or None on failure."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        with odps.execute_sql(sql=sql).open_reader() as reader:
            data_df = reader.to_pandas()
    except Exception as e:
        # Log the failure instead of swallowing it silently
        log_.error(f"ODPS query failed, sql = {sql}, error = {e}")
        data_df = None
    return data_df
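
# Usage sketch (the SQL below is purely illustrative):
#   df = get_data_from_odps(project='videoods', sql="SELECT 1;")
#   # df is a pandas DataFrame on success, or None if the query raised,
#   # so callers must guard against None before using it.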

def get_word_vector(s1, s2):
    """
    :param s1: sentence 1
    :param s2: sentence 2
    :return: the term-frequency vectors of the two sentences over their shared vocabulary
    """
    # Segment each sentence into words
    list_word1 = jieba.lcut(s1, cut_all=False)
    list_word2 = jieba.lcut(s2, cut_all=False)
    # Vocabulary: the union of the words of both sentences
    key_word = list(set(list_word1 + list_word2))
    # Zero-initialized vectors, one dimension per vocabulary word
    word_vector1 = np.zeros(len(key_word))
    word_vector2 = np.zeros(len(key_word))
    # Term frequency: count how often each vocabulary word occurs in each sentence
    for i, word in enumerate(key_word):
        word_vector1[i] = list_word1.count(word)
        word_vector2[i] = list_word2.count(word)
    return word_vector1, word_vector2
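
# Worked example (illustrative segmentation; real jieba output may differ):
# if s1 segments to ['经典', '电影'] and s2 to ['经典', '片段'], the vocabulary is
# {'经典', '电影', '片段'} and the vectors are, e.g., [1, 1, 0] and [1, 0, 1]
# (the dimension order follows the set's iteration order).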

def cos_dist(vec1, vec2):
    """
    :param vec1: vector 1
    :param vec2: vector 2
    :return: the cosine similarity of the two vectors
    """
    norm_product = float(np.linalg.norm(vec1) * np.linalg.norm(vec2))
    if norm_product == 0:
        # An all-zero vector (e.g. from an empty title) has no direction; treat as dissimilar
        return 0.0
    return float(np.dot(vec1, vec2)) / norm_product
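
# Continuing the example above (illustrative values):
#   v1, v2 = get_word_vector(s1, s2)  # [1, 1, 0], [1, 0, 1]
#   cos_dist(v1, v2)                  # 1 / (sqrt(2) * sqrt(2)) = 0.5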

def get_movie_video_top_list():
    """Fetch the top-list movie videos (returns > 5) as {video_id: title}."""
    sql = "select videoid, title from videoods.movie_store_video_top_list where returns > 5;"
    data_df = get_data_from_odps(project='videoods', sql=sql)
    if data_df is None:
        return dict()
    data_df = data_df.fillna(0)
    data_df['videoid'] = data_df['videoid'].astype(int)
    movie_videos = dict()
    for index, row in data_df.iterrows():
        # Skip rows whose video id was missing (filled with 0 above)
        if row['videoid'] == 0:
            continue
        movie_videos[int(row['videoid'])] = row['title']
    return movie_videos

def get_sim_videos():
    """Fetch candidate videos created in the last 30 days from the allow list as {video_id: title}."""
    now = datetime.datetime.now()
    log_.info(f"now = {datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S')}")
    sql_create_time = datetime.datetime.strftime(now - datetime.timedelta(days=30), '%Y-%m-%d %H:%M:%S')
    # Never query earlier than this fixed lower bound
    if sql_create_time < '2022-04-22 16:40:00':
        sql_create_time = '2022-04-22 16:40:00'
    sql = f"SELECT video_id, create_time, title FROM videoods.movie_store_video_allow_list_final " \
          f"WHERE create_time>='{sql_create_time}';"
    data_df = get_data_from_odps(project='videoods', sql=sql)
    if data_df is None:
        return dict()
    video_ids = [int(video_id) for video_id in data_df['video_id'].to_list()]
    # Filter out videos whose status disallows recommendation
    filtered_videos = filter_video_status(list(video_ids))
    sim_videos = dict()
    for index, row in data_df.iterrows():
        video_id = int(row['video_id'])
        if video_id in filtered_videos:
            sim_videos[video_id] = row['title']
    return sim_videos

def similarity_rank(movie_videos, sim_videos):
    """For each top-list video, rank the candidates by title similarity and write the top N to Redis."""
    redis_helper = RedisHelper()
    sim_result = []
    if len(movie_videos) == 0 or len(sim_videos) == 0:
        return
    for video_id, title in movie_videos.items():
        item_sim_list = []
        for vid, title1 in sim_videos.items():
            if vid == video_id:
                continue
            vec1, vec2 = get_word_vector(title, title1)
            dist = cos_dist(vec1, vec2)
            if dist > 0:
                item_sim = {'top_video_id': video_id, 'title': title, 'vid': vid, 'title1': title1, 'dist': dist}
                item_sim_list.append(item_sim)
        # Keep only the N most similar candidates per top-list video
        item_sim_list.sort(key=lambda x: x['dist'], reverse=True)
        sim_result.extend(item_sim_list[:config_.SIM_N_19])
        # Write the ranked candidates to Redis
        key_name = f"{config_.MOVIE_RELEVANT_LIST_KEY_NAME_PREFIX}{video_id}"
        relevant_data = dict()
        for item in item_sim_list[:config_.SIM_N_19]:
            relevant_data[item['vid']] = item['dist']
        if redis_helper.key_exists(key_name=key_name):
            redis_helper.del_keys(key_name=key_name)
        if relevant_data:
            log_.info(f"video_id = {video_id}")
            redis_helper.add_data_with_zset(key_name=key_name, data=relevant_data, expire_time=10 * 60)
    dist_df = pd.DataFrame(sim_result, columns=['top_video_id', 'title', 'vid', 'title1', 'dist'])
    dist_df.to_csv('./data/videos_dist.csv', index=False)
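
# Resulting Redis layout (from the writes above): one sorted set per top-list video,
# key = MOVIE_RELEVANT_LIST_KEY_NAME_PREFIX + top-list video id,
# member = candidate video id, score = title cosine similarity, TTL = 10 minutes.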

if __name__ == '__main__':
    movie_videos = get_movie_video_top_list()
    sim_videos = get_sim_videos()
    log_.info(f"movie_videos count = {len(movie_videos)}, sim_videos count = {len(sim_videos)}")
    similarity_rank(movie_videos=movie_videos, sim_videos=sim_videos)