""" created on Thursday, February,29 2024 @author: luojunhui """ import os import sys import json import time import requests import datetime import schedule from odps import ODPS sys.path.append(os.getcwd()) from application.common.log import AliyunLogger def bot(success_list): """ 机器人 """ url = "https://open.feishu.cn/open-apis/bot/v2/hook/641c6849-8bdc-4781-a79e-deb4af50fa49" headers = {"Content-Type": "application/json"} payload = { "msg_type": "interactive", "card": { "elements": [ { "tag": "div", "text": { "content": "**成功送入流量池的视频一共{}条**\n{}".format( len(success_list), success_list ), "tag": "lark_md", }, }, ], "header": {"title": {"content": "自动送入流量池: 通知 ✅", "tag": "plain_text"}}, }, } requests.request("POST", url=url, headers=headers, data=json.dumps(payload)) class OdpsFunction(object): """ odps function class """ def __init__(self): self.endpoint = "http://service.cn.maxcompute.aliyun.com/api" self.access_id = "LTAIWYUujJAm7CbH" self.access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P" self.project = "loghubods" self.od = ODPS( access_id=self.access_id, secret_access_key=self.access_key, endpoint=self.endpoint, project=self.project, ) def select(self, sql): """ :param sql: 查询语句 :return: odps_obj{} """ with self.od.execute_sql(sql).open_reader() as reader: if reader: return [item for item in reader] else: return [] def generate_flow_pool_levels(): """ 读取流量池 level_id表并且通过流量池层级来生成 id :return: """ url = "https://admin.piaoquantv.com/manager/flowpool/pageList" payload = { "pageNum": 1, "pageSize": 10000 } headers = { 'authority': 'admin.piaoquantv.com', 'accept': 'application/json', 'accept-language': 'en,zh;q=0.9', 'content-type': 'application/json', 'cookie': 'SESSION=ZDY1MTYxODgtNGQ2OS00YzA4LThlMzAtMmE3YzllNGQ4ODk5', 'origin': 'https://admin.piaoquantv.com', 'referer': 'https://admin.piaoquantv.com/cms/post-detail/19014920/detail', 'sec-ch-ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"macOS"', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36' } res = requests.request("POST", url, json=payload, headers=headers).json() # print(res.json()) pool_list = res['content']['list'] for pool in pool_list: if pool['id'] == -1: levels = pool['levels'] return levels def generate_flow_map(): """ 生成 level_id 映射表 :return: """ try: levels = generate_flow_pool_levels() w = {} for level in levels: w[str(level['level'])] = level['id'] return w except: return {} class AutoSendToFlowPool(object): """ 定时任务方式 自动把优质视频送入流量池高层 方法: 定时从表中读取 待推荐的新视频的数据; 判断条件,满足规则的数据则自动送入流量池高层; """ def __init__(self): self.header = { "authority": "admin.piaoquantv.com", "accept": "application/json", "accept-language": "zh,en;q=0.9,zh-CN;q=0.8", "content-type": "application/json", "cookie": "SESSION=NTk4MWQ5MDItZTI1ZS00YjI1LTllN2MtNThiY2M4MjhiZjVh", "origin": "https://admin.piaoquantv.com", "referer": "https://admin.piaoquantv.com/cms/post-detail/18811646/detail", "sec-ch-ua": '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", } self.enter_url = "https://api-internal.piaoquantv.com/flowpool/video/enter/abPool" self.flow_pool_map = { "1": 24, "2": 25, "3": 26, "4": 27, "5": 28, "6": 29 } self.odp = OdpsFunction() self.aliyun_log = AliyunLogger(platform="home", mode="automatic") def get_data_from_flow_pool(self): """ get recommend videos from MySQL :return: List [{}, {}, {}......] """ t = datetime.datetime.now().strftime("%Y%m%d%H") sql = f"""select videoid, target_level, uid from loghubods.auto_enter_flow_pool_videolist where dt = '{t}';""" data = self.odp.select(sql) return data def send_to_flow_pool_level(self, obj, flow_pool_id=-1): """ send video to different flow pool levels :param obj: { "videoid": video id, "target_level": flow pool level, ranges from 1 to 4 } :param flow_pool_id: flow pool id, define -1 :return: bool, False / True """ pool_map = generate_flow_map() if not pool_map: pool_map = self.flow_pool_map level = str(obj["target_level"]) level_id = pool_map[level] o = { "flowPoolId": flow_pool_id, "startType": 2, "uid": int(obj['uid']), "videoId": int(obj["videoid"]), "flowPoolLevelId": level_id } self.aliyun_log.logging( code="4003", message="请求地址: {}".format(self.enter_url), data=o ) response = requests.request("POST", self.enter_url, headers=self.header, json=o) if response.json()['code'] == 0: self.aliyun_log.logging( code="4003", message="返回", data=response.json() ) self.aliyun_log.logging( code="4001", message="有一条视频成功送入流量池", data={"vid": obj['videoid'], "level": level_id} ) return True else: self.aliyun_log.logging( code="4004", message=response.json()['msg'] if response.json().get('msg') else "操作失败", data=response.json(), account=int(obj["videoid"]) ) return False def auto_process(self): """ auto process this task in schedule :return: """ successful_list = [] for i in range(20): data = self.get_data_from_flow_pool() if data: self.aliyun_log.logging( code=4001, message="该小时一共有 {} 条视频需要被送入流量池".format(len(data)), data=[item['videoid'] for item in data] ) for obj in data: if self.send_to_flow_pool_level(obj): successful_list.append(obj['videoid']) # robot send message to the group, notice the successful count and the fail count if successful_list: bot(successful_list) return else: self.aliyun_log.logging( code=4006, message="未扫描到数据, 等待 60s * {}".format(i + 1) ) time.sleep(60 * 2) # 本小时没有扫描到数据 if __name__ == "__main__": AS = AutoSendToFlowPool() schedule.every().hour.at(":08").do(AS.auto_process) while True: schedule.run_pending() time.sleep(10)