Browse Source

Merge branch 'laohaokan-20221009' into test

liqian 2 years ago
parent
commit
03da9aba97
5 changed files with 249 additions and 2 deletions
  1. 9 0
      config.py
  2. 143 0
      laohaokan_recommend_update.py
  3. 4 2
      region_rule_rank_h_task.sh
  4. 86 0
      religion_videos_update.py
  5. 7 0
      religion_videos_update_task.sh

+ 9 - 0
config.py

@@ -341,6 +341,9 @@ class BaseConfig(object):
         ],
     }
 
+    # Rule params for the laohaokan (老好看) religion-video experiment:
+    # 'data' names the data source key segment, 'rule' the rule key segment
+    LHK_RULE_PARAMS = {'data': 'lhk_data', 'rule': 'lhk_rule'}
+
     # 不区分地域数据使用相对48h数据
     RULE_PARAMS_REGION_APP_TYPE_48H = {
         'rule_params': {
@@ -572,6 +575,12 @@ class BaseConfig(object):
         CITY_CODE['成都']: [SPECIAL_AREA_LIMIT_KEY_NAME, ],
     }
 
+    # ODPS project / table the religion-video update job reads from
+    RELIGION_VIDEOS_PROJECT = 'loghubods'
+    RELIGION_VIDEOS_TABLE = 'religion_video'
+    # Redis key prefix for the religion video ranking zset;
+    # full key format: 'religion:videos:item:{date}'
+    KEY_NAME_PREFIX_RELIGION_VIDEOS = 'religion:videos:item:'
+
 
 class DevelopmentConfig(BaseConfig):
     """开发环境配置"""

+ 143 - 0
laohaokan_recommend_update.py

@@ -0,0 +1,143 @@
+import datetime
+import gevent
+from db_helper import RedisHelper
+from utils import send_msg_to_feishu
+from config import set_config
+from log import Log
+
+config_, env = set_config()
+log_ = Log()
+# rule params of the baseline recommendation data that gets merged FROM
+initial_param = {'data': 'data1', 'rule': 'rule4'}
+# rule params of the laohaokan religion experiment data that gets merged INTO
+new_param = config_.LHK_RULE_PARAMS
+redis_helper = RedisHelper()
+
+
+def get_religion_videos(now_date):
+    """获取宗教视频列表"""
+    religion_key_name = f"{config_.KEY_NAME_PREFIX_RELIGION_VIDEOS}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
+    if not redis_helper.key_exists(religion_key_name):
+        redis_dt = datetime.datetime.strftime((now_date - datetime.timedelta(days=1)), '%Y%m%d')
+        religion_key_name = f"{config_.KEY_NAME_PREFIX_RELIGION_VIDEOS}{redis_dt}"
+    religion_videos = redis_helper.get_all_data_from_zset(key_name=religion_key_name, desc=True, with_scores=True)
+    if religion_videos is None:
+        return []
+    return religion_videos
+
+
+def merge_process(initial_key_name, new_key_name, now_videos, religion_video_id_list):
+    initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)
+    if initial_data is None or len(initial_data) == 0:
+        return now_videos, religion_video_id_list
+
+    initial_video_ids = [int(video_id) for video_id, _ in initial_data]
+    initial_video_ids = [video_id for video_id in initial_video_ids if video_id not in now_videos]
+    religion_video_id_list = [video_id for video_id in religion_video_id_list if video_id not in initial_video_ids]
+    if len(religion_video_id_list) == 0:
+        new_video_ids = initial_video_ids
+    else:
+        new_video_ids = []
+        for i, video_id in enumerate(initial_video_ids):
+            new_video_ids.append(video_id)
+            now_videos.append(video_id)
+            if i % 2 == 1 and len(religion_video_id_list) > 0:
+                new_video_ids.append(religion_video_id_list[0])
+                now_videos.append(religion_video_id_list[0])
+                religion_video_id_list = religion_video_id_list[1:]
+
+    # 按照排序给定分数
+    new_result = {}
+    step = 100 / (len(new_video_ids) * 2)
+    for i, video_id in enumerate(new_video_ids):
+        score = 100 - i * step
+        new_result[int(video_id)] = score
+    # 写入新的key中
+    redis_helper.add_data_with_zset(key_name=new_key_name, data=new_result, expire_time=23 * 3600)
+    return now_videos, religion_video_id_list
+
+
+def merge_with_region(now_date, now_h, region, religion_video_id_list):
+    initial_data_key = initial_param.get('data')
+    initial_rule_key = initial_param.get('rule')
+    new_data_key = new_param.get('data')
+    new_rule_key = new_param.get('rule')
+    now_videos = []
+    # 地域小时级数据合并
+    region_h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{initial_data_key}:{initial_rule_key}:" \
+                        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    new_region_h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{new_data_key}:{new_rule_key}:" \
+                            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    now_videos, religion_video_id_list = merge_process(initial_key_name=region_h_key_name,
+                                                       new_key_name=new_region_h_key_name,
+                                                       now_videos=now_videos,
+                                                       religion_video_id_list=religion_video_id_list)
+    # 地域24h数据合并
+    region_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{initial_data_key}:" \
+                          f"{initial_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    new_region_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{new_data_key}:" \
+                              f"{new_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    now_videos, religion_video_id_list = merge_process(initial_key_name=region_24h_key_name,
+                                                       new_key_name=new_region_24h_key_name,
+                                                       now_videos=now_videos,
+                                                       religion_video_id_list=religion_video_id_list)
+
+    # 24h筛选数据合并
+    dup2_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{initial_data_key}:" \
+                        f"{initial_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    new_dup2_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{new_data_key}:" \
+                            f"{new_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    now_videos, religion_video_id_list = merge_process(initial_key_name=dup2_24h_key_name,
+                                                       new_key_name=new_dup2_24h_key_name,
+                                                       now_videos=now_videos,
+                                                       religion_video_id_list=religion_video_id_list)
+
+    # 24h筛选后剩余数据合并
+    dup3_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{initial_data_key}:" \
+                        f"{initial_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    new_dup3_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{new_data_key}:" \
+                            f"{new_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    now_videos, religion_video_id_list = merge_process(initial_key_name=dup3_24h_key_name,
+                                                       new_key_name=new_dup3_24h_key_name,
+                                                       now_videos=now_videos,
+                                                       religion_video_id_list=religion_video_id_list)
+    log_.info(f"region = {region} update end!")
+
+
+def merge_videos(now_date, now_h):
+    """将宗教视频插入到默认视频列表中"""
+    # 获取宗教视频列表
+    religion_videos = get_religion_videos(now_date=now_date)
+    religion_video_id_list = [int(video_id) for video_id, _ in religion_videos]
+    # 列表合并
+    region_code_list = [code for region, code in config_.REGION_CODE.items()]
+    task_list = [
+        gevent.spawn(merge_with_region, now_date, now_h, region, religion_video_id_list)
+        for region in region_code_list
+    ]
+    gevent.joinall(task_list)
+    # 特殊城市视频数据准备
+    for region, city_list in config_.REGION_CITY_MAPPING.items():
+        t = [
+            gevent.spawn(
+                merge_with_region,
+                now_date, now_h, city_code, religion_video_id_list
+            )
+            for city_code in city_list
+        ]
+        gevent.joinall(t)
+
+
+if __name__ == '__main__':
+    log_.info(f"laohaokan recommend data update start...")
+    now_date = datetime.datetime.today()
+    now_h = datetime.datetime.now().hour
+    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
+    merge_videos(now_date, now_h)
+    log_.info(f"laohaokan recommend data update end!")
+    send_msg_to_feishu(
+        webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+        key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+        msg_text=f"rov-offline{config_.ENV_TEXT} - 老好看推荐视频数据更新完成\n"
+                 f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}\n"
+                 f"now_h: {now_h}\n"
+                 f"finished time: {datetime.datetime.strftime(datetime.datetime.now(), '%Y%m%d %H:%M:%S')}"
+    )

+ 4 - 2
region_rule_rank_h_task.sh

@@ -3,9 +3,11 @@ echo $ROV_OFFLINE_ENV
 if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
     cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/rule_rank_h_by_24h.py &&
      /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h_by24h.py &&
-      /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h.py '24h'
+      /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h.py '24h' &&
+      /root/anaconda3/bin/python /data2/rov-offline/laohaokan_recommend_update.py
 elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
     cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/rule_rank_h_by_24h.py &&
      /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h_by24h.py &&
-      /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h.py '24h'
+      /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h.py '24h' &&
+      /root/anaconda3/bin/python /data/rov-offline/laohaokan_recommend_update.py
 fi

+ 86 - 0
religion_videos_update.py

@@ -0,0 +1,86 @@
+import datetime
+import pandas as pd
+from odps import ODPS
+from threading import Timer
+from get_data import get_data_from_odps
+from db_helper import RedisHelper
+from config import set_config
+from log import Log
+
+config_, env = set_config()
+log_ = Log()
+features = ['videoid', 'play_count_total', 'gmt_create']
+
+
+def data_check(project, table, now_date):
+    """检查数据是否准备好"""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+
+    try:
+        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        sql = f'select * from {project}.{table} where dt = {dt}'
+        with odps.execute_sql(sql=sql).open_reader() as reader:
+            data_count = reader.count
+    except Exception as e:
+        data_count = 0
+    return data_count
+
+
+def get_religion_videos(now_date, project, table):
+    """获取宗教视频列表"""
+    # 获取videoId
+    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+    records = get_data_from_odps(date=dt, project=project, table=table)
+    feature_data = []
+    for record in records:
+        item = {}
+        for feature_name in features:
+            item[feature_name] = record[feature_name]
+        feature_data.append(item)
+    feature_df = pd.DataFrame(feature_data)
+    # 按照发布时间和播放量进行倒序
+    feature_df = feature_df.sort_values(by=['gmt_create', 'play_count_total'], ascending=False)
+    print(feature_df)
+    video_id_list = feature_df['videoid'].to_list()
+    # 按照排序给定分数
+    final_result = {}
+    step = 100 / (len(video_id_list) * 2)
+    for i, video_id in enumerate(video_id_list):
+        score = 100 - i * step
+        final_result[int(video_id)] = score
+
+    # 写入对应的redis
+    key_name = \
+        f"{config_.KEY_NAME_PREFIX_RELIGION_VIDEOS}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
+    if len(final_result) > 0:
+        redis_helper = RedisHelper()
+        redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=2 * 24 * 3600)
+
+
+def h_timer_check():
+    """Run the religion video update once the ODPS data is ready.
+
+    Checks the row count of today's partition; if data is present the update
+    runs, otherwise the check is rescheduled.
+    """
+    project = config_.RELIGION_VIDEOS_PROJECT
+    table = config_.RELIGION_VIDEOS_TABLE
+    now_date = datetime.datetime.today()
+    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
+    # check whether today's data has been produced yet
+    data_count = data_check(project=project, table=table, now_date=now_date)
+    if data_count > 0:
+        log_.info(f'religion_videos_count = {data_count}')
+        # data is ready -- run the update
+        get_religion_videos(now_date=now_date, project=project, table=table)
+    else:
+        # data not ready -- re-check in 5 minutes (the original comment said
+        # "1 minute", but the timer interval is 5 * 60 seconds)
+        Timer(5 * 60, h_timer_check).start()
+
+
+# script entry point: start the data-ready polling / update cycle
+if __name__ == '__main__':
+    h_timer_check()

+ 7 - 0
religion_videos_update_task.sh

@@ -0,0 +1,7 @@
+# Cron/task entry for religion_videos_update.py: choose the deployment root
+# (/data2 for test, /data for pro) based on the ROV_OFFLINE_ENV variable.
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/religion_videos_update.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/religion_videos_update.py
+fi