my_utils.py
# coding:utf-8
import pickle
import os
import requests
import json
import traceback
import pandas as pd
from odps import ODPS
from odps.df import DataFrame
from my_config import set_config
from db_helper import HologresHelper, MysqlHelper, RedisHelper
from log import Log
from collections import defaultdict

config_, env = set_config()
log_ = Log()

def get_odps_instance(project):
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
    )
    return odps

def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
                          pool_maxsize=1000, pool_connections=1000):
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.execute_sql(sql=sql)
    return records

def exe_sql(project, sql, connect_timeout=3000, read_timeout=500000,
            pool_maxsize=1000, pool_connections=1000):
    # Same behaviour as execute_sql_from_odps; kept as a separate name for
    # existing callers.
    return execute_sql_from_odps(project, sql, connect_timeout, read_timeout,
                                 pool_maxsize, pool_connections)

def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Fetch data from ODPS.
    :param date: partition date, type-string '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize:
    :param pool_connections:
    :return: records
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records
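
# Usage sketch (hypothetical project/table names; assumes the table is
# partitioned by `dt` and has a `videoid` column):
#   records = get_data_from_odps(date='20240101', project='my_project',
#                                table='my_table')
#   for record in records:
#       print(record['videoid'])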

def get_dataframe_from_odps(project, table, partition_spec_dict=None):
    """
    Fetch data from ODPS as a DataFrame.
    :param partition_spec_dict: partition spec, type-dict
    :param project: type-string
    :param table: table name, type-string
    :return: odps.DataFrame
    """
    odps = get_odps_instance(project)
    if partition_spec_dict:
        spec = ','.join(['{}={}'.format(k, partition_spec_dict[k]) for k in
                         partition_spec_dict.keys()])
        return DataFrame(odps.get_table(name=table)).filter_parts(spec)
    else:
        return DataFrame(odps.get_table(name=table))
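
# Usage sketch (hypothetical names): a two-level partition spec such as
# {'dt': '20240101', 'hh': '12'} is joined into the string
# 'dt=20240101,hh=12' before being passed to filter_parts:
#   df = get_dataframe_from_odps('my_project', 'my_table',
#                                partition_spec_dict={'dt': '20240101', 'hh': '12'})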

def get_odps_df_of_max_partition(project, table, rb_spec=None):
    """
    rb_spec: spec for the right bound of partition names, type-dict
    return: odps.DataFrame, or None if no matching partition exists
    """
    odps = get_odps_instance(project)
    t = odps.get_table(table)
    df = DataFrame(t)
    if rb_spec is None:
        return df.filter_parts(t.get_max_partition().partition_spec)
    else:
        spec = ','.join(['{}<{}'.format(k, rb_spec[k]) for k in rb_spec.keys()])
        part_iter = t.iterate_partitions(spec=spec, reverse=True)
        try:
            partition = next(part_iter)
            return df.filter_parts(partition)
        except StopIteration:
            return None
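
# Usage sketch (hypothetical names): pick the newest partition strictly
# before dt=20240101 by passing an exclusive right bound; None means no
# partition matched:
#   df = get_odps_df_of_max_partition('my_project', 'my_table',
#                                     rb_spec={'dt': '20240101'})
#   if df is not None:
#       print(df.count().execute())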

def get_odps_df_of_recent_partitions(project, table, n=1, rb_spec=None):
    """
    rb_spec: spec for the right bound of partition names, type-dict
    return: odps.DataFrame over the n most recent matching partitions
    """
    odps = get_odps_instance(project)
    t = odps.get_table(table)
    df = DataFrame(t)
    spec = None
    if rb_spec:
        spec = ','.join(['{}<{}'.format(k, rb_spec[k]) for k in rb_spec.keys()])
    part_iter = t.iterate_partitions(spec=spec, reverse=True)
    selected_parts = []
    try:
        for _ in range(n):
            partition = next(part_iter)
            selected_parts.append(partition)
            log_.info(f"table: {table}, selected part: {partition.name}")
    except StopIteration:
        log_.info(f"table: {table}, no more parts to iterate")
    return df.filter_parts(selected_parts)
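
# Usage sketch (hypothetical names): take the three most recent daily
# partitions; fewer are returned if the table has fewer partitions:
#   df = get_odps_df_of_recent_partitions('my_project', 'my_table', n=3)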

def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
                                pool_maxsize=1000, pool_connections=1000):
    """
    Check whether the given partition exists in the table.
    :param date: partition date, type-string '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize:
    :param pool_connections:
    :return: True if the partition dt=<date> exists, else False
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    t = odps.get_table(name=table)
    return t.exist_partition(partition_spec=f'dt={date}')

def check_table_partition_exits_v2(project, table, partition_spec_dict):
    """
    Check whether the specified partition exists and return its record count.
    Note: newer ODPS versions removed the timeout-related parameters.
    :param project: project name, type-string
    :param table: table name, type-string
    :param partition_spec_dict: partition spec, type-dict
    :return: if_exist, num_records
    """
    odps = get_odps_instance(project)
    t = odps.get_table(name=table)
    spec = ','.join(['{}={}'.format(k, partition_spec_dict[k]) for k in
                     partition_spec_dict.keys()])
    if t.exist_partition(partition_spec=spec):
        with t.open_reader(partition=spec) as reader:
            count = reader.count
        return True, count
    else:
        return False, 0
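
# Usage sketch (hypothetical names):
#   exists, num = check_table_partition_exits_v2(
#       'my_project', 'my_table', partition_spec_dict={'dt': '20240101'})
#   if exists and num > 0:
#       print(f"partition ready with {num} records")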

def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Write data to a pickle file.
    :param data: the data to serialize
    :param filename: output file name
    :param filepath: directory for the file, defaults to config_.DATA_DIR_PATH
    :return: None
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)

def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Read data from a pickle file.
    :param filename: file name
    :param filepath: directory of the file, defaults to config_.DATA_DIR_PATH
    :return: data, or None if the file does not exist
    """
    file = os.path.join(filepath, filename)
    if not os.path.exists(file):
        return None
    with open(file, 'rb') as rf:
        data = pickle.load(rf)
    return data
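
# Usage sketch (hypothetical file name) of a round trip through the two
# helpers above:
#   write_to_pickle({'videos': [1, 2, 3]}, 'scores.pickle')
#   data = read_from_pickle('scores.pickle')
#   assert data == {'videos': [1, 2, 3]}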

def send_msg_to_feishu(webhook, key_word, msg_text):
    """Send a text message to Feishu."""
    headers = {'Content-Type': 'application/json'}
    payload_message = {
        "msg_type": "text",
        "content": {
            "text": '{}: {}'.format(key_word, msg_text)
        }
    }
    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
    print(response.text)

def send_msg_to_feishu_new(webhook, key_word, title, msg_list):
    """Send a rich-text (post) message to Feishu."""
    headers = {'Content-Type': 'application/json'}
    content_list = [
        [
            {
                "tag": "text",
                "text": msg
            }
        ]
        for msg in msg_list
    ]
    payload_message = {
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": f"{key_word}: {title}",
                    "content": content_list,
                }
            }
        }
    }
    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
    print(response.text)
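
# Usage sketch (hypothetical webhook URL): each entry in msg_list becomes
# one line of the Feishu post body:
#   send_msg_to_feishu_new(
#       webhook='https://open.feishu.cn/open-apis/bot/v2/hook/xxx',
#       key_word='alert', title='daily job',
#       msg_list=['step 1 done', 'step 2 done'])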

def request_post(request_url, request_data=None, **kwargs):
    """
    POST request to an HTTP endpoint.
    :param request_url: endpoint URL
    :param request_data: request payload
    :return: res_data, parsed JSON, or None on failure
    """
    try:
        response = requests.post(url=request_url, json=request_data, **kwargs)
        if response.status_code == 200:
            res_data = json.loads(response.text)
            return res_data
        else:
            log_.info(f"response.status_code: {response.status_code}")
            return None
    except Exception as e:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='rov-offline{} - request failed: {}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
        )
        return None

def request_get(request_url):
    """
    GET request to an HTTP endpoint.
    :param request_url: endpoint URL
    :return: res_data, parsed JSON, or None on failure
    """
    try:
        response = requests.get(url=request_url)
        if response.status_code == 200:
            res_data = json.loads(response.text)
            return res_data
        else:
            log_.info(f"response.status_code: {response.status_code}")
            return None
    except Exception as e:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='rov-offline{} - request failed: {}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
        )
        return None
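
# Usage sketch (hypothetical URL); both helpers return parsed JSON on
# HTTP 200 and None otherwise, so callers should check for None:
#   res = request_get('https://example.com/api/videos')
#   if res is not None:
#       print(res.get('data'))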

def data_normalization(data):
    """
    Min-max normalize the data so scores fall within [0, 100].
    :param data: type-list
    :return: normal_data, type-list, the normalized data
    """
    x_max = max(data)
    x_min = min(data)
    if x_max == x_min:
        # All values are equal; avoid division by zero.
        return [0] * len(data)
    normal_data = [(x - x_min) / (x_max - x_min) * 100 for x in data]
    return normal_data
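
# Worked example: for data = [2, 4, 6], x_min = 2 and x_max = 6, so the
# scores map to [(2-2)/4*100, (4-2)/4*100, (6-2)/4*100] = [0.0, 50.0, 100.0].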

def filter_video_status(video_ids):
    """
    Filter videos by status.
    :param video_ids: list of video ids, type-list
    :return: filtered_videos
    """
    retry_count = 0
    while retry_count < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.audit_status AS 'audit_status', " \
                               "t2.video_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            data = []
            # Query in batches of 200 ids to keep the IN clause bounded.
            for batch_start in range(0, len(video_ids), 200):
                batch = video_ids[batch_start:batch_start + 200]
                # A one-element Python tuple formats as "(x,)", which is not
                # valid SQL, so build single-id clauses explicitly.
                in_clause = '({})'.format(batch[0]) if len(batch) == 1 else str(tuple(batch))
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE audit_status = 5 " \
                      "AND applet_rec_status IN (1, -6) " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN {};".format(video_status_sql, in_clause)
                select_res = mysql_helper.get_data(sql=sql)
                if select_res is not None:
                    data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            log_.error(f"video status filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {retry_count}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            retry_count += 1
    # All retries failed: fall back to the unfiltered list.
    return video_ids
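
# Usage sketch: keep only ids that pass the status filter; on repeated DB
# failure the helper returns the input list unchanged, so the result is not
# guaranteed to be filtered:
#   candidate_ids = [110, 112, 113]
#   usable_ids = filter_video_status(candidate_ids)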

def filter_video_status_with_applet_rec(video_ids, applet_rec_status):
    """
    Filter videos by status.
    :param video_ids: list of video ids, type-list
    :param applet_rec_status: applet recommendation status; -6: pending recommendation, 1: normal recommendation
    :return: filtered_videos
    """
    retry_count = 0
    while retry_count < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.audit_status AS 'audit_status', " \
                               "t2.video_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            data = []
            # Query in batches of 200 ids to keep the IN clause bounded.
            for batch_start in range(0, len(video_ids), 200):
                batch = video_ids[batch_start:batch_start + 200]
                # A one-element Python tuple formats as "(x,)", which is not
                # valid SQL, so build single-id clauses explicitly.
                in_clause = '({})'.format(batch[0]) if len(batch) == 1 else str(tuple(batch))
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE audit_status = 5 " \
                      "AND applet_rec_status = {} " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN {};".format(video_status_sql, applet_rec_status, in_clause)
                select_res = mysql_helper.get_data(sql=sql)
                if select_res is not None:
                    data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            log_.error(f"video status filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {retry_count}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            retry_count += 1
    # All retries failed: fall back to the unfiltered list.
    return video_ids

def filter_video_status_app(video_ids):
    """
    Filter videos by status (app).
    :param video_ids: list of video ids, type-list
    :return: filtered_videos
    """
    retry_count = 0
    while retry_count < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.app_audit_status AS 'app_audit_status', " \
                               "t2.original_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            data = []
            # Query in batches of 200 ids to keep the IN clause bounded.
            for batch_start in range(0, len(video_ids), 200):
                batch = video_ids[batch_start:batch_start + 200]
                # A one-element Python tuple formats as "(x,)", which is not
                # valid SQL, so build single-id clauses explicitly.
                in_clause = '({})'.format(batch[0]) if len(batch) == 1 else str(tuple(batch))
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE app_audit_status = 5 " \
                      "AND app_rec_status IN (1, -6, 10) " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN {};".format(video_status_sql, in_clause)
                select_res = mysql_helper.get_data(sql=sql)
                if select_res is not None:
                    data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            log_.error(f"video status filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {retry_count}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            retry_count += 1
    # All retries failed: fall back to the unfiltered list.
    return video_ids

def filter_shield_video(video_ids, shield_key_name_list):
    """
    Filter out shielded videos.
    :param video_ids: list of video ids to filter, type-list
    :param shield_key_name_list: Redis keys of the shield sets
    :return: filtered_videos, the filtered list, type-list
    """
    if len(video_ids) == 0:
        return video_ids
    # Filter using the sets cached in Redis.
    redis_helper = RedisHelper()
    for shield_key_name in shield_key_name_list:
        shield_videos_list = redis_helper.get_data_from_set(key_name=shield_key_name)
        if not shield_videos_list:
            continue
        shield_videos = [int(video) for video in shield_videos_list]
        video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
    return video_ids
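
# Usage sketch (hypothetical Redis key names): ids present in any of the
# shield sets are dropped:
#   usable_ids = filter_shield_video([110, 112, 113],
#                                    shield_key_name_list=['shield:set:a', 'shield:set:b'])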

def filter_political_videos(video_ids):
    """
    Filter out politically sensitive videos.
    :param video_ids: list of video ids to filter, type-list
    :return: filtered_video_ids, the filtered list, type-list
    """
    if len(video_ids) == 0:
        return video_ids
    # Filter using the set cached in Redis.
    redis_helper = RedisHelper()
    political_key_name = config_.POLITICAL_VIDEOS_KEY_NAME
    political_videos_list = redis_helper.get_data_from_set(key_name=political_key_name)
    if not political_videos_list:
        return video_ids
    political_videos = [int(video) for video in political_videos_list]
    filtered_video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in political_videos]
    return filtered_video_ids

def update_video_w_h_rate(video_ids, key_name):
    """
    Compute the width/height ratios of landscape videos (width/height > 1) and store them in Redis.
    :param video_ids: list of video ids, type-list
    :param key_name: redis key
    :return: None
    """
    # Fetch the data
    if len(video_ids) == 1:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_ids[0])
    else:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id IN {};".format(tuple(video_ids))
    mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
    data = mysql_helper.get_data(sql=sql)
    # Update Redis
    info_data = {}
    for video_id, width, height, rotate in data:
        if int(width) == 0 or int(height) == 0:
            continue
        # When rotate is 90 or 270, width and height are swapped.
        if int(rotate) in (90, 270):
            w_h_rate = int(height) / int(width)
        else:
            w_h_rate = int(width) / int(height)
        if w_h_rate > 1:
            info_data[int(video_id)] = w_h_rate
    redis_helper = RedisHelper()
    # Delete the old data
    redis_helper.del_keys(key_name=key_name)
    # Write the new data
    if len(info_data) > 0:
        redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
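
# Worked example of the rotate handling: a stored 1920x1080 video with
# rotate=90 plays as portrait, so its effective ratio is height/width =
# 1080/1920 = 0.5625 and it is skipped; with rotate=0 the ratio is
# 1920/1080 = 1.7778 > 1, so it is written to the zset.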

def data_check(project, table, dt):
    """Check whether the data is ready and return its record count."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        check_res = check_table_partition_exits(date=dt, project=project, table=table)
        if check_res:
            sql = f'select * from {project}.{table} where dt = {dt}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception:
        data_count = 0
    return data_count
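
# Usage sketch: gate a downstream job on partition readiness (the dt format
# here is hourly, e.g. '2022080115', matching the __main__ check below):
#   if data_check(project, table, dt='2022080115') > 0:
#       run_downstream_job()  # hypothetical follow-up step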

def get_feature_data(project, table, features, dt):
    """Fetch feature data and return it as a pandas DataFrame."""
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for record in records:
        item = {}
        for feature_name in features:
            item[feature_name] = record[feature_name]
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df

if __name__ == '__main__':
    # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
    # data_normalization(data_test)
    # request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})
    # video_ids = [110, 112, 113, 115, 116, 117, 8289883]
    # update_video_w_h_rate(video_ids=video_ids, key_name='')
    project = config_.PROJECT_24H_APP_TYPE
    table = config_.TABLE_24H_APP_TYPE
    dt = '2022080115'
    check_res = check_table_partition_exits(date=dt, project=project, table=table)
    print(check_res)