# coding:utf-8
import pickle
import os
import requests
import json
import traceback
import pandas as pd
from odps import ODPS
from config import set_config
from db_help import MysqlHelper, RedisHelper
# from log import Log

config_ = set_config()
# log_ = Log()

def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
                          pool_maxsize=1000, pool_connections=1000):
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.execute_sql(sql=sql)
    return records

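# Usage sketch (hypothetical project name and SQL): execute_sql returns a pyodps
# instance whose rows can be read through open_reader, as data_check below does:
#     instance = execute_sql_from_odps(project='my_project', sql='SELECT 1;')
#     with instance.open_reader() as reader:
#         print(reader.count)
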
def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Fetch data from an ODPS table partition.
    :param date: partition date, type-string '%Y%m%d'
    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize: max size of the connection pool
    :param pool_connections: number of pooled connections
    :return: records
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records

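# Usage sketch (hypothetical table and column names): read_table yields one
# record per row of the dt partition, with columns accessible by name, as
# get_feature_data below does:
#     records = get_data_from_odps(date='20220801', project='my_project', table='my_table')
#     for record in records:
#         print(record['video_id'])
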
def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
                                pool_maxsize=1000, pool_connections=1000):
    """
    Check whether the given partition exists in the table.
    :param date: partition date, type-string '%Y%m%d'
    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize: max size of the connection pool
    :param pool_connections: number of pooled connections
    :return: True if the partition exists, False otherwise
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    t = odps.get_table(name=table)
    return t.exist_partition(partition_spec=f'dt={date}')

'''def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Write data to a pickle file.
    :param data: data to serialize
    :param filename: target file name
    :param filepath: directory for the file, defaults to config_.DATA_DIR_PATH
    :return: None
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)

def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Read data from a pickle file.
    :param filename: file name
    :param filepath: directory for the file, defaults to config_.DATA_DIR_PATH
    :return: data, or None if the file does not exist
    """
    file = os.path.join(filepath, filename)
    if not os.path.exists(file):
        return None
    with open(file, 'rb') as rf:
        data = pickle.load(rf)
    return data '''

def send_msg_to_feishu(webhook, key_word, msg_text):
    """Send a plain-text message to a Feishu webhook."""
    headers = {'Content-Type': 'application/json'}
    payload_message = {
        "msg_type": "text",
        "content": {
            "text": '{}: {}'.format(key_word, msg_text)
        }
    }
    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
    print(response.text)

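# Usage sketch (hypothetical message text; the webhook/key_word pair comes from
# config_.FEISHU_ROBOT, as the callers in this module do):
#     send_msg_to_feishu(
#         webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
#         key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
#         msg_text='job finished')
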
def send_msg_to_feishu_new(webhook, key_word, title, msg_list):
    """Send a rich-text (post) message to a Feishu webhook, one paragraph per item in msg_list."""
    headers = {'Content-Type': 'application/json'}
    content_list = [
        [
            {
                "tag": "text",
                "text": msg
            }
        ]
        for msg in msg_list
    ]
    payload_message = {
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": f"{key_word}: {title}",
                    "content": content_list,
                }
            }
        }
    }
    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
    print(response.text)

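# Usage sketch (hypothetical title and lines): each item in msg_list becomes one
# paragraph of the rich-text post:
#     send_msg_to_feishu_new(
#         webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
#         key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
#         title='daily check',
#         msg_list=['first line', 'second line'])
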
def request_post(request_url, request_data=None, **kwargs):
    """
    POST request to an HTTP interface.
    :param request_url: interface URL
    :param request_data: request payload
    :return: res_data in json format, or None on failure
    """
    try:
        response = requests.post(url=request_url, json=request_data, **kwargs)
        if response.status_code == 200:
            res_data = json.loads(response.text)
            return res_data
        else:
            # log_.info(f"response.status_code: {response.status_code}")
            return None
    except Exception as e:
        # log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='rov-offline{} - request failed: {}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
        )
        return None

def request_get(request_url):
    """
    GET request to an HTTP interface.
    :param request_url: interface URL
    :return: res_data in json format, or None on failure
    """
    try:
        response = requests.get(url=request_url)
        if response.status_code == 200:
            res_data = json.loads(response.text)
            return res_data
        else:
            # log_.info(f"response.status_code: {response.status_code}")
            return None
    except Exception as e:
        # log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='rov-offline{} - request failed: {}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
        )
        return None

def data_normalization(data):
    """
    Min-Max normalization of the scores, scaling them into [0, 100].
    :param data: type-list
    :return: normal_data, type-list, the normalized data
    """
    x_max = max(data)
    x_min = min(data)
    if x_max == x_min:
        # all values are equal; avoid division by zero
        return [0 for _ in data]
    normal_data = [(x - x_min) / (x_max - x_min) * 100 for x in data]
    return normal_data

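# Worked example: data_normalization([2.0, 4.0, 6.0]) maps the minimum to 0 and
# the maximum to 100 linearly, returning [0.0, 50.0, 100.0].
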
def filter_video_status(video_ids):
    """
    Filter videos by status.
    :param video_ids: list of video ids, type-list
    :return: filtered_videos
    """
    i = 0
    while i < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.audit_status AS 'audit_status', " \
                               "t2.video_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            if len(video_ids) == 1:
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE audit_status = 5 " \
                      "AND applet_rec_status IN (1, -6) " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, video_ids[0])
                data = mysql_helper.get_data(sql=sql)
            else:
                data = []
                # query in batches of 200 ids; the batch variable must not shadow
                # the retry counter i
                for batch_start in range(0, len(video_ids), 200):
                    batch = video_ids[batch_start:batch_start + 200]
                    id_str = ','.join(str(video_id) for video_id in batch)
                    sql = "SELECT video_id " \
                          "FROM ({}) t " \
                          "WHERE audit_status = 5 " \
                          "AND applet_rec_status IN (1, -6) " \
                          "AND open_status = 1 " \
                          "AND payment_status = 0 " \
                          "AND encryption_status != 5 " \
                          "AND transcoding_status = 3 " \
                          "AND video_id IN ({});".format(video_status_sql, id_str)
                    select_res = mysql_helper.get_data(sql=sql)
                    if select_res is not None:
                        data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            # log_.error(f"filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {i}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            i += 1
    # all retries failed: fall back to the unfiltered list
    return video_ids

def filter_video_status_with_applet_rec(video_ids, applet_rec_status):
    """
    Filter videos by status.
    :param video_ids: list of video ids, type-list
    :param applet_rec_status: applet recommendation status; -6: to be recommended, 1: normal recommendation
    :return: filtered_videos
    """
    i = 0
    while i < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.audit_status AS 'audit_status', " \
                               "t2.video_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            if len(video_ids) == 1:
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE audit_status = 5 " \
                      "AND applet_rec_status = {} " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, applet_rec_status, video_ids[0])
                data = mysql_helper.get_data(sql=sql)
            else:
                data = []
                # query in batches of 200 ids; the batch variable must not shadow
                # the retry counter i
                for batch_start in range(0, len(video_ids), 200):
                    batch = video_ids[batch_start:batch_start + 200]
                    id_str = ','.join(str(video_id) for video_id in batch)
                    sql = "SELECT video_id " \
                          "FROM ({}) t " \
                          "WHERE audit_status = 5 " \
                          "AND applet_rec_status = {} " \
                          "AND open_status = 1 " \
                          "AND payment_status = 0 " \
                          "AND encryption_status != 5 " \
                          "AND transcoding_status = 3 " \
                          "AND video_id IN ({});".format(video_status_sql, applet_rec_status, id_str)
                    select_res = mysql_helper.get_data(sql=sql)
                    if select_res is not None:
                        data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            # log_.error(f"filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {i}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            i += 1
    # all retries failed: fall back to the unfiltered list
    return video_ids

def filter_video_status_app(video_ids):
    """
    Filter videos by status - app.
    :param video_ids: list of video ids, type-list
    :return: filtered_videos
    """
    i = 0
    while i < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.app_audit_status AS 'app_audit_status', " \
                               "t2.original_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            if len(video_ids) == 1:
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE app_audit_status = 5 " \
                      "AND app_rec_status IN (1, -6, 10) " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, video_ids[0])
                data = mysql_helper.get_data(sql=sql)
            else:
                data = []
                # query in batches of 200 ids; the batch variable must not shadow
                # the retry counter i
                for batch_start in range(0, len(video_ids), 200):
                    batch = video_ids[batch_start:batch_start + 200]
                    id_str = ','.join(str(video_id) for video_id in batch)
                    sql = "SELECT video_id " \
                          "FROM ({}) t " \
                          "WHERE app_audit_status = 5 " \
                          "AND app_rec_status IN (1, -6, 10) " \
                          "AND open_status = 1 " \
                          "AND payment_status = 0 " \
                          "AND encryption_status != 5 " \
                          "AND transcoding_status = 3 " \
                          "AND video_id IN ({});".format(video_status_sql, id_str)
                    select_res = mysql_helper.get_data(sql=sql)
                    if select_res is not None:
                        data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            # log_.error(f"filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {i}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            i += 1
    # all retries failed: fall back to the unfiltered list
    return video_ids

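# Usage sketch for the three status filters above (hypothetical ids): each takes
# a list of video ids, keeps only the ids whose status columns pass the WHERE
# clause, and falls back to the unfiltered input if MySQL keeps failing after
# 3 retries:
#     ok_ids = filter_video_status(video_ids=[110, 112, 113])
#     app_ok_ids = filter_video_status_app(video_ids=[110, 112, 113])
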
def filter_shield_video(video_ids, shield_key_name_list):
    """
    Filter out shielded videos.
    :param video_ids: list of video ids to filter, type-list
    :param shield_key_name_list: redis keys of the shield video sets
    :return: filtered_videos, the filtered list, type-list
    """
    if len(video_ids) == 0:
        return video_ids
    # filter against the data cached in Redis
    redis_helper = RedisHelper()
    for shield_key_name in shield_key_name_list:
        shield_videos_list = redis_helper.get_data_from_set(key_name=shield_key_name)
        if not shield_videos_list:
            continue
        shield_videos = [int(video) for video in shield_videos_list]
        video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
    return video_ids

def filter_political_videos(video_ids):
    """
    Filter out politically sensitive videos.
    :param video_ids: list of video ids to filter, type-list
    :return: filtered_video_ids, the filtered list, type-list
    """
    if len(video_ids) == 0:
        return video_ids
    # filter against the data cached in Redis
    redis_helper = RedisHelper()
    political_key_name = config_.POLITICAL_VIDEOS_KEY_NAME
    political_videos_list = redis_helper.get_data_from_set(key_name=political_key_name)
    if not political_videos_list:
        return video_ids
    political_videos = [int(video) for video in political_videos_list]
    filtered_video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in political_videos]
    return filtered_video_ids

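# Usage sketch (the shield key below is hypothetical; the political filter reads
# its key from config_.POLITICAL_VIDEOS_KEY_NAME):
#     ids = filter_shield_video(video_ids=[110, 112], shield_key_name_list=['shield:videos'])
#     ids = filter_political_videos(video_ids=ids)
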
def update_video_w_h_rate(video_ids, key_name):
    """
    Compute the width/height ratio of landscape videos (width/height > 1) and store it in redis.
    :param video_ids: list of videoIds, type-list
    :param key_name: redis key
    :return: None
    """
    # fetch the data
    if len(video_ids) == 1:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_ids[0])
    else:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id IN {};".format(tuple(video_ids))
    mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
    data = mysql_helper.get_data(sql=sql)
    # update redis
    info_data = {}
    for video_id, width, height, rotate in data:
        if int(width) == 0 or int(height) == 0:
            continue
        # when rotate is 90 or 270, the stored width and height are swapped
        if int(rotate) in (90, 270):
            w_h_rate = int(height) / int(width)
        else:
            w_h_rate = int(width) / int(height)
        if w_h_rate > 1:
            info_data[int(video_id)] = w_h_rate
    redis_helper = RedisHelper()
    # delete the old data
    redis_helper.del_keys(key_name=key_name)
    # write the new data
    if len(info_data) > 0:
        redis_helper.add_data_with_zset(key_name=key_name, data=info_data)

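# Example of the rotate handling above: a video stored as 1920x1080 with
# rotate=90 is rendered as 1080x1920, so its effective ratio is
# height/width = 1080/1920, which is below 1 and therefore not written to the zset.
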
def data_check(project, table, dt):
    """Check whether the data is ready; returns the row count of the dt partition (0 if not ready)."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        check_res = check_table_partition_exits(date=dt, project=project, table=table)
        if check_res:
            sql = f'select * from {project}.{table} where dt = {dt}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception:
        data_count = 0
    return data_count

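# Usage sketch (hypothetical project/table; the partition value follows the
# '%Y%m%d%H'-style strings used in this module):
#     if data_check(project='my_project', table='my_table', dt='2022080115') > 0:
#         print('data is ready')
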
def get_feature_data(project, table, features, dt):
    """Fetch the given feature columns from the dt partition and return them as a DataFrame."""
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for record in records:
        item = {}
        for feature_name in features:
            item[feature_name] = record[feature_name]
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df

if __name__ == '__main__':
    # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
    # data_normalization(data_test)
    # request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})
    # video_ids = [110, 112, 113, 115, 116, 117, 8289883]
    # update_video_w_h_rate(video_ids=video_ids, key_name='')
    project = config_.PROJECT_24H_APP_TYPE
    table = config_.TABLE_24H_APP_TYPE
    dt = '2022080115'
    check_res = check_table_partition_exits(date=dt, project=project, table=table)
    print(check_res)