utils.py

# coding:utf-8
import pickle
import os
import requests
import json
import traceback
import pandas as pd
from odps import ODPS
from config import set_config
from db_helper import HologresHelper, MysqlHelper, RedisHelper
from log import Log
from collections import defaultdict

config_, env = set_config()
log_ = Log()

def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
                          pool_maxsize=1000, pool_connections=1000):
    """Execute a SQL statement on ODPS and return the raw result instance."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.execute_sql(sql=sql)
    return records

def exe_sql(project, sql, connect_timeout=3000, read_timeout=500000,
            pool_maxsize=1000, pool_connections=1000):
    """Execute a SQL statement on ODPS and return the result as a pandas DataFrame."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    with odps.execute_sql(sql).open_reader() as reader:
        d = defaultdict(list)  # column name -> list of values
        for record in reader:
            for res in record:
                d[res[0]].append(res[1])  # each res is a (column_name, value) pair
        data = pd.DataFrame.from_dict(d, orient='columns', dtype=str)
    return data

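# Illustrative sketch of a typical exe_sql call; the project and table names
# below are hypothetical, not taken from config:
#
#     df = exe_sql(project='my_project',
#                  sql='SELECT videoid, score FROM my_project.my_table WHERE dt = 20220801;')
#     # df is a DataFrame keyed by column name, with every value read back as a string
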
def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Read data from an ODPS table partition.
    :param date: partition date, type-string '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize:
    :param pool_connections:
    :return: records
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records

def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
                                pool_maxsize=1000, pool_connections=1000):
    """
    Check whether the given partition exists in the table.
    :param date: partition date, type-string '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize:
    :param pool_connections:
    :return: True if the partition exists, otherwise False
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    t = odps.get_table(name=table)
    return t.exist_partition(partition_spec=f'dt={date}')

def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Write data to a pickle file.
    :param data: the data to persist
    :param filename: target file name
    :param filepath: directory for the file, defaults to config_.DATA_DIR_PATH
    :return: None
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)

def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Read data from a pickle file.
    :param filename: file name
    :param filepath: directory for the file, defaults to config_.DATA_DIR_PATH
    :return: data, or None if the file does not exist
    """
    file = os.path.join(filepath, filename)
    if not os.path.exists(file):
        return None
    with open(file, 'rb') as rf:
        data = pickle.load(rf)
    return data

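# Minimal round-trip sketch; the file name is hypothetical:
#
#     write_to_pickle(data={'videoid': 123, 'score': 0.9}, filename='demo.pickle')
#     restored = read_from_pickle(filename='demo.pickle')
#     # restored == {'videoid': 123, 'score': 0.9}; read_from_pickle returns None
#     # when the file is missing
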
def send_msg_to_feishu(webhook, key_word, msg_text):
    """Send a plain-text message to a Feishu webhook."""
    headers = {'Content-Type': 'application/json'}
    payload_message = {
        "msg_type": "text",
        "content": {
            "text": '{}: {}'.format(key_word, msg_text)
        }
    }
    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
    print(response.text)

def send_msg_to_feishu_new(webhook, key_word, title, msg_list):
    """Send a rich-text (post) message to a Feishu webhook, one paragraph per entry of msg_list."""
    headers = {'Content-Type': 'application/json'}
    content_list = [
        [
            {
                "tag": "text",
                "text": msg
            }
        ]
        for msg in msg_list
    ]
    payload_message = {
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": f"{key_word}: {title}",
                    "content": content_list,
                }
            }
        }
    }
    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
    print(response.text)

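# Usage sketch; the webhook URL below is hypothetical:
#
#     send_msg_to_feishu_new(
#         webhook='https://open.feishu.cn/open-apis/bot/v2/hook/xxx',
#         key_word='rov-offline',
#         title='daily report',
#         msg_list=['step 1 done', 'step 2 done']
#     )
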
def request_post(request_url, request_data=None, **kwargs):
    """
    POST to an HTTP endpoint.
    :param request_url: endpoint URL
    :param request_data: request payload
    :return: res_data, the decoded JSON body, or None on failure
    """
    try:
        response = requests.post(url=request_url, json=request_data, **kwargs)
        if response.status_code == 200:
            res_data = json.loads(response.text)
            return res_data
        else:
            log_.info(f"response.status_code: {response.status_code}")
            return None
    except Exception as e:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='rov-offline{} - API request failed: {}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
        )
        return None

def request_get(request_url):
    """
    GET from an HTTP endpoint.
    :param request_url: endpoint URL
    :return: res_data, the decoded JSON body, or None on failure
    """
    try:
        response = requests.get(url=request_url)
        if response.status_code == 200:
            res_data = json.loads(response.text)
            return res_data
        else:
            log_.info(f"response.status_code: {response.status_code}")
            return None
    except Exception as e:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='rov-offline{} - API request failed: {}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
        )
        return None

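# Both request helpers return the decoded JSON body on HTTP 200 and None on any
# other status or on exception, so callers should guard for None. The URL below
# is hypothetical:
#
#     res = request_get(request_url='http://example.com/api/videos')
#     if res is not None:
#         ...
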
def data_normalization(data):
    """
    Min-Max normalize the data so that scores fall in [0, 100].
    :param data: type-list
    :return: normal_data, type-list, the normalized data
    """
    x_max = max(data)
    x_min = min(data)
    if x_max == x_min:
        # guard against division by zero when all values are equal
        return [0] * len(data)
    normal_data = [(x - x_min) / (x_max - x_min) * 100 for x in data]
    return normal_data

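# Worked example of the scaling above:
#
#     data_normalization([2, 4, 6])
#     # x_min=2, x_max=6 -> [(2-2)/4*100, (4-2)/4*100, (6-2)/4*100] = [0.0, 50.0, 100.0]
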
def filter_video_status(video_ids):
    """
    Filter videos by status.
    :param video_ids: list of video ids, type-list
    :return: filtered_videos
    """
    i = 0
    while i < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.audit_status AS 'audit_status', " \
                               "t2.video_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            if len(video_ids) == 1:
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE audit_status = 5 " \
                      "AND applet_rec_status IN (1, -6) " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, video_ids[0])
                data = mysql_helper.get_data(sql=sql)
            else:
                data = []
                # query in batches of 200; use a separate loop variable so the
                # retry counter `i` is not clobbered
                for idx in range(len(video_ids) // 200 + 1):
                    batch = video_ids[idx * 200:(idx + 1) * 200]
                    if not batch:
                        continue
                    sql = "SELECT video_id " \
                          "FROM ({}) t " \
                          "WHERE audit_status = 5 " \
                          "AND applet_rec_status IN (1, -6) " \
                          "AND open_status = 1 " \
                          "AND payment_status = 0 " \
                          "AND encryption_status != 5 " \
                          "AND transcoding_status = 3 " \
                          "AND video_id IN ({});".format(video_status_sql,
                                                         ','.join(str(v) for v in batch))
                    select_res = mysql_helper.get_data(sql=sql)
                    if select_res is not None:
                        data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            log_.error(f"video status filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {i}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            i += 1
            if i == 3:
                # all retries failed; fall back to the unfiltered list
                return video_ids

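# Typical call (the ids are hypothetical). Only ids passing every status
# condition above are returned; if all retries fail, the input list is
# returned unfiltered:
#
#     filtered = filter_video_status(video_ids=[110, 112, 113])
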
def filter_video_status_with_applet_rec(video_ids, applet_rec_status):
    """
    Filter videos by status, with an explicit applet recommendation status.
    :param video_ids: list of video ids, type-list
    :param applet_rec_status: applet recommendation status; -6: pending recommendation, 1: normal recommendation
    :return: filtered_videos
    """
    i = 0
    while i < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.audit_status AS 'audit_status', " \
                               "t2.video_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            if len(video_ids) == 1:
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE audit_status = 5 " \
                      "AND applet_rec_status = {} " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, applet_rec_status, video_ids[0])
                data = mysql_helper.get_data(sql=sql)
            else:
                data = []
                # query in batches of 200; use a separate loop variable so the
                # retry counter `i` is not clobbered
                for idx in range(len(video_ids) // 200 + 1):
                    batch = video_ids[idx * 200:(idx + 1) * 200]
                    if not batch:
                        continue
                    sql = "SELECT video_id " \
                          "FROM ({}) t " \
                          "WHERE audit_status = 5 " \
                          "AND applet_rec_status = {} " \
                          "AND open_status = 1 " \
                          "AND payment_status = 0 " \
                          "AND encryption_status != 5 " \
                          "AND transcoding_status = 3 " \
                          "AND video_id IN ({});".format(video_status_sql, applet_rec_status,
                                                         ','.join(str(v) for v in batch))
                    select_res = mysql_helper.get_data(sql=sql)
                    if select_res is not None:
                        data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            log_.error(f"video status filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {i}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            i += 1
            if i == 3:
                # all retries failed; fall back to the unfiltered list
                return video_ids

def filter_video_status_app(video_ids):
    """
    Filter videos by status - app side.
    :param video_ids: list of video ids, type-list
    :return: filtered_videos
    """
    i = 0
    while i < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.app_audit_status AS 'app_audit_status', " \
                               "t2.original_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            if len(video_ids) == 1:
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE app_audit_status = 5 " \
                      "AND app_rec_status IN (1, -6, 10) " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, video_ids[0])
                data = mysql_helper.get_data(sql=sql)
            else:
                data = []
                # query in batches of 200; use a separate loop variable so the
                # retry counter `i` is not clobbered
                for idx in range(len(video_ids) // 200 + 1):
                    batch = video_ids[idx * 200:(idx + 1) * 200]
                    if not batch:
                        continue
                    sql = "SELECT video_id " \
                          "FROM ({}) t " \
                          "WHERE app_audit_status = 5 " \
                          "AND app_rec_status IN (1, -6, 10) " \
                          "AND open_status = 1 " \
                          "AND payment_status = 0 " \
                          "AND encryption_status != 5 " \
                          "AND transcoding_status = 3 " \
                          "AND video_id IN ({});".format(video_status_sql,
                                                         ','.join(str(v) for v in batch))
                    select_res = mysql_helper.get_data(sql=sql)
                    if select_res is not None:
                        data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            log_.error(f"video status filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {i}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            i += 1
            if i == 3:
                # all retries failed; fall back to the unfiltered list
                return video_ids

def filter_shield_video(video_ids, shield_key_name_list):
    """
    Filter out shielded videos.
    :param video_ids: list of video ids to filter, type-list
    :param shield_key_name_list: redis keys of the shielded-video sets
    :return: filtered_videos, the filtered list, type-list
    """
    if len(video_ids) == 0:
        return video_ids
    # filter against the sets cached in Redis
    redis_helper = RedisHelper()
    for shield_key_name in shield_key_name_list:
        shield_videos_list = redis_helper.get_data_from_set(key_name=shield_key_name)
        if not shield_videos_list:
            continue
        shield_videos = [int(video) for video in shield_videos_list]
        video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
    return video_ids

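# Usage sketch; the redis key name is hypothetical:
#
#     remaining = filter_shield_video(video_ids=[110, 112],
#                                     shield_key_name_list=['shield:videos:set'])
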
def filter_political_videos(video_ids):
    """
    Filter out politically sensitive videos.
    :param video_ids: list of video ids to filter, type-list
    :return: filtered_video_ids, the filtered list, type-list
    """
    if len(video_ids) == 0:
        return video_ids
    # filter against the set cached in Redis
    redis_helper = RedisHelper()
    political_key_name = config_.POLITICAL_VIDEOS_KEY_NAME
    political_videos_list = redis_helper.get_data_from_set(key_name=political_key_name)
    if not political_videos_list:
        return video_ids
    political_videos = [int(video) for video in political_videos_list]
    filtered_video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in political_videos]
    return filtered_video_ids

def update_video_w_h_rate(video_ids, key_name):
    """
    Compute the width/height ratio of landscape videos (width/height > 1) and store it in redis.
    :param video_ids: list of videoIds, type-list
    :param key_name: redis key
    :return: None
    """
    # fetch the dimensions
    if len(video_ids) == 1:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_ids[0])
    else:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id IN {};".format(tuple(video_ids))
    mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
    data = mysql_helper.get_data(sql=sql)
    # update redis
    info_data = {}
    for video_id, width, height, rotate in data:
        if int(width) == 0 or int(height) == 0:
            continue
        # when rotate is 90 or 270, width and height are swapped
        if int(rotate) in (90, 270):
            w_h_rate = int(height) / int(width)
        else:
            w_h_rate = int(width) / int(height)
        if w_h_rate > 1:
            info_data[int(video_id)] = w_h_rate
    redis_helper = RedisHelper()
    # drop the old data
    redis_helper.del_keys(key_name=key_name)
    # write the new data
    if len(info_data) > 0:
        redis_helper.add_data_with_zset(key_name=key_name, data=info_data)

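# Worked example of the rotate handling above: a 720x1280 (width x height)
# video with rotate=90 is effectively landscape, so the stored ratio is
# 1280/720 ≈ 1.78 and it is kept; with rotate=0 the ratio is 720/1280 ≈ 0.56,
# which is not > 1, so the video is skipped.
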
def data_check(project, table, dt):
    """Check whether the partition data is ready; return its row count (0 if not)."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        check_res = check_table_partition_exits(date=dt, project=project, table=table)
        if check_res:
            sql = f'select * from {project}.{table} where dt = {dt}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception:
        data_count = 0
    return data_count

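# Sketch: returns the row count of the dt partition, or 0 when the partition is
# missing or the query fails. The project/table/dt values are hypothetical:
#
#     count = data_check(project='my_project', table='my_table', dt='20220801')
#     if count > 0:
#         ...  # data is ready
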
def get_feature_data(project, table, features, dt):
    """Fetch the given feature columns from an ODPS partition and return them as a DataFrame."""
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for record in records:
        item = {}
        for feature_name in features:
            item[feature_name] = record[feature_name]
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df

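# Sketch (the feature names are hypothetical; they must match the table's columns):
#
#     df = get_feature_data(project='my_project', table='my_table',
#                           features=['videoid', 'rov_score'], dt='20220801')
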
if __name__ == '__main__':
    # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
    # data_normalization(data_test)
    # request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})
    # video_ids = [110, 112, 113, 115, 116, 117, 8289883]
    # update_video_w_h_rate(video_ids=video_ids, key_name='')
    project = config_.PROJECT_24H_APP_TYPE
    table = config_.TABLE_24H_APP_TYPE
    dt = '2022080115'
    check_res = check_table_partition_exits(date=dt, project=project, table=table)
    print(check_res)