auto_send_to_flow_pool.py 8.2 KB


  1. """
  2. created on Thursday, February,29 2024
  3. @author: luojunhui
  4. """
  5. import json
  6. import time
  7. import requests
  8. import datetime
  9. import schedule
  10. from odps import ODPS
  11. from application.common.log import AliyunLogger
  12. def bot(success_list):
  13. """
  14. 机器人
  15. """
  16. url = "https://open.feishu.cn/open-apis/bot/v2/hook/641c6849-8bdc-4781-a79e-deb4af50fa49"
  17. headers = {"Content-Type": "application/json"}
  18. payload = {
  19. "msg_type": "interactive",
  20. "card": {
  21. "elements": [
  22. {
  23. "tag": "div",
  24. "text": {
  25. "content": "**成功送入流量池的视频一共{}条**\n{}".format(
  26. len(success_list), success_list
  27. ),
  28. "tag": "lark_md",
  29. },
  30. },
  31. ],
  32. "header": {"title": {"content": "自动送入流量池: 通知 ✅", "tag": "plain_text"}},
  33. },
  34. }
  35. requests.request("POST", url=url, headers=headers, data=json.dumps(payload))
  36. class OdpsFunction(object):
  37. """
  38. odps function class
  39. """
  40. def __init__(self):
  41. self.endpoint = "http://service.cn.maxcompute.aliyun.com/api"
  42. self.access_id = "LTAIWYUujJAm7CbH"
  43. self.access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
  44. self.project = "loghubods"
  45. self.od = ODPS(
  46. access_id=self.access_id,
  47. secret_access_key=self.access_key,
  48. endpoint=self.endpoint,
  49. project=self.project,
  50. )
  51. def select(self, sql):
  52. """
  53. :param sql: 查询语句
  54. :return: odps_obj{}
  55. """
  56. with self.od.execute_sql(sql).open_reader() as reader:
  57. if reader:
  58. return [item for item in reader]
  59. else:
  60. return []
  61. def generate_flow_pool_levels():
  62. """
  63. 读取流量池 level_id表并且通过流量池层级来生成 id
  64. :return:
  65. """
  66. url = "https://admin.piaoquantv.com/manager/flowpool/pageList"
  67. payload = {
  68. "pageNum": 1,
  69. "pageSize": 10000
  70. }
  71. headers = {
  72. 'authority': 'admin.piaoquantv.com',
  73. 'accept': 'application/json',
  74. 'accept-language': 'en,zh;q=0.9',
  75. 'content-type': 'application/json',
  76. 'cookie': 'SESSION=ZDY1MTYxODgtNGQ2OS00YzA4LThlMzAtMmE3YzllNGQ4ODk5',
  77. 'origin': 'https://admin.piaoquantv.com',
  78. 'referer': 'https://admin.piaoquantv.com/cms/post-detail/19014920/detail',
  79. 'sec-ch-ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"',
  80. 'sec-ch-ua-mobile': '?0',
  81. 'sec-ch-ua-platform': '"macOS"',
  82. 'sec-fetch-dest': 'empty',
  83. 'sec-fetch-mode': 'cors',
  84. 'sec-fetch-site': 'same-origin',
  85. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
  86. }
  87. res = requests.request("POST", url, json=payload, headers=headers).json()
  88. # print(res.json())
  89. pool_list = res['content']['list']
  90. for pool in pool_list:
  91. if pool['id'] == -1:
  92. levels = pool['levels']
  93. return levels
  94. def generate_flow_map():
  95. """
  96. 生成 level_id 映射表
  97. :return:
  98. """
  99. try:
  100. levels = generate_flow_pool_levels()
  101. w = {}
  102. for level in levels:
  103. w[str(level['level'])] = level['id']
  104. return w
  105. except:
  106. return {}
  107. class AutoSendToFlowPool(object):
  108. """
  109. 定时任务方式
  110. 自动把优质视频送入流量池高层
  111. 方法:
  112. 定时从表中读取 待推荐的新视频的数据;
  113. 判断条件,满足规则的数据则自动送入流量池高层;
  114. """
  115. def __init__(self):
  116. self.header = {
  117. "authority": "admin.piaoquantv.com",
  118. "accept": "application/json",
  119. "accept-language": "zh,en;q=0.9,zh-CN;q=0.8",
  120. "content-type": "application/json",
  121. "cookie": "SESSION=NTk4MWQ5MDItZTI1ZS00YjI1LTllN2MtNThiY2M4MjhiZjVh",
  122. "origin": "https://admin.piaoquantv.com",
  123. "referer": "https://admin.piaoquantv.com/cms/post-detail/18811646/detail",
  124. "sec-ch-ua": '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"',
  125. "sec-ch-ua-mobile": "?0",
  126. "sec-ch-ua-platform": '"macOS"',
  127. "sec-fetch-dest": "empty",
  128. "sec-fetch-mode": "cors",
  129. "sec-fetch-site": "same-origin",
  130. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
  131. }
  132. self.enter_url = "https://api-internal.piaoquantv.com/flowpool/video/enter/abPool"
  133. self.flow_pool_map = {
  134. "1": 24,
  135. "2": 25,
  136. "3": 26,
  137. "4": 27,
  138. "5": 28,
  139. "6": 29
  140. }
  141. self.odp = OdpsFunction()
  142. self.aliyun_log = AliyunLogger(platform="home", mode="automatic")
  143. def get_data_from_flow_pool(self):
  144. """
  145. get recommend videos from MySQL
  146. :return: List [{}, {}, {}......]
  147. """
  148. t = datetime.datetime.now().strftime("%Y%m%d%H")
  149. sql = f"""select videoid, target_level, uid from loghubods.auto_enter_flow_pool_videolist where dt = '{t}';"""
  150. data = self.odp.select(sql)
  151. return data
  152. def send_to_flow_pool_level(self, obj, flow_pool_id=-1):
  153. """
  154. send video to different flow pool levels
  155. :param obj: {
  156. "videoid": video id,
  157. "target_level": flow pool level, ranges from 1 to 4
  158. }
  159. :param flow_pool_id: flow pool id, define -1
  160. :return: bool, False / True
  161. """
  162. pool_map = generate_flow_map()
  163. if not pool_map:
  164. pool_map = self.flow_pool_map
  165. level = str(obj["target_level"])
  166. level_id = pool_map[level]
  167. o = {
  168. "flowPoolId": flow_pool_id,
  169. "startType": 2,
  170. "uid": int(obj['uid']),
  171. "videoId": int(obj["videoid"]),
  172. "flowPoolLevelId": level_id
  173. }
  174. self.aliyun_log.logging(
  175. code="4003",
  176. message="请求地址: {}".format(self.enter_url),
  177. data=o
  178. )
  179. response = requests.request("POST", self.enter_url, headers=self.header, json=o)
  180. if response.json()['code'] == 0:
  181. self.aliyun_log.logging(
  182. code="4003",
  183. message="返回",
  184. data=response.json()
  185. )
  186. self.aliyun_log.logging(
  187. code="4001", message="有一条视频成功送入流量池", data={"vid": obj['videoid'], "level": level_id}
  188. )
  189. return True
  190. else:
  191. self.aliyun_log.logging(
  192. code="4004",
  193. message=response.json()['msg'] if response.json().get('msg') else "操作失败",
  194. data=response.json(),
  195. account=int(obj["videoid"])
  196. )
  197. return False
  198. def auto_process(self):
  199. """
  200. auto process this task in schedule
  201. :return:
  202. """
  203. successful_list = []
  204. for i in range(20):
  205. data = self.get_data_from_flow_pool()
  206. if data:
  207. self.aliyun_log.logging(
  208. code=4001, message="该小时一共有 {} 条视频需要被送入流量池".format(len(data)),
  209. data=[item['videoid'] for item in data]
  210. )
  211. for obj in data:
  212. if self.send_to_flow_pool_level(obj):
  213. successful_list.append(obj['videoid'])
  214. # robot send message to the group, notice the successful count and the fail count
  215. if successful_list:
  216. bot(successful_list)
  217. return
  218. else:
  219. self.aliyun_log.logging(
  220. code=4006,
  221. message="未扫描到数据, 等待 60s * {}".format(i + 1)
  222. )
  223. time.sleep(60)
  224. # 本小时没有扫描到数据
  225. if __name__ == "__main__":
  226. AS = AutoSendToFlowPool()
  227. schedule.every().hour.at(":08").do(AS.auto_process)
  228. while True:
  229. schedule.run_pending()
  230. time.sleep(10)