auto_send_to_flow_pool.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. """
  2. created on Thursday, February,29 2024
  3. @author: luojunhui
  4. """
  5. import json
  6. import time
  7. import requests
  8. import datetime
  9. import schedule
  10. from odps import ODPS
  11. from application.common.log import AliyunLogger
  12. def bot(success_list):
  13. """
  14. 机器人
  15. """
  16. url = "https://open.feishu.cn/open-apis/bot/v2/hook/641c6849-8bdc-4781-a79e-deb4af50fa49"
  17. headers = {"Content-Type": "application/json"}
  18. payload = {
  19. "msg_type": "interactive",
  20. "card": {
  21. "elements": [
  22. {
  23. "tag": "div",
  24. "text": {
  25. "content": "**成功送入流量池的视频一共{}条**\n{}".format(
  26. len(success_list), success_list
  27. ),
  28. "tag": "lark_md",
  29. },
  30. },
  31. ],
  32. "header": {"title": {"content": "自动送入流量池: 通知 ✅", "tag": "plain_text"}},
  33. },
  34. }
  35. w = requests.request("POST", url=url, headers=headers, data=json.dumps(payload))
  36. print(w.json())
  37. class OdpsFunction(object):
  38. """
  39. odps function class
  40. """
  41. def __init__(self):
  42. self.endpoint = "http://service.cn.maxcompute.aliyun.com/api"
  43. self.access_id = "LTAIWYUujJAm7CbH"
  44. self.access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
  45. self.project = "loghubods"
  46. self.od = ODPS(
  47. access_id=self.access_id,
  48. secret_access_key=self.access_key,
  49. endpoint=self.endpoint,
  50. project=self.project,
  51. )
  52. def select(self, sql):
  53. """
  54. :param sql: 查询语句
  55. :return: odps_obj{}
  56. """
  57. with self.od.execute_sql(sql).open_reader() as reader:
  58. if reader:
  59. return [item for item in reader]
  60. else:
  61. return []
  62. class AutoSendToFlowPool(object):
  63. """
  64. 定时任务方式
  65. 自动把优质视频送入流量池高层
  66. 方法:
  67. 定时从表中读取 待推荐的新视频的数据;
  68. 判断条件,满足规则的数据则自动送入流量池高层;
  69. """
  70. def __init__(self):
  71. self.header = {
  72. "authority": "admin.piaoquantv.com",
  73. "accept": "application/json",
  74. "accept-language": "zh,en;q=0.9,zh-CN;q=0.8",
  75. "content-type": "application/json",
  76. "cookie": "SESSION=NTk4MWQ5MDItZTI1ZS00YjI1LTllN2MtNThiY2M4MjhiZjVh",
  77. "origin": "https://admin.piaoquantv.com",
  78. "referer": "https://admin.piaoquantv.com/cms/post-detail/18811646/detail",
  79. "sec-ch-ua": '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"',
  80. "sec-ch-ua-mobile": "?0",
  81. "sec-ch-ua-platform": '"macOS"',
  82. "sec-fetch-dest": "empty",
  83. "sec-fetch-mode": "cors",
  84. "sec-fetch-site": "same-origin",
  85. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
  86. }
  87. self.enter_url = "https://admin.piaoquantv.com/manager/flowpool/video/enter"
  88. self.flow_pool_map = {
  89. 1: [1, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78],
  90. 2: [2, 25, 31, 37, 43, 49, 55, 61, 67, 73, 79],
  91. 3: [3, 26, 32, 38, 44, 50, 56, 62, 68, 74, 80],
  92. 4: [4, 27, 33, 39, 45, 51, 57, 63, 69, 75, 81],
  93. 5: [22, 28, 34, 40, 46, 52, 58, 64, 70, 76, 82],
  94. 6: [23, 29, 35, 41, 47, 53, 59, 65, 71, 77, 83],
  95. }
  96. self.odp = OdpsFunction()
  97. self.aliyun_log = AliyunLogger(platform="home", mode="automatic")
  98. def get_data_from_flow_pool(self):
  99. """
  100. get recommend videos from MySQL
  101. :return: List [{}, {}, {}......]
  102. """
  103. t = datetime.datetime.now().strftime("%Y%m%d%H")
  104. sql = f"""select videoid, target_level from loghubods.auto_enter_flow_pool_videolist where dt = '{t}';"""
  105. data = self.odp.select(sql)
  106. return data
  107. def send_to_flow_pool_level(self, obj, flow_pool_id=-1):
  108. """
  109. send video to different flow pool levels
  110. :param obj: {
  111. "videoid": video id,
  112. "target_level": flow pool level, ranges from 1 to 4
  113. }
  114. :param flow_pool_id: flow pool id, define -1
  115. :return: bool, False / True
  116. """
  117. o = {
  118. "flowPoolId": flow_pool_id,
  119. "startType": 0, # 1 human / 0 machine
  120. "videoId": obj["videoid"],
  121. "flowPoolLevelId": obj["target_level"],
  122. }
  123. response = requests.request("POST", self.enter_url, headers=self.header, json=o)
  124. if response.json()['code'] == 0:
  125. return True
  126. else:
  127. self.aliyun_log.logging(
  128. code="4004",
  129. message=response.json()['msg'] if response.json().get('msg') else "操作失败",
  130. data=response.json()
  131. )
  132. return False
  133. def auto_process(self):
  134. """
  135. auto process this task in schedule
  136. :return:
  137. """
  138. successful_list = []
  139. data = self.get_data_from_flow_pool()
  140. if data:
  141. self.aliyun_log.logging(
  142. code=4001, message="该小时一共有 {} 条视频需要被送入流量池".format(len(data)), data=data
  143. )
  144. for obj in data:
  145. if self.send_to_flow_pool_level(obj):
  146. successful_list.append(obj['videoid'])
  147. self.aliyun_log.logging(
  148. code="4001", message="有一条视频成功送入流量池", data=obj
  149. )
  150. # robot send message to the group, notice the successful count and the fail count
  151. if successful_list:
  152. bot(successful_list)
  153. else:
  154. # robot notice no videos found this hour
  155. return
  156. if __name__ == "__main__":
  157. AS = AutoSendToFlowPool()
  158. schedule.every().hour.at(":20").do(AS.auto_process)
  159. while True:
  160. schedule.run_pending()
  161. time.sleep(10)