""" created on Thursday, February,29 2024 @author: luojunhui """ import json import time import requests import datetime import schedule from odps import ODPS from application.common.log import AliyunLogger def bot(success_list): """ 机器人 """ url = "https://open.feishu.cn/open-apis/bot/v2/hook/641c6849-8bdc-4781-a79e-deb4af50fa49" headers = {"Content-Type": "application/json"} payload = { "msg_type": "interactive", "card": { "elements": [ { "tag": "div", "text": { "content": "**成功送入流量池的视频一共{}条**\n{}".format( len(success_list), success_list ), "tag": "lark_md", }, }, ], "header": {"title": {"content": "自动送入流量池: 通知 ✅", "tag": "plain_text"}}, }, } w = requests.request("POST", url=url, headers=headers, data=json.dumps(payload)) print(w.json()) class OdpsFunction(object): """ odps function class """ def __init__(self): self.endpoint = "http://service.cn.maxcompute.aliyun.com/api" self.access_id = "LTAIWYUujJAm7CbH" self.access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P" self.project = "loghubods" self.od = ODPS( access_id=self.access_id, secret_access_key=self.access_key, endpoint=self.endpoint, project=self.project, ) def select(self, sql): """ :param sql: 查询语句 :return: odps_obj{} """ with self.od.execute_sql(sql).open_reader() as reader: if reader: return [item for item in reader] else: return [] class AutoSendToFlowPool(object): """ 定时任务方式 自动把优质视频送入流量池高层 方法: 定时从表中读取 待推荐的新视频的数据; 判断条件,满足规则的数据则自动送入流量池高层; """ def __init__(self): self.header = { "authority": "admin.piaoquantv.com", "accept": "application/json", "accept-language": "zh,en;q=0.9,zh-CN;q=0.8", "content-type": "application/json", "cookie": "SESSION=NTk4MWQ5MDItZTI1ZS00YjI1LTllN2MtNThiY2M4MjhiZjVh", "origin": "https://admin.piaoquantv.com", "referer": "https://admin.piaoquantv.com/cms/post-detail/18811646/detail", "sec-ch-ua": '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", } self.enter_url = "https://admin.piaoquantv.com/manager/flowpool/video/enter" self.flow_pool_map = { 1: [1, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78], 2: [2, 25, 31, 37, 43, 49, 55, 61, 67, 73, 79], 3: [3, 26, 32, 38, 44, 50, 56, 62, 68, 74, 80], 4: [4, 27, 33, 39, 45, 51, 57, 63, 69, 75, 81], 5: [22, 28, 34, 40, 46, 52, 58, 64, 70, 76, 82], 6: [23, 29, 35, 41, 47, 53, 59, 65, 71, 77, 83], } self.odp = OdpsFunction() self.aliyun_log = AliyunLogger(platform="home", mode="automatic") def get_data_from_flow_pool(self): """ get recommend videos from MySQL :return: List [{}, {}, {}......] """ t = datetime.datetime.now().strftime("%Y%m%d%H") sql = f"""select videoid, target_level from loghubods.auto_enter_flow_pool_videolist where dt = '{t}';""" data = self.odp.select(sql) return data def send_to_flow_pool_level(self, obj, flow_pool_id=-1): """ send video to different flow pool levels :param obj: { "videoid": video id, "target_level": flow pool level, ranges from 1 to 4 } :param flow_pool_id: flow pool id, define -1 :return: bool, False / True """ o = { "flowPoolId": flow_pool_id, "startType": 0, # 1 human / 0 machine "videoId": obj["videoid"], "flowPoolLevelId": obj["target_level"], } response = requests.request("POST", self.enter_url, headers=self.header, json=o) if response.json()['code'] == 0: return True else: self.aliyun_log.logging( code="4004", message=response.json()['msg'] if response.json().get('msg') else "操作失败", data=response.json() ) return False def auto_process(self): """ auto process this task in schedule :return: """ successful_list = [] data = self.get_data_from_flow_pool() if data: self.aliyun_log.logging( code=4001, message="该小时一共有 {} 条视频需要被送入流量池".format(len(data)), data=data ) for obj in data: if self.send_to_flow_pool_level(obj): successful_list.append(obj['videoid']) self.aliyun_log.logging( code="4001", message="有一条视频成功送入流量池", data=obj ) # robot send message to the group, notice the successful count and the fail count if successful_list: bot(successful_list) else: # robot notice no videos found this hour return if __name__ == "__main__": AS = AutoSendToFlowPool() schedule.every().hour.at(":20").do(AS.auto_process) while True: schedule.run_pending() time.sleep(10)