auto_send_to_flow_pool.py 8.2 KB


  1. """
  2. created on Thursday, February,29 2024
  3. @author: luojunhui
  4. """
  5. import os
  6. import sys
  7. import json
  8. import time
  9. import requests
  10. import datetime
  11. import schedule
  12. from odps import ODPS
  13. sys.path.append(os.getcwd())
  14. from application.common.log import AliyunLogger
  15. def bot(success_list):
  16. """
  17. 机器人
  18. """
  19. url = "https://open.feishu.cn/open-apis/bot/v2/hook/641c6849-8bdc-4781-a79e-deb4af50fa49"
  20. headers = {"Content-Type": "application/json"}
  21. payload = {
  22. "msg_type": "interactive",
  23. "card": {
  24. "elements": [
  25. {
  26. "tag": "div",
  27. "text": {
  28. "content": "**成功送入流量池的视频一共{}条**\n{}".format(
  29. len(success_list), success_list
  30. ),
  31. "tag": "lark_md",
  32. },
  33. },
  34. ],
  35. "header": {"title": {"content": "自动送入流量池: 通知 ✅", "tag": "plain_text"}},
  36. },
  37. }
  38. requests.request("POST", url=url, headers=headers, data=json.dumps(payload))
  39. class OdpsFunction(object):
  40. """
  41. odps function class
  42. """
  43. def __init__(self):
  44. self.endpoint = "http://service.cn.maxcompute.aliyun.com/api"
  45. self.access_id = "LTAIWYUujJAm7CbH"
  46. self.access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
  47. self.project = "loghubods"
  48. self.od = ODPS(
  49. access_id=self.access_id,
  50. secret_access_key=self.access_key,
  51. endpoint=self.endpoint,
  52. project=self.project,
  53. )
  54. def select(self, sql):
  55. """
  56. :param sql: 查询语句
  57. :return: odps_obj{}
  58. """
  59. with self.od.execute_sql(sql).open_reader() as reader:
  60. if reader:
  61. return [item for item in reader]
  62. else:
  63. return []
  64. def generate_flow_pool_levels():
  65. """
  66. 读取流量池 level_id表并且通过流量池层级来生成 id
  67. :return:
  68. """
  69. url = "https://admin.piaoquantv.com/manager/flowpool/pageList"
  70. payload = {
  71. "pageNum": 1,
  72. "pageSize": 10000
  73. }
  74. headers = {
  75. 'authority': 'admin.piaoquantv.com',
  76. 'accept': 'application/json',
  77. 'accept-language': 'en,zh;q=0.9',
  78. 'content-type': 'application/json',
  79. 'cookie': 'SESSION=ZDY1MTYxODgtNGQ2OS00YzA4LThlMzAtMmE3YzllNGQ4ODk5',
  80. 'origin': 'https://admin.piaoquantv.com',
  81. 'referer': 'https://admin.piaoquantv.com/cms/post-detail/19014920/detail',
  82. 'sec-ch-ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"',
  83. 'sec-ch-ua-mobile': '?0',
  84. 'sec-ch-ua-platform': '"macOS"',
  85. 'sec-fetch-dest': 'empty',
  86. 'sec-fetch-mode': 'cors',
  87. 'sec-fetch-site': 'same-origin',
  88. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
  89. }
  90. res = requests.request("POST", url, json=payload, headers=headers).json()
  91. # print(res.json())
  92. pool_list = res['content']['list']
  93. for pool in pool_list:
  94. if pool['id'] == -1:
  95. levels = pool['levels']
  96. return levels
  97. def generate_flow_map():
  98. """
  99. 生成 level_id 映射表
  100. :return:
  101. """
  102. try:
  103. levels = generate_flow_pool_levels()
  104. w = {}
  105. for level in levels:
  106. w[str(level['level'])] = level['id']
  107. return w
  108. except:
  109. return {}
  110. class AutoSendToFlowPool(object):
  111. """
  112. 定时任务方式
  113. 自动把优质视频送入流量池高层
  114. 方法:
  115. 定时从表中读取 待推荐的新视频的数据;
  116. 判断条件,满足规则的数据则自动送入流量池高层;
  117. """
  118. def __init__(self):
  119. self.header = {
  120. "authority": "admin.piaoquantv.com",
  121. "accept": "application/json",
  122. "accept-language": "zh,en;q=0.9,zh-CN;q=0.8",
  123. "content-type": "application/json",
  124. "cookie": "SESSION=NTk4MWQ5MDItZTI1ZS00YjI1LTllN2MtNThiY2M4MjhiZjVh",
  125. "origin": "https://admin.piaoquantv.com",
  126. "referer": "https://admin.piaoquantv.com/cms/post-detail/18811646/detail",
  127. "sec-ch-ua": '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"',
  128. "sec-ch-ua-mobile": "?0",
  129. "sec-ch-ua-platform": '"macOS"',
  130. "sec-fetch-dest": "empty",
  131. "sec-fetch-mode": "cors",
  132. "sec-fetch-site": "same-origin",
  133. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
  134. }
  135. self.enter_url = "https://api-internal.piaoquantv.com/flowpool/video/enter/abPool"
  136. self.flow_pool_map = {
  137. "1": 24,
  138. "2": 25,
  139. "3": 26,
  140. "4": 27,
  141. "5": 28,
  142. "6": 29
  143. }
  144. self.odp = OdpsFunction()
  145. self.aliyun_log = AliyunLogger(platform="home", mode="automatic")
  146. def get_data_from_flow_pool(self):
  147. """
  148. get recommend videos from MySQL
  149. :return: List [{}, {}, {}......]
  150. """
  151. t = datetime.datetime.now().strftime("%Y%m%d%H")
  152. sql = f"""select videoid, target_level, uid from loghubods.auto_enter_flow_pool_videolist where dt = '{t}';"""
  153. data = self.odp.select(sql)
  154. return data
  155. def send_to_flow_pool_level(self, obj, flow_pool_id=-1):
  156. """
  157. send video to different flow pool levels
  158. :param obj: {
  159. "videoid": video id,
  160. "target_level": flow pool level, ranges from 1 to 4
  161. }
  162. :param flow_pool_id: flow pool id, define -1
  163. :return: bool, False / True
  164. """
  165. pool_map = generate_flow_map()
  166. if not pool_map:
  167. pool_map = self.flow_pool_map
  168. level = str(obj["target_level"])
  169. level_id = pool_map[level]
  170. o = {
  171. "flowPoolId": flow_pool_id,
  172. "startType": 2,
  173. "uid": int(obj['uid']),
  174. "videoId": int(obj["videoid"]),
  175. "flowPoolLevelId": level_id
  176. }
  177. self.aliyun_log.logging(
  178. code="4003",
  179. message="请求地址: {}".format(self.enter_url),
  180. data=o
  181. )
  182. response = requests.request("POST", self.enter_url, headers=self.header, json=o)
  183. if response.json()['code'] == 0:
  184. self.aliyun_log.logging(
  185. code="4003",
  186. message="返回",
  187. data=response.json()
  188. )
  189. self.aliyun_log.logging(
  190. code="4001", message="有一条视频成功送入流量池", data={"vid": obj['videoid'], "level": level_id}
  191. )
  192. return True
  193. else:
  194. self.aliyun_log.logging(
  195. code="4004",
  196. message=response.json()['msg'] if response.json().get('msg') else "操作失败",
  197. data=response.json(),
  198. account=int(obj["videoid"])
  199. )
  200. return False
  201. def auto_process(self):
  202. """
  203. auto process this task in schedule
  204. :return:
  205. """
  206. successful_list = []
  207. for i in range(20):
  208. data = self.get_data_from_flow_pool()
  209. if data:
  210. self.aliyun_log.logging(
  211. code=4001, message="该小时一共有 {} 条视频需要被送入流量池".format(len(data)),
  212. data=[item['videoid'] for item in data]
  213. )
  214. for obj in data:
  215. if self.send_to_flow_pool_level(obj):
  216. successful_list.append(obj['videoid'])
  217. # robot send message to the group, notice the successful count and the fail count
  218. if successful_list:
  219. bot(successful_list)
  220. return
  221. else:
  222. self.aliyun_log.logging(
  223. code=4006,
  224. message="未扫描到数据, 等待 60s * {}".format(i + 1)
  225. )
  226. time.sleep(60 * 2)
  227. # 本小时没有扫描到数据
  228. if __name__ == "__main__":
  229. AS = AutoSendToFlowPool()
  230. schedule.every().hour.at(":08").do(AS.auto_process)
  231. while True:
  232. schedule.run_pending()
  233. time.sleep(10)