|| """created on Thursday, February,29 2024@author: luojunhui"""import osimport sysimport jsonimport timeimport requestsimport datetimeimport schedulefrom odps import ODPSsys.path.append(os.getcwd())from application.common.log import AliyunLoggerdef bot(success_list):    """    机器人    """    url = "https://open.feishu.cn/open-apis/bot/v2/hook/641c6849-8bdc-4781-a79e-deb4af50fa49"    headers = {"Content-Type": "application/json"}    payload = {        "msg_type": "interactive",        "card": {            "elements": [                {                    "tag": "div",                    "text": {                        "content": "**成功送入流量池的视频一共{}条**\n{}".format(                            len(success_list), success_list                        ),                        "tag": "lark_md",                    },                },            ],            "header": {"title": {"content": "自动送入流量池: 通知 ✅", "tag": "plain_text"}},        },    }    requests.request("POST", url=url, headers=headers, data=json.dumps(payload))class OdpsFunction(object):    """    odps function class    """    def __init__(self):        self.endpoint = "http://service.cn.maxcompute.aliyun.com/api"        self.access_id = "LTAIWYUujJAm7CbH"        self.access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"        self.project = "loghubods"        self.od = ODPS(            access_id=self.access_id,            secret_access_key=self.access_key,            endpoint=self.endpoint,            project=self.project,        )    def select(self, sql):        """        :param sql: 查询语句        :return: odps_obj{}        """        with self.od.execute_sql(sql).open_reader() as reader:            if reader:                return [item for item in reader]            else:                return []def generate_flow_pool_levels():    """    读取流量池 level_id表并且通过流量池层级来生成 id    :return:    """    url = "https://admin.piaoquantv.com/manager/flowpool/pageList"    payload = {        "pageNum": 1,        "pageSize": 10000    }    headers = {        'authority': 'admin.piaoquantv.com',        'accept': 'application/json',        'accept-language': 'en,zh;q=0.9',        'content-type': 'application/json',        'cookie': 'SESSION=ZDY1MTYxODgtNGQ2OS00YzA4LThlMzAtMmE3YzllNGQ4ODk5',        'origin': 'https://admin.piaoquantv.com',        'referer': 'https://admin.piaoquantv.com/cms/post-detail/19014920/detail',        'sec-ch-ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"',        'sec-ch-ua-mobile': '?0',        'sec-ch-ua-platform': '"macOS"',        'sec-fetch-dest': 'empty',        'sec-fetch-mode': 'cors',        'sec-fetch-site': 'same-origin',        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'    }    res = requests.request("POST", url, json=payload, headers=headers).json()    # print(res.json())    pool_list = res['content']['list']    for pool in pool_list:        if pool['id'] == -1:            levels = pool['levels']            return levelsdef generate_flow_map():    """    生成 level_id 映射表    :return:    """    try:        levels = generate_flow_pool_levels()        w = {}        for level in levels:            w[str(level['level'])] = level['id']        return w    except:        return {}class AutoSendToFlowPool(object):    """    定时任务方式    自动把优质视频送入流量池高层    方法:        定时从表中读取 待推荐的新视频的数据;        判断条件,满足规则的数据则自动送入流量池高层;    """    def __init__(self):        self.header = {            "authority": "admin.piaoquantv.com",            "accept": "application/json",            "accept-language": "zh,en;q=0.9,zh-CN;q=0.8",            "content-type": "application/json",            "cookie": "SESSION=NTk4MWQ5MDItZTI1ZS00YjI1LTllN2MtNThiY2M4MjhiZjVh",            "origin": "https://admin.piaoquantv.com",            "referer": "https://admin.piaoquantv.com/cms/post-detail/18811646/detail",            "sec-ch-ua": '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"',            "sec-ch-ua-mobile": "?0",            "sec-ch-ua-platform": '"macOS"',            "sec-fetch-dest": "empty",            "sec-fetch-mode": "cors",            "sec-fetch-site": "same-origin",            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",        }        self.enter_url = "https://api-internal.piaoquantv.com/flowpool/video/enter/abPool"        self.flow_pool_map = {            "1": 24,            "2": 25,            "3": 26,            "4": 27,            "5": 28,            "6": 29        }        self.odp = OdpsFunction()        self.aliyun_log = AliyunLogger(platform="home", mode="automatic")    def get_data_from_flow_pool(self):        """        get recommend videos from MySQL        :return: List [{}, {}, {}......]        """        t = datetime.datetime.now().strftime("%Y%m%d%H")        sql = f"""select videoid, target_level, uid from loghubods.auto_enter_flow_pool_videolist where dt = '{t}';"""        data = self.odp.select(sql)        return data    def send_to_flow_pool_level(self, obj, flow_pool_id=-1):        """        send video to different flow pool levels        :param obj: {            "videoid": video id,            "target_level": flow pool level, ranges from 1 to 4        }        :param flow_pool_id: flow pool id, define -1        :return: bool, False / True        """        pool_map = generate_flow_map()        if not pool_map:            pool_map = self.flow_pool_map        level = str(obj["target_level"])        level_id = pool_map[level]        o = {            "flowPoolId": flow_pool_id,            "startType": 2,            "uid": int(obj['uid']),            "videoId": int(obj["videoid"]),            "flowPoolLevelId": level_id        }        self.aliyun_log.logging(            code="4003",            message="请求地址: {}".format(self.enter_url),            data=o        )        response = requests.request("POST", self.enter_url, headers=self.header, json=o)        if response.json()['code'] == 0:            self.aliyun_log.logging(                code="4003",                message="返回",                data=response.json()            )            self.aliyun_log.logging(                code="4001", message="有一条视频成功送入流量池", data={"vid": obj['videoid'], "level": level_id}            )            return True        else:            self.aliyun_log.logging(                code="4004",                message=response.json()['msg'] if response.json().get('msg') else "操作失败",                data=response.json(),                account=int(obj["videoid"])            )            return False    def auto_process(self):        """        auto process this task in schedule        :return:        """        successful_list = []        for i in range(20):            data = self.get_data_from_flow_pool()            if data:                self.aliyun_log.logging(                    code=4001, message="该小时一共有 {} 条视频需要被送入流量池".format(len(data)),                    data=[item['videoid'] for item in data]                )                for obj in data:                    if self.send_to_flow_pool_level(obj):                        successful_list.append(obj['videoid'])                # robot send message to the group, notice the successful count and the fail count                if successful_list:                    bot(successful_list)                return            else:                self.aliyun_log.logging(                    code=4006,                    message="未扫描到数据, 等待 60s * {}".format(i + 1)                )                time.sleep(60 * 2)        # 本小时没有扫描到数据if __name__ == "__main__":    AS = AutoSendToFlowPool()    schedule.every().hour.at(":08").do(AS.auto_process)    while True:        schedule.run_pending()        time.sleep(10)
 |