123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- """
- Created on Thursday, February 29, 2024
- @author: luojunhui
- """
- import os
- import sys
- import json
- import time
- import requests
- import datetime
- import schedule
- from odps import ODPS
- sys.path.append(os.getcwd())
- from application.common.log import AliyunLogger
def bot(success_list):
    """
    Post a Feishu interactive card announcing the videos that were sent
    into the flow pool.

    :param success_list: collection of video ids that were sent successfully;
        the card shows ``len(success_list)`` and the collection itself.
    :return: None
    :raises requests.RequestException: if the webhook cannot be reached or
        does not answer within the timeout.
    """
    # NOTE(review): the webhook token is hard-coded here; anyone who can read
    # this file can post to the group. Consider moving it to configuration.
    url = "https://open.feishu.cn/open-apis/bot/v2/hook/641c6849-8bdc-4781-a79e-deb4af50fa49"
    headers = {"Content-Type": "application/json"}
    payload = {
        "msg_type": "interactive",
        "card": {
            "elements": [
                {
                    "tag": "div",
                    "text": {
                        "content": "**成功送入流量池的视频一共{}条**\n{}".format(
                            len(success_list), success_list
                        ),
                        "tag": "lark_md",
                    },
                },
            ],
            "header": {"title": {"content": "自动送入流量池: 通知 ✅", "tag": "plain_text"}},
        },
    }
    # requests waits forever by default; a timeout keeps an unresponsive
    # webhook from blocking the caller (and the scheduler loop) indefinitely.
    requests.request(
        "POST", url=url, headers=headers, data=json.dumps(payload), timeout=10
    )
class OdpsFunction(object):
    """
    Small wrapper around an ``ODPS`` client that runs SQL and returns rows.
    """

    def __init__(self, access_id=None, access_key=None, endpoint=None, project=None):
        """
        Create the underlying ``ODPS`` client.

        Every argument is optional; when omitted (``None``) the value that was
        previously hard-coded is used, so ``OdpsFunction()`` behaves as before.

        :param access_id: access key id for the ODPS account
        :param access_key: secret access key for the ODPS account
        :param endpoint: service endpoint URL
        :param project: project name to run queries against
        """
        # SECURITY NOTE(review): the fallback credentials below are committed
        # in source. They are kept only for backward compatibility; callers
        # should pass credentials in explicitly and these should be rotated.
        self.endpoint = (
            "http://service.cn.maxcompute.aliyun.com/api" if endpoint is None else endpoint
        )
        self.access_id = "LTAIWYUujJAm7CbH" if access_id is None else access_id
        self.access_key = (
            "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P" if access_key is None else access_key
        )
        self.project = "loghubods" if project is None else project
        self.od = ODPS(
            access_id=self.access_id,
            secret_access_key=self.access_key,
            endpoint=self.endpoint,
            project=self.project,
        )

    def select(self, sql):
        """
        Execute ``sql`` and return all result rows.

        :param sql: query statement to execute
        :return: list of the records yielded by the reader; an empty list when
            the reader is falsy
        """
        with self.od.execute_sql(sql).open_reader() as reader:
            # Materialise the rows inside the ``with`` block, i.e. before the
            # reader's context is exited.
            return list(reader) if reader else []
def generate_flow_pool_levels():
    """
    Fetch the flow-pool list from the admin API and return the levels of the
    pool whose ``id`` is ``-1``.

    :return: the ``levels`` value of the first pool with ``id == -1``, or
        ``None`` when no such pool is in the response
    :raises requests.RequestException: on connection failure or timeout
    :raises KeyError: if the response JSON lacks the expected keys
    """
    url = "https://admin.piaoquantv.com/manager/flowpool/pageList"
    payload = {
        "pageNum": 1,
        "pageSize": 10000
    }
    # NOTE(review): the session cookie below is hard-coded; once that session
    # expires this call will presumably stop returning data -- confirm how the
    # cookie is meant to be refreshed.
    headers = {
        'authority': 'admin.piaoquantv.com',
        'accept': 'application/json',
        'accept-language': 'en,zh;q=0.9',
        'content-type': 'application/json',
        'cookie': 'SESSION=ZDY1MTYxODgtNGQ2OS00YzA4LThlMzAtMmE3YzllNGQ4ODk5',
        'origin': 'https://admin.piaoquantv.com',
        'referer': 'https://admin.piaoquantv.com/cms/post-detail/19014920/detail',
        'sec-ch-ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"macOS"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
    }
    # Without a timeout an unresponsive admin API would hang the caller forever.
    res = requests.request(
        "POST", url, json=payload, headers=headers, timeout=10
    ).json()
    pool_list = res['content']['list']
    for pool in pool_list:
        if pool['id'] == -1:
            return pool['levels']
    # No pool with id -1 in the response.
    return None
def generate_flow_map():
    """
    Build the level -> level-id mapping from the flow-pool levels.

    :return: dict mapping ``str(level['level'])`` to ``level['id']`` for every
        entry returned by ``generate_flow_pool_levels()``; an empty dict when
        the lookup or the conversion fails for any reason
    """
    try:
        levels = generate_flow_pool_levels()
        return {str(level['level']): level['id'] for level in levels}
    except Exception:
        # Deliberate best-effort: any failure (network error, unexpected
        # payload, ``None`` levels) yields an empty map. ``Exception`` instead
        # of a bare ``except`` lets KeyboardInterrupt / SystemExit propagate.
        return {}
