liqian 2 سال پیش
والد
کامیت
4b951d57d4
3فایلهای تغییر یافته به همراه125 افزوده شده و 7 حذف شده
  1. 6 6
      config.py
  2. 108 1
      religion_class_user_update.py
  3. 11 0
      religion_class_user_update_task.sh

+ 6 - 6
config.py

@@ -399,16 +399,16 @@ class BaseConfig(object):
     RELIGION_USERS = {
         'catholicism': {
             'project': 'loghubods',
-            'table': 'catholicism_videolist',
-            # 视频列表更新结果存放 redis key 前缀,完整格式:'religion:catholicism:videos:item:{date}'
-            'key_name_prefix': 'religion:catholicism:videos:item:',
+            'table': '',
+            # 用户列表更新结果存放 redis key 前缀,完整格式:'religion:catholicism:users:item:{hash_tag}:{date}'
+            'key_name_prefix': 'religion:catholicism:users:item:',
         },  # 天主教
 
         'christianity': {
             'project': 'loghubods',
-            'table': 'christianity_videolist',
-            # 视频列表更新结果存放 redis key 前缀,完整格式:'religion:christianity:videos:item:{date}'
-            'key_name_prefix': 'religion:christianity:videos:item:',
+            'table': '',
+            # 用户列表更新结果存放 redis key 前缀,完整格式:'religion:christianity:users:item:{hash_tag}:{date}'
+            'key_name_prefix': 'religion:christianity:users:item:',
         },  # 基督教
     }
 

+ 108 - 1
religion_class_user_update.py

@@ -1 +1,108 @@
-from sklearn.utils import murmurhash
+import hashlib
+import datetime
+import sys
+import traceback
+import pandas as pd
+from odps import ODPS
+from threading import Timer
+from get_data import get_data_from_odps
+from utils import send_msg_to_feishu
+from db_helper import RedisHelper
+from config import set_config
+from log import Log
+
+config_, env = set_config()
+log_ = Log()
+features = ['mid']
+
+
+def data_check(project, table, now_date):
+    """检查数据是否准备好"""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+
+    try:
+        dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
+        sql = f'select * from {project}.{table} where dt = {dt}'
+        with odps.execute_sql(sql=sql).open_reader() as reader:
+            data_count = reader.count
+    except Exception as e:
+        data_count = 0
+    return data_count
+
+
+def get_religion_users(now_date, project, table, key_name_prefix):
+    """获取宗教用户列表"""
+    # 获取mid
+    dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
+    records = get_data_from_odps(date=dt, project=project, table=table)
+    feature_data = []
+    for record in records:
+        item = {}
+        for feature_name in features:
+            item[feature_name] = record[feature_name]
+        feature_data.append(item)
+    feature_df = pd.DataFrame(feature_data)
+    mid_list = feature_df['mid'].tolist()
+    # 对mid哈希,写入对应的key中
+    hash_result = {}
+    hash_tag_list = []
+    for mid in mid_list:
+        hash_mid = hashlib.md5(mid.encode('utf-8')).hexdigest()
+        hash_tag = hash_mid[-1:]
+        if hash_tag in hash_tag_list:
+            hash_result[hash_tag].append(mid)
+        else:
+            hash_result[hash_tag] = [mid]
+            hash_tag_list.append(hash_tag)
+
+    # 写入对应的redis
+    redis_helper = RedisHelper()
+    for key, val in hash_result:
+        if len(val) > 0:
+            key_name = f"{key_name_prefix}{key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}"
+            redis_helper.add_data_with_set(key_name=key_name, values=val, expire_time=2 * 3600)
+
+
+def timer_check(religion_name):
+    project = config_.RELIGION_USERS[religion_name]['project']
+    table = config_.RELIGION_USERS[religion_name]['table']
+    key_name_prefix = config_.RELIGION_USERS[religion_name]['key_name_prefix']
+    now_date = datetime.datetime.today()
+    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
+    # 查看当天更新的数据是否已准备好
+    data_count = data_check(project=project, table=table, now_date=now_date)
+    if data_count > 0:
+        log_.info(f'religion_name = {religion_name}, religion_users_count = {data_count}')
+        # 数据准备好,进行更新
+        get_religion_users(now_date=now_date, project=project, table=table, key_name_prefix=key_name_prefix)
+    else:
+        # 数据没准备好,1分钟后重新检查
+        Timer(5 * 60, timer_check, args=[religion_name]).start()
+
+
+def main():
+    try:
+        religion_name = sys.argv[1]
+        timer_check(religion_name)
+    except Exception as e:
+        log_.error(f"宗教用户数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - 宗教用户数据更新失败\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+
+
+if __name__ == '__main__':
+    main()

+ 11 - 0
religion_class_user_update_task.sh

@@ -0,0 +1,11 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline &&
+    nohup /root/anaconda3/bin/python /data2/rov-offline/religion_class_user_update.py 'catholicism' &
+    nohup /root/anaconda3/bin/python /data2/rov-offline/religion_class_user_update.py 'christianity' &
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline &&
+    nohup /root/anaconda3/bin/python /data/rov-offline/religion_class_user_update.py 'catholicism' &
+    nohup /root/anaconda3/bin/python /data/rov-offline/religion_class_user_update.py 'christianity' &
+fi