liqian 1 рік тому
батько
коміт
d31113056f
7 змінених файлів з 662 додано та 0 видалено
  1. 2 0
      .gitignore
  2. 78 0
      ad_thompson_param_update.py
  3. 8 0
      ad_thompson_param_update_task.sh
  4. 71 0
      config.py
  5. 432 0
      db_helper.py
  6. 57 0
      log.py
  7. 14 0
      utils.py

+ 2 - 0
.gitignore

@@ -58,3 +58,5 @@ docs/_build/
 # PyBuilder
 target/
 
+.idea/
+logs/

+ 78 - 0
ad_thompson_param_update.py

@@ -0,0 +1,78 @@
+import traceback
+import datetime
+import json
+import time
+from odps import ODPS
+from db_helper import RedisHelper
+from utils import send_msg_to_feishu
+from config import set_config
+from log import Log
+
+# Settings object selected by set_config(); every config_.* lookup below relies on it.
+config_ = set_config()
+# Logger wrapper from log.py, used for all log output of this script.
+log_ = Log()
+
+
+def main(project, table):
+    try:
+        # 获取广告ad_idea_id对应的曝光次数和点击次数
+        odps = ODPS(
+            access_id=config_.ODPS_CONFIG['ACCESSID'],
+            secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+            project=project,
+            endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        )
+        records = odps.read_table(name=table)
+        ad_idea_data = []
+        for item in records:
+            ad_idea_id = item['ad_idea_id']
+            view_pv = item['view_pv']
+            click_pv = item['click_pv']
+            if ad_idea_id is None or ad_idea_id == '':
+                continue
+            try:
+                view_pv = int(view_pv)
+            except:
+                view_pv = 0
+            try:
+                click_pv = int(click_pv)
+            except:
+                click_pv = 0
+            if view_pv < click_pv:
+                continue
+            param_alpha = click_pv
+            param_beta = view_pv - click_pv
+            ad_idea_data.append({'ad_idea_id': ad_idea_id, 'param': [param_alpha, param_beta]})
+        log_.info(f"ad_idea_data count: {len(ad_idea_data)}")
+        log_.info(f"ad_idea_data: {ad_idea_data}")
+        # 更新redis
+        redis_helper = RedisHelper()
+        i = 0
+        for ad_idea in ad_idea_data:
+            try:
+                key_name = f"{config_.THOMPSON_PARAM_KEY_PREFIX}{ad_idea['ad_idea_id']}"
+                value = json.dumps(ad_idea['param'])
+                redis_helper.set_data_to_redis(key_name=key_name, value=value, expire_time=24 * 3600)
+                i += 1
+            except:
+                continue
+        log_.info(f"to redis count: {i}")
+    except Exception as e:
+        log_.error(f"地域分组小时级数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - 广告Thompson参数更新失败\n"
+                     f"exception: {e}\n"
+                     f"traceback: {traceback.format_exc()}"
+        )
+
+
+if __name__ == '__main__':
+    project = ''
+    table = ''
+    now_date = datetime.datetime.today()
+    dt = datetime.datetime.strftime(now_date, '%Y%m%d %H:%M:%S')
+    log_.info(f"dt: {dt}")
+    start_time = time.time()
+    main(project=project, table=table)
+    log_.info(f"excuteTime: {(time.time() - start_time) * 1000}ms")

+ 8 - 0
ad_thompson_param_update_task.sh

