瀏覽代碼

add whole movies update

liqian 2 年之前
父節點
當前提交
81af54673f
共有 2 個文件被更改,包括 177 次插入0 次删除
  1. 51 0
      utils.py
  2. 126 0
      whole_movies_update.py

+ 51 - 0
utils.py

@@ -186,6 +186,57 @@ def filter_video_status(video_ids):
     return filtered_videos
 
 
+def filter_video_status_with_applet_rec(video_ids, applet_rec_status):
+    """
+    对视频状态进行过滤
+    :param video_ids: 视频id列表 type-list
+    :param applet_rec_status: 小程序推荐状态 -6:待推荐 1:普通推荐
+    :return: filtered_videos
+    """
+    mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
+    video_status_sql = "SELECT t1.id AS 'video_id', " \
+                       "t1.transcode_status AS 'transcoding_status', " \
+                       "t2.audit_status AS 'audit_status', " \
+                       "t2.video_status AS 'open_status', " \
+                       "t2.recommend_status AS 'applet_rec_status', " \
+                       "t2.app_recommend_status AS 'app_rec_status', " \
+                       "t3.charge AS 'payment_status', " \
+                       "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
+                       "FROM longvideo.wx_video t1 " \
+                       "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
+                       "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
+                       "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
+    if len(video_ids) == 1:
+        sql = "SELECT video_id " \
+              "FROM ({}) " \
+              "WHERE audit_status = 5 " \
+              "AND applet_rec_status = {} " \
+              "AND open_status = 1 " \
+              "AND payment_status = 0 " \
+              "AND encryption_status != 5 " \
+              "AND transcoding_status = 3 " \
+              "AND video_id IN ({});".format(video_status_sql, applet_rec_status, video_ids[0])
+        data = mysql_helper.get_data(sql=sql)
+
+    else:
+        data = []
+        for i in range(len(video_ids) // 2000 + 1):
+            sql = "SELECT video_id " \
+                  "FROM ({}) " \
+                  "WHERE audit_status = 5 " \
+                  "AND applet_rec_status = {} " \
+                  "AND open_status = 1 " \
+                  "AND payment_status = 0 " \
+                  "AND encryption_status != 5 " \
+                  "AND transcoding_status = 3 " \
+                  "AND video_id IN {};".format(video_status_sql, applet_rec_status, tuple(video_ids[i*2000:(i+1)*2000]))
+            select_res = mysql_helper.get_data(sql=sql)
+            if select_res is not None:
+                data += select_res
+    filtered_videos = [int(temp[0]) for temp in data]
+    return filtered_videos
+
+
 def filter_video_status_app(video_ids):
     """
     对视频状态进行过滤 - app

+ 126 - 0
whole_movies_update.py

@@ -0,0 +1,126 @@
+import time
+import datetime
+import pandas as pd
+import math
+import random
+from odps import ODPS
+from threading import Timer
+from get_data import get_data_from_odps
+from db_helper import RedisHelper, MysqlHelper
+from config import set_config
+from log import Log
+from utils import filter_video_status_with_applet_rec
+
+config_, env = set_config()
+log_ = Log()
+
+features = [
+    '视频id',
+    '抓取时间',
+    '进入黑名单时间',
+    '站外播放量'
+]
+
+
+def h_data_check(project, table, now_date):
+    """检查数据是否准备好"""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+
+    try:
+        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        sql = f'select * from {project}.{table} where dt = {dt}'
+        with odps.execute_sql(sql=sql).open_reader() as reader:
+            data_count = reader.count
+    except Exception as e:
+        data_count = 0
+    return data_count
+
+
+def get_feature_data(now_date, project, table):
+    """获取特征数据"""
+    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+    # dt = '2022041310'
+    records = get_data_from_odps(date=dt, project=project, table=table)
+    feature_data = []
+    for record in records:
+        item = {}
+        for feature_name in features:
+            item[feature_name] = record[feature_name]
+        feature_data.append(item)
+    feature_df = pd.DataFrame(feature_data)
+    return feature_df
+
+
+def video_rank(app_type, df, now_date, now_h, return_count):
+    """
+    对视频进行排序
+    :param app_type:
+    :param df:
+    :param now_date:
+    :param now_h:
+    :param return_count: 小时级数据回流限制数
+    :return:
+    """
+    # 视频状态过滤
+    log_.info(f'initial_df count = {len(df)}')
+    video_ids = [int(video_id) for video_id in df['videoid']]
+    df['videoid'] = df['videoid'].astype(int)
+
+    # 获取待推荐
+    filtered_result_6 = filter_video_status_with_applet_rec(video_ids=video_ids, applet_rec_status=-6)
+    filtered_df_6 = df[df['videoid'].isin(filtered_result_6)]
+    filtered_df_6 = filtered_df_6.drop_duplicates(['videoid'], keep=False)
+    log_.info(f'filtered_df_6 count = {len(filtered_df_6)}')
+
+    # 获取普通推荐
+    filtered_result_1 = filter_video_status_with_applet_rec(video_ids=video_ids, applet_rec_status=1)
+    filtered_df_1 = df[df['videoid'].isin(filtered_result_1)]
+    filtered_df_1 = filtered_df_1.drop_duplicates(['videoid'], keep=False)
+    log_.info(f'filtered_df_1 count = {len(filtered_df_1)}')
+
+    log_.info(f'df length = {len(df)}')
+    # 获取符合进入召回源条件的视频,进入条件:小时级回流>=20 && score>=0.005
+    h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= 0.005)]
+    h_recall_videos = h_recall_df['videoid'].to_list()
+    log_.info(f'h_recall videos count = {len(h_recall_videos)}')
+    # 不符合进入召回源条件的视频
+    df = df.append(h_recall_df)
+    h_else_df = df.drop_duplicates(['videoid'], keep=False)
+    h_else_df = h_else_df.sort_values(by=['score'], ascending=False)
+    h_else_videos = h_else_df['videoid'].to_list()
+    # 合并,给定分数
+    final_videos = h_recall_videos + h_else_videos
+    final_result = {}
+    step = round(100/len(final_videos), 3)
+    for i, video_id in enumerate(final_videos):
+        score = 100 - i * step
+        final_result[int(video_id)] = score
+    # 写入对应的redis
+    key_name = \
+        f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
+    if len(final_result) > 0:
+        redis_helper = RedisHelper()
+        redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=23 * 3600)
+
+
+def rank_by_h(app_type, now_date, now_h, return_count_list, project, table):
+    # 获取特征数据
+    feature_df = get_feature_data(now_date=now_date, project=project, table=table)
+    # 计算score
+    score_df = cal_score(df=feature_df)
+    # rank
+    for cnt in return_count_list:
+        log_.info(f"return_count = {cnt}")
+        video_rank(app_type=app_type, df=score_df, now_date=now_date, now_h=now_h, return_count=cnt)
+    # to-csv
+    score_filename = f"score_{app_type}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
+    score_df.to_csv(f'./data/{score_filename}')