class AutoSendToFlowPool(object):
    """
    Scheduled job that automatically sends high-quality videos into the
    upper levels of the flow pool.

    Method:
        periodically read the rows of to-be-recommended new videos from the
        table; every row read is posted to the flow-pool "enter" endpoint.
    """

    # Seconds to wait for the flow-pool API before giving up on a request.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # NOTE(review): the session cookie is hard-coded; confirm how it is
        # meant to be refreshed once the session expires.
        self.header = {
            "authority": "admin.piaoquantv.com",
            "accept": "application/json",
            "accept-language": "zh,en;q=0.9,zh-CN;q=0.8",
            "content-type": "application/json",
            "cookie": "SESSION=NTk4MWQ5MDItZTI1ZS00YjI1LTllN2MtNThiY2M4MjhiZjVh",
            "origin": "https://admin.piaoquantv.com",
            "referer": "https://admin.piaoquantv.com/cms/post-detail/18811646/detail",
            "sec-ch-ua": '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"macOS"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
        }
        self.enter_url = "https://api-internal.piaoquantv.com/flowpool/video/enter/abPool"
        # Static level -> level-id map, used when the live map is unavailable.
        self.flow_pool_map = {
            "1": 24,
            "2": 25,
            "3": 26,
            "4": 27,
            "5": 28,
            "6": 29
        }
        self.odp = OdpsFunction()
        self.aliyun_log = AliyunLogger(platform="home", mode="automatic")

    def get_data_from_flow_pool(self):
        """
        Read the current hour's rows through the ODPS wrapper.

        The partition value is the local time formatted as ``%Y%m%d%H``.

        :return: list of rows with ``videoid``, ``target_level`` and ``uid``
        """
        t = datetime.datetime.now().strftime("%Y%m%d%H")
        sql = f"""select videoid, target_level, uid from loghubods.auto_enter_flow_pool_videolist where dt = '{t}';"""
        return self.odp.select(sql)

    def send_to_flow_pool_level(self, obj, flow_pool_id=-1):
        """
        Send one video to its target flow-pool level.

        :param obj: row providing ``videoid``, ``uid`` and ``target_level``
            (``target_level`` is looked up, as a string, in the level map)
        :param flow_pool_id: flow pool id, defaults to -1
        :return: True when the API answers with ``code == 0``; False when it
            answers with anything else or the target level is unknown
        :raises requests.RequestException: on connection failure or timeout
        """
        # Prefer the live level map; fall back to the static one when the
        # live lookup returned nothing.
        pool_map = generate_flow_map() or self.flow_pool_map
        level = str(obj["target_level"])
        level_id = pool_map.get(level)
        if level_id is None:
            # An unknown level used to raise KeyError and abort the whole
            # batch; report it as a failed video instead.
            self.aliyun_log.logging(
                code="4004",
                message="未找到流量池层级对应的 level_id: {}".format(level),
                data={"vid": obj["videoid"], "level": level},
                account=int(obj["videoid"])
            )
            return False
        body = {
            "flowPoolId": flow_pool_id,
            "startType": 2,
            "uid": int(obj['uid']),
            "videoId": int(obj["videoid"]),
            "flowPoolLevelId": level_id
        }
        self.aliyun_log.logging(
            code="4003",
            message="请求地址: {}".format(self.enter_url),
            data=body
        )
        response = requests.request(
            "POST",
            self.enter_url,
            headers=self.header,
            json=body,
            timeout=self.REQUEST_TIMEOUT,
        )
        # Decode the body once instead of re-parsing it for every access.
        result = response.json()
        if result.get('code') == 0:
            self.aliyun_log.logging(
                code="4003",
                message="返回",
                data=result
            )
            self.aliyun_log.logging(
                code="4001", message="有一条视频成功送入流量池", data={"vid": obj['videoid'], "level": level_id}
            )
            return True
        self.aliyun_log.logging(
            code="4004",
            message=result.get('msg') or "操作失败",
            data=result,
            account=int(obj["videoid"])
        )
        return False

    def auto_process(self):
        """
        Scheduled entry point: poll for this hour's rows and send each video.

        Polls up to 20 times, pausing 120 seconds after every empty read. As
        soon as rows are found each one is sent, a notification is posted if
        at least one succeeded, and the method returns. A failure on a single
        video or on the notification is logged and does not abort the run.

        :return: None
        """
        successful_list = []
        for attempt in range(20):
            data = self.get_data_from_flow_pool()
            if not data:
                # NOTE(review): the message mentions 60s while the actual
                # pause is 120 s -- confirm which one is intended.
                self.aliyun_log.logging(
                    code=4006,
                    message="未扫描到数据, 等待 60s * {}".format(attempt + 1)
                )
                time.sleep(60 * 2)
                continue
            self.aliyun_log.logging(
                code=4001, message="该小时一共有 {} 条视频需要被送入流量池".format(len(data)),
                data=[item['videoid'] for item in data]
            )
            for obj in data:
                try:
                    sent = self.send_to_flow_pool_level(obj)
                except Exception as exc:
                    # Batch boundary: one bad row or a transient network error
                    # must not drop the remaining videos or kill the scheduler.
                    self.aliyun_log.logging(
                        code="4004",
                        message="送入流量池异常: {}".format(exc),
                        data={"vid": obj['videoid']}
                    )
                    continue
                if sent:
                    successful_list.append(obj['videoid'])
            # Notify the group about the videos that were sent successfully.
            if successful_list:
                try:
                    bot(successful_list)
                except Exception as exc:
                    # The videos are already sent; a failed notification is
                    # only logged.
                    self.aliyun_log.logging(
                        code="4004",
                        message="机器人通知失败: {}".format(exc),
                        data=successful_list
                    )
            return
        # No rows were found for this hour.
if __name__ == "__main__":
    # Register the job to run at minute 08 of every hour, then keep checking
    # the scheduler for due jobs every 10 seconds.
    flow_pool_job = AutoSendToFlowPool()
    schedule.every().hour.at(":08").do(flow_pool_job.auto_process)
    while True:
        schedule.run_pending()
        time.sleep(10)
|