@@ -0,0 +1,8 @@
+source /etc/profile
+echo $ROV_SERVER_OFFLINE_ENV
+if [[ $ROV_SERVER_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/ad_thompson_param_update.py
+
+elif [[ $ROV_SERVER_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/ad_thompson_param_update.py
+fi

+ 71 - 0
config.py

@@ -0,0 +1,71 @@
+import os
+
+
+class BaseConfig(object):
+    """Settings shared by every environment; the environment classes extend it."""
+    # ODPS service settings.
+    # NOTE(review): the access id/key are committed in plain text and the endpoint
+    # uses http:// - consider loading the credentials from the environment or a
+    # secret store and confirm whether an https endpoint can be used.
+    ODPS_CONFIG = {
+        'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
+        'ACCESSID': 'LTAIWYUujJAm7CbH',
+        'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+    }
+    # Redis key prefix for the Thompson parameters of an ad idea;
+    # full key format: thompson:param:{ad_idea_id}
+    THOMPSON_PARAM_KEY_PREFIX = 'thompson:param:'
+    # Feishu robot used for alert messages.
+    FEISHU_ROBOT = {
+        'server_robot': {
+            'webhook': 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2',
+            # Custom keyword (key_word) that is prepended to every message text.
+            'key_word': '服务报警'
+        },
+    }
+
+
+class TestConfig(BaseConfig):
+    """Test environment settings."""
+    # Environment label included in alert messages.
+    ENV_TEXT = "测试环境"
+    # Redis address of the test environment.
+    # NOTE(review): the password is committed in plain text.
+    REDIS_INFO = {
+        'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
+        'port': 6379,
+        'password': 'Wqsd@2019',
+    }
+
+
+class PreProductionConfig(BaseConfig):
+    """Pre-production environment settings."""
+    # Environment label included in alert messages.
+    ENV_TEXT = "预发布环境"
+    # Redis address of the pre-production environment.
+    # NOTE(review): host and password are identical to ProductionConfig - confirm
+    # that sharing the production instance is intended. The password is committed
+    # in plain text.
+    REDIS_INFO = {
+        'host': 'r-bp1yup71yo02ki3yb5.redis.rds.aliyuncs.com',
+        'port': 6379,
+        'password': 'Wqsd@2019',
+    }
+
+
+class ProductionConfig(BaseConfig):
+    """Production environment settings."""
+    # Environment label included in alert messages.
+    ENV_TEXT = "生产环境"
+    # Redis address of the production environment.
+    # NOTE(review): the password is committed in plain text.
+    REDIS_INFO = {
+        'host': 'r-bp1yup71yo02ki3yb5.redis.rds.aliyuncs.com',
+        'port': 6379,
+        'password': 'Wqsd@2019',
+    }
+
+
+def set_config():
+    # 获取环境变量 ROV_SERVER_ENV
+    env = os.environ.get('ROV_SERVER_OFFLINE_ENV')
+    # env = 'test'
+    if env is None:
+        return
+    if env == 'test':
+        return TestConfig()
+    elif env == 'pre':
+        return PreProductionConfig()
+    elif env == 'pro':
+        return ProductionConfig()
+    else:
+        return

+ 432 - 0
db_helper.py

@@ -0,0 +1,432 @@
+import traceback
+import time
+import redis
+from config import set_config
+from log import Log
+
+# Settings object selected by set_config(); supplies the default redis connection info.
+config_ = set_config()
+# Logger wrapper from log.py.
+log_ = Log()
+
+# Module-wide redis client; created by the first RedisHelper.connect() call and reused afterwards.
+conn_redis = None
+
+
+class RedisHelper(object):
+    def __init__(self, params=None, redis_info=config_.REDIS_INFO):
+        """
+        初始化redis连接信息
+        redis_info: redis连接信息, 格式:dict, {'host': '', 'port': '', 'password': ''}
+        """
+        self.redis_info = redis_info
+        self.host = redis_info['host']
+        self.port = redis_info['port']
+        self.password = redis_info['password']
+        self.params = params
+
+    def connect(self):
+        """
+        连接redis
+        :return: conn
+        """
+        global conn_redis
+        if conn_redis is None:
+            pool = redis.ConnectionPool(host=self.host,
+                                        port=self.port,
+                                        password=self.password,
+                                        decode_responses=True)
+            conn = redis.Redis(connection_pool=pool)
+            conn_redis = conn
+        return conn_redis
+
+    def key_exists(self, key_name):
+        """
+        判断key是否存在
+        :param key_name: key
+        :return: 存在-True, 不存在-False
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        res = conn.exists(key_name)
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'get_data_from_redis',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+        return res
+
+    def del_keys(self, key_name):
+        """
+        删除key
+        :param key_name: key
+        :return: None
+        """
+        conn = self.connect()
+        conn.delete(key_name)
+
+    def get_data_from_redis(self, key_name):
+        """
+        读取redis中的数据
+        :param key_name: key
+        :return: data
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        if not conn.exists(key_name):
+            # key不存在
+            return None
+        data = conn.get(key_name)
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'get_data_from_redis',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+        return data
+
+    def set_data_to_redis(self, key_name, value, expire_time=24*3600):
+        """
+        新增数据
+        :param key_name: key
+        :param value: 元素的值 videoId
+        :param expire_time: 过期时间,单位:s,默认1天
+        :return: None
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        conn.set(key_name, value, ex=int(expire_time))
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'set_data_to_redis',
+        #         'executeTime': (time.time() - start_time) * 1000})
+
+    def add_data_with_zset(self, key_name, data, expire_time=7*24*3600):
+        """
+        新增数据,有序set
+        :param key_name: key
+        :param data: 元素的值及对应分数 type-dict  {value: score}
+        :param expire_time: 过期时间,单位:s,默认7天,type-int
+        :return: None
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        conn.zadd(key_name, data)
+        # 设置过期时间
+        conn.expire(key_name, int(expire_time))
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'add_data_with_zset',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+
+    def get_data_zset_with_index(self, key_name, start, end, desc=True, with_scores=False):
+        """
+        根据索引位置获取元素的值
+        :param key_name: key
+        :param start: 索引起始点 闭区间,包含start
+        :param end: 索引结束点 闭区间,包含end
+        :param desc: 分数排序方式,默认从大到小
+        :param with_scores: 是否获取元素的分数,默认 False,只获取元素的值
+        :return: data 元素值列表(不包含分数),value(videoId)类型转换为int, 包含分数时不进行类型转换
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        if not conn.exists(key_name):
+            return None
+        data = conn.zrange(key_name, start, end, desc, with_scores)
+        if with_scores:
+            data = data
+        else:
+            data = [eval(value) for value in data]
+
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'get_data_zset_with_index',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+        return data
+
+    def get_all_data_from_zset(self, key_name, desc=True, with_scores=False):
+        """
+        获取zset中所有元素的值
+        :param key_name: key
+        :param desc: 分数排序方式,默认从大到小
+        :param with_scores: 是否获取元素的分数,默认 False,只获取元素的值
+        :return: data 元素值列表(不包含分数),value(videoId)类型转换为int, 包含分数时不进行类型转换
+        """
+        conn = self.connect()
+        if not conn.exists(key_name):
+            return None
+        data = []
+        start = 0
+        step = 100
+        while True:
+            end = start + step - 1
+            temp = conn.zrange(key_name, start, end, desc, with_scores)
+            if not temp:
+                break
+            data.extend(temp)
+            start += step
+        return data
+
+    def get_score_with_value(self, key_name, value):
+        """
+        在zset中,根据元素的value获取对应的score
+        :param key_name: key
+        :param value: 元素的值
+        :return: score value对应的score
+        """
+        conn = self.connect()
+        if not conn.exists(key_name):
+            return None
+        return conn.zscore(key_name, value)
+
+    def get_rank_with_value(self, key_name, value, desc=False):
+        """
+        在zset中,根据元素的value获取对应排名
+        :param key_name: key
+        :param value: 元素的值
+        :param desc: 是否倒序 type-bool 默认:False-按照score从小到大
+        :return: rank value对应的rank,从0开始,不存在返回None
+        """
+        conn = self.connect()
+        if not conn.exists(key_name):
+            return None
+        if desc is True:
+            return conn.zrevrank(key_name, value)
+        else:
+            return conn.zrank(key_name, value)
+
+    def update_score_with_value(self, key_name, value, score, expire_time=24*3600):
+        """
+        在zset中,修改元素value对应的score
+        :param key_name: key
+        :param value: 元素的值
+        :param score: value对应的score更新值
+        :param expire_time: 过期时间,单位:s,默认1天,type-int
+        """
+        conn = self.connect()
+        if conn.exists(key_name):
+            conn.zadd(key_name, {value: score})
+        else:
+            # key不存在时,需设置过期时间
+            conn.zadd(key_name, {value: score})
+            conn.expire(key_name, int(expire_time))
+
+    def remove_value_from_zset(self, key_name, value):
+        """
+        删除zset中的指定元素
+        :param key_name: key
+        :param value: 元素的值
+        :return: None
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        res = conn.zrem(key_name, value)
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'remove_value_from_zset',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+        return res
+
+    def get_index_with_data(self, key_name, value):
+        """
+        根据元素的值获取在有序set中的位置,按照分数倒序(从大到小)
+        :param key_name: key
+        :param value: 元素的值
+        :return: idx 位置索引
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        res = conn.zrevrank(key_name, value)
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'get_index_with_data',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+        return res
+
+    def get_data_from_set(self, key_name):
+        """
+        获取set中的所有数据
+        :param key_name: key
+        :return: data
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        if not conn.exists(key_name):
+            # key不存在
+            return None
+        data = []
+        cursor = 0
+        while True:
+            cur, temp = conn.sscan(key_name, cursor=cursor, count=2000)
+            data.extend(temp)
+            if cur == 0:
+                break
+            cursor = cur
+
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'get_data_from_set',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+        return list(set(data))
+
+    def add_data_with_set(self, key_name, values, expire_time=30*60):
+        """
+        新增数据,set
+        :param key_name: key
+        :param values: 要添加的元素  类型-tuple
+        :param expire_time: 过期时间,单位:s,默认0.5小时 type-int
+        :return: None
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        conn.sadd(key_name, *values)
+        # 设置过期时间
+        conn.expire(key_name, int(expire_time))
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'add_data_with_set',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+
+    def data_exists_with_set(self, key_name, value):
+        """
+        判断元素value是否在集合key_name中
+        :param key_name: key
+        :param value: 需判断的元素
+        :return: 存在-True, 不存在-False
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        res = conn.sismember(key_name, value)
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'data_exists_with_set',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+        return res
+
+    def get_data_with_count_from_set(self, key_name, count=1):
+        """
+        从set中随机获取元素,并放回
+        :param key_name: key
+        :param count: 获取个数, 默认为1
+        :return:
+        """
+        conn = self.connect()
+        data = conn.srandmember(name=key_name, number=count)
+        return data
+
+    def remove_value_from_set(self, key_name, values):
+        """
+        删除set中的指定元素
+        :param key_name: key
+        :param values: 元素的值, 类型-tuple
+        :return: None
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        res = conn.srem(key_name, *values)
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'remove_value_from_set',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+        return res
+
+    def decr_key(self, key_name, amount=1, expire_time=30*60):
+        """
+        redis自减
+        :param key_name: key
+        :param amount: 自减数,默认为1,type-int
+        :param expire_time: 过期时间,单位:s,默认0.5小时 type-int
+        :return: None
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        conn.decr(name=key_name, amount=amount)
+        conn.expire(key_name, int(expire_time))
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'decr_key',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+
+    def incr_key(self, key_name, amount=1, expire_time=30*60):
+        """
+        redis自增
+        :param key_name: key
+        :param amount: 自增数,默认为1,type-int
+        :param expire_time: 过期时间,单位:s,默认0.5小时 type-int
+        :return: None
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        conn.incr(name=key_name, amount=amount)
+        conn.expire(key_name, int(expire_time))
+        # if self.params is not None:
+        #     log_.info({
+        #         'logTimestamp': int(time.time() * 1000),
+        #         'request_id': self.params.request_id,
+        #         'operation': 'incr_key',
+        #         'executeTime': (time.time() - start_time) * 1000
+        #     })
+
+    def setnx_key(self, key_name, value, expire_time=5*60):
+        """
+        当key不存在时,将value塞入key中,key存在时不做操作
+        :param key_name: key
+        :param value: value
+        :return: 过期时间,单位:s,默认5分钟 type-int
+        """
+        # start_time = time.time()
+        conn = self.connect()
+        conn.setnx(name=key_name, value=value)
+        conn.expire(name=key_name, time=int(expire_time))
+
+    def get_batch_key(self, name_list):
+        conn = self.connect()
+        res = conn.mget(name_list)
+        return res
+
+
+
+if __name__ == '__main__':
+    redis_helper = RedisHelper()
+    key_name = f"previewed:videos:5:aan7"
+    res = redis_helper.get_data_with_count_from_set(
+        key_name=key_name,
+        count=20)
+    print(res)
+    res1 = redis_helper.remove_value_from_set(key_name=key_name, values=tuple({'2881413'}))
+    print(res1)
+    res = redis_helper.get_data_with_count_from_set(
+        key_name=key_name,
+        count=20)
+    print(res)
+

+ 57 - 0
log.py

@@ -0,0 +1,57 @@
+# coding:utf-8
+import os
+import logging
+import time
+import logging.config
+
+
+class Log(object):
+    def __init__(self, log_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "logs")):
+        if not os.path.exists(log_path):
+            os.makedirs(log_path)
+
+        # 文件的命名
+        self.logname = os.path.join(log_path, '{}.log'.format(time.strftime('%Y%m%d')))
+        self.logger = logging.getLogger()
+        self.logger.setLevel(logging.DEBUG)
+        # 日志输出格式
+        self.formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+
+    def __console(self, level, message):
+        # 创建一个FileHandler,用于写到本地
+        fh = logging.FileHandler(self.logname, 'a', encoding='utf-8')
+        fh.setLevel(logging.DEBUG)
+        fh.setFormatter(self.formatter)
+        self.logger.addHandler(fh)
+
+        # 创建一个StreamHandler,用于输出到控制台
+        ch = logging.StreamHandler()
+        ch.setLevel(logging.DEBUG)
+        ch.setFormatter(self.formatter)
+        self.logger.addHandler(ch)
+
+        if level == 'info':
+            self.logger.info(message)
+        elif level == 'debug':
+            self.logger.debug(message)
+        elif level == 'warning':
+            self.logger.warning(message)
+        elif level == 'error':
+            self.logger.error(message)
+        # 这两行代码是为了避免日志输出重复问题
+        self.logger.removeHandler(ch)
+        self.logger.removeHandler(fh)
+        # 关闭打开的文件
+        fh.close()
+
+    def debug(self, message):
+        self.__console('debug', message)
+
+    def info(self, message):
+        self.__console('info', message)
+
+    def warning(self, message):
+        self.__console('warning', message)
+
+    def error(self, message):
+        self.__console('error', message)

+ 14 - 0
utils.py

@@ -0,0 +1,14 @@
+import requests
+
+
+def send_msg_to_feishu(webhook, key_word, msg_text):
+    """发送消息到飞书"""
+    headers = {'Content-Type': 'application/json'}
+    payload_message = {
+        "msg_type": "text",
+        "content": {
+            "text": '{}: {}'.format(key_word, msg_text)
+        }
+    }
+    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
+    print(response.text)