utils.py 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002
  1. import traceback
  2. import requests
  3. import json
  4. import time
  5. import gevent
  6. import pandas as pd
  7. import random
  8. from datetime import date, timedelta, datetime
  9. from typing import List
  10. # from db_helper import HologresHelper, RedisHelper, MysqlHelper
  11. from db_helper import RedisHelper, MysqlHelper
  12. from config import set_config
  13. from log import Log
  14. from parameter_update import param_update_risk_rule
  15. from parameter_update import param_update_risk_videos
  16. from parameter_update import param_update_risk_filter_flag
# Module-wide configuration object produced by the project's set_config().
config_ = set_config()
# Module-wide logger used by the helpers below.
log_ = Log()
# Festival table: each entry is [festival name, start, end].
# NOTE(review): the two integers look like YYYYMMDDHH stamps bounding the period
# in which the festival applies - confirm against the code that reads FESTIVAL.
FESTIVAL = [
    ["除夕", 2024020900, 2024030100],
    ["春节", 2024020900, 2024021800],
    ["初一", 2024021010, 2024021800],
    ["初二", 2024021110, 2024021800],
    ["初三", 2024021210, 2024021800],
    ["初四", 2024021310, 2024021800],
    ["初五", 2024021410, 2024021800],
    ["情人节", 2024021410, 2024021800],
    ["初六", 2024021410, 2024021800],
    ["初七", 2024021410, 2024021800],
    ["初八", 2024021410, 2024021800],
    ["雨水", 2024021909, 2024022000],
    ["妇女节", 2024030808, 2024031200],
    ["龙抬头", 2024031109, 2024031400]
]
  35. def send_msg_to_feishu(msg_text):
  36. """发送消息到飞书"""
  37. # webhook地址
  38. webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
  39. # 自定义关键词key_word
  40. key_word = '服务报警'
  41. headers = {'Content-Type': 'application/json'}
  42. payload_message = {
  43. "msg_type": "text",
  44. "content": {
  45. "text": '{}: {}'.format(key_word, msg_text)
  46. }
  47. }
  48. response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
  49. # print(response.text)
  50. def request_post(request_url, request_data, timeout):
  51. """
  52. post 请求 HTTP接口
  53. :param request_url: 接口URL
  54. :param request_data: 请求参数
  55. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  56. :return: res_data json格式
  57. """
  58. try:
  59. headers = {"Connection": "close"}
  60. #print(request_url)
  61. #print(headers)
  62. response = requests.post(url=request_url, json=request_data, timeout=timeout, headers=headers)
  63. #print("response:", response)
  64. if response.status_code == 200:
  65. res_data = json.loads(response.text)
  66. return res_data
  67. else:
  68. return None
  69. except Exception as e:
  70. #print(e)
  71. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  72. return None
  73. def request_post_data(request_url, request_data, timeout):
  74. """
  75. post 请求 HTTP接口
  76. :param request_url: 接口URL
  77. :param request_data: 请求参数
  78. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  79. :return: res_data json格式
  80. """
  81. try:
  82. headers = {'content-type': 'application/json'}
  83. response = requests.post(url=request_url, data=request_data, timeout=timeout, headers=headers)
  84. #print("response:", response)
  85. if response.status_code == 200:
  86. res_data = json.loads(response.text)
  87. return res_data['outputs']
  88. else:
  89. return None
  90. except Exception as e:
  91. #print(e)
  92. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  93. return None
  94. def request_get(request_url, timeout):
  95. """
  96. get 请求 HTTP接口
  97. :param request_url: 接口URL
  98. :param timeout: 超时时间,单位为秒,type-float or tuple(connect_timeout, read_timeout)
  99. :return: res_data json格式
  100. """
  101. try:
  102. response = requests.get(url=request_url, timeout=timeout)
  103. if response.status_code == 200:
  104. res_data = json.loads(response.text)
  105. return res_data
  106. else:
  107. return None
  108. except Exception as e:
  109. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  110. return None
  111. def get_user_has30day_return(mid):
  112. """
  113. 获取用户近30天是否有回流
  114. :param mid: mid
  115. :return: data, type
  116. """
  117. if not mid:
  118. return None
  119. # 获取redis中存储的状态值
  120. user_key = f"{config_.KEY_NAME_PREFIX_USER_HAS30DAY_RETURN}{mid}"
  121. redis_helper = RedisHelper()
  122. data = redis_helper.get_data_from_redis(key_name=user_key)
  123. if data is not None:
  124. return int(data)
  125. else:
  126. request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
  127. result = request_get(request_url=request_url, timeout=0.1)
  128. if result is None:
  129. return None
  130. if result['code'] != 0:
  131. return None
  132. data = result['data']
  133. if data is True:
  134. redis_data = 1
  135. else:
  136. redis_data = 0
  137. redis_helper.set_data_to_redis(key_name=user_key, value=redis_data, expire_time=2 * 3600)
  138. return redis_data
  139. def get_videos_remain_view_count(app_type, videos):
  140. """
  141. 获取视频在流量池中的剩余可分发数
  142. :param app_type: 产品标识 type-int
  143. :param videos: 视频信息 (视频id, 流量池标记) type-list,[{'videoId': video_id, 'flowPool': flow_pool}, ...]
  144. :return: data type-list,[(video_id, flow_pool, view_count), ...]
  145. error_flag 错误标记,True为错误
  146. """
  147. error_flag = False
  148. if not videos:
  149. return [], error_flag
  150. request_data = {'appType': app_type, 'videos': videos}
  151. result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL, request_data=request_data, timeout=(0.1, 1))
  152. if result is None:
  153. error_flag = True
  154. return [], error_flag
  155. if result['code'] != 0:
  156. log_.info('获取视频在流量池中的剩余可分发数失败')
  157. error_flag = True
  158. return [], error_flag
  159. data = [(item['videoId'], item['flowPool'], item['distributeCount']) for item in result['data']]
  160. return data, error_flag
  161. def get_videos_local_distribute_count(video_id, flow_pool):
  162. """
  163. 获取流量池视频本地分发数
  164. :param video_id: video_id
  165. :param flow_pool: 流量池标记
  166. :return: current_count 本地记录的分发数
  167. """
  168. # redis_h = datetime.now().hour
  169. # if datetime.now().minute >= 30:
  170. # redis_h += 0.5
  171. # key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
  172. key_name = f'{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}'
  173. redis_helper = RedisHelper()
  174. # video = '{}-{}'.format(video_id, flow_pool)
  175. # current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
  176. current_count = redis_helper.get_data_from_redis(key_name=key_name)
  177. if current_count is not None:
  178. return int(current_count)
  179. else:
  180. return None
  181. def update_video_w_h_rate(video_id, key_name):
  182. """
  183. 获取横屏视频的宽高比,并存入redis中 (width/height>1)
  184. :param video_id: videoId type-int
  185. :param key_name: redis key
  186. :return: None
  187. """
  188. # 获取数据
  189. sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_id)
  190. mysql_helper = MysqlHelper()
  191. data = mysql_helper.get_data(sql=sql)
  192. if len(data) == 0:
  193. return
  194. # 更新到redis
  195. width, height, rotate = int(data[0][1]), int(data[0][2]), int(data[0][3])
  196. if width == 0 or height == 0:
  197. return
  198. if rotate in (90, 270):
  199. w_h_rate = height / width
  200. else:
  201. w_h_rate = width / height
  202. if w_h_rate > 1:
  203. info_data = {int(video_id): w_h_rate}
  204. else:
  205. return
  206. redis_helper = RedisHelper()
  207. # 写入新数据
  208. if len(info_data) > 0:
  209. redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
  210. class FilterVideos(object):
  211. """视频过滤"""
  212. def __init__(self, request_id, app_type, video_ids, mid='', uid='',
  213. expansion_factor=None,
  214. risk_filter_flag=None,
  215. app_region_filtered=None,
  216. videos_with_risk=None,
  217. force_truncation=None,
  218. env_dict=None
  219. ):
  220. """
  221. 初始化
  222. :param request_id: request_id
  223. :param app_type: 产品标识 type-int
  224. :param video_ids: 需过滤的视频列表 type-list
  225. :param mid: mid type-string
  226. :param uid: uid type-string
  227. """
  228. self.request_id = request_id
  229. self.app_type = app_type
  230. self.mid = mid
  231. self.uid = uid
  232. self.video_ids = video_ids
  233. self.expansion_factor = expansion_factor
  234. self.risk_filter_flag = risk_filter_flag
  235. self.app_region_filtered = app_region_filtered
  236. self.videos_with_risk = videos_with_risk
  237. self.force_truncation = force_truncation
  238. self.env_dict = env_dict
  239. def filter_video_status_h(self, video_ids, rule_key, data_key, ab_code, province_code, key_flag=''):
  240. """召回小时级更新的视频状态过滤"""
  241. # 根据Redis缓存中的数据过滤
  242. redis_helper = RedisHelper()
  243. # 获取不符合推荐状态的视频
  244. if ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
  245. if key_flag == 'region_24h':
  246. key_prefix = f"{config_.REGION_H_VIDEO_FILER_24H}{province_code}."
  247. elif key_flag == 'day_24h':
  248. key_prefix = f"{config_.H_VIDEO_FILER_24H}{province_code}."
  249. else:
  250. key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
  251. elif ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
  252. key_prefix = config_.H_VIDEO_FILER_24H
  253. elif key_flag == '24h':
  254. key_prefix = config_.H_VIDEO_FILER_24H
  255. else:
  256. key_prefix = config_.H_VIDEO_FILER
  257. filter_videos_list = redis_helper.get_data_from_set(
  258. key_name=f"{key_prefix}{self.app_type}.{data_key}.{rule_key}"
  259. )
  260. if not filter_videos_list:
  261. return video_ids
  262. filter_videos = [int(video) for video in filter_videos_list]
  263. filtered_videos = [video_id for video_id in video_ids if video_id not in filter_videos]
  264. return filtered_videos
  265. def filter_videos_h(self, rule_key, data_key, ab_code, province_code, key_flag='', pool_type='rov'):
  266. """召回小时级更新的视频过滤"""
  267. # 预曝光过滤
  268. # st_pre = time.time()
  269. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  270. # et_pre = time.time()
  271. # log_.info({
  272. # 'logTimestamp': int(time.time() * 1000),
  273. # 'request_id': self.request_id,
  274. # 'app_type': self.app_type,
  275. # 'mid': self.mid,
  276. # 'uid': self.uid,
  277. # 'operation': 'preview_filter',
  278. # 'request_videos': self.video_ids,
  279. # 'preview_filter_result': filtered_pre_result,
  280. # 'executeTime': (time.time() - st_pre) * 1000
  281. # })
  282. if not filtered_pre_result:
  283. return None
  284. # 视频状态过滤
  285. # st_status = time.time()
  286. filtered_status_result = self.filter_video_status_h(video_ids=filtered_pre_result, rule_key=rule_key,
  287. data_key=data_key, ab_code=ab_code,
  288. province_code=province_code, key_flag=key_flag)
  289. # et_status = time.time()
  290. # log_.info({
  291. # 'logTimestamp': int(time.time() * 1000),
  292. # 'request_id': self.request_id,
  293. # 'app_type': self.app_type,
  294. # 'mid': self.mid,
  295. # 'uid': self.uid,
  296. # 'operation': 'status_filter',
  297. # 'request_videos': filtered_pre_result,
  298. # 'status_filter_result': filtered_status_result,
  299. # 'executeTime': (time.time() - st_status) * 1000
  300. # })
  301. if not filtered_status_result:
  302. return None
  303. # 视频已曝光过滤
  304. st_viewed = time.time()
  305. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
  306. # et_viewed = time.time()
  307. log_.info({
  308. 'logTimestamp': int(time.time() * 1000),
  309. 'pool_type': pool_type,
  310. 'request_id': self.request_id,
  311. 'app_type': self.app_type,
  312. 'mid': self.mid,
  313. 'uid': self.uid,
  314. 'operation': 'view_filter',
  315. 'request_videos': filtered_status_result,
  316. 'view_filter_result': filtered_viewed_result,
  317. 'executeTime': (time.time() - st_viewed) * 1000
  318. })
  319. if not filtered_viewed_result:
  320. return None
  321. else:
  322. return [int(video_id) for video_id in filtered_viewed_result]
  323. def filter_videos(self, pool_type='rov', region_code=None, shield_config=None):
  324. """视频过滤"""
  325. # todo: 添加app和region的风险过滤。
  326. st_viewed = time.time()
  327. videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
  328. # videos_filtered.append(18562889)
  329. # videos_filtered.append(18613648)
  330. # videos_filtered.append(18608478)
  331. videos_filtered = self.filter_videos_with_festival(videos_filtered)
  332. # print(str(videos_filtered))
  333. # log_.info({
  334. # 'logTimestamp': int(time.time() * 1000),
  335. # 'pool_type': "zhangbo-filter-pool_type",
  336. # 'request_id': self.request_id,
  337. # 'app_type': self.app_type,
  338. # 'mid': "zhangbo-filter_videos",
  339. # 'uid': self.uid,
  340. # 'operation': 'shield_filter',
  341. # 'request_videos': self.video_ids,
  342. # 'shield_filter_result': videos_filtered,
  343. # 'executeTime': (time.time() - st_viewed) * 1000
  344. # })
  345. # 预曝光过滤
  346. st_pre = time.time()
  347. filtered_pre_result = self.filter_video_previewed(videos_filtered)
  348. # print("filtered_pre:", (time.time()-st_pre)*1000)
  349. # et_pre = time.time()
  350. # log_.info({
  351. # 'logTimestamp': int(time.time() * 1000),
  352. # 'request_id': self.request_id,
  353. # 'app_type': self.app_type,
  354. # 'mid': self.mid,
  355. # 'uid': self.uid,
  356. # 'operation': 'preview_filter',
  357. # 'request_videos': self.video_ids,
  358. # 'preview_filter_result': filtered_pre_result,
  359. # 'executeTime': (time.time() - st_pre) * 1000
  360. # })
  361. if not filtered_pre_result:
  362. return None
  363. # 视频状态过滤采用离线定时过滤方案
  364. # 视频状态过滤
  365. # st_status = time.time()
  366. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  367. # et_status = time.time()
  368. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  369. # filtered_status_result, (et_status - st_status) * 1000))
  370. # if not filtered_status_result:
  371. # return None
  372. # 视频已曝光过滤
  373. st_viewed = time.time()
  374. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result, region_code=region_code)
  375. # print("filtered_pre:", (time.time() - st_viewed) * 1000)
  376. # et_viewed = time.time()
  377. # log_.info({
  378. # 'logTimestamp': int(time.time() * 1000),
  379. # 'pool_type': pool_type,
  380. # 'request_id': self.request_id,
  381. # 'app_type': self.app_type,
  382. # 'mid': self.mid,
  383. # 'uid': self.uid,
  384. # 'operation': 'view_filter',
  385. # 'request_videos': filtered_pre_result,
  386. # 'view_filter_result': filtered_viewed_result,
  387. # 'executeTime': (time.time() - st_viewed) * 1000
  388. # })
  389. if not filtered_viewed_result:
  390. return None
  391. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  392. return filtered_viewed_videos
  393. # if pool_type == 'flow' or pool_type=='normal':
  394. # # 流量池视频需过滤屏蔽视频
  395. # if region_code is None or shield_config is None:
  396. # return filtered_viewed_videos
  397. # else:
  398. # shield_key_name_list = shield_config.get(region_code, None)
  399. # if shield_key_name_list is not None:
  400. # filtered_shield_video_ids = self.filter_shield_video(
  401. # video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
  402. # )
  403. # log_.info({
  404. # 'logTimestamp': int(time.time() * 1000),
  405. # 'pool_type': pool_type,
  406. # 'request_id': self.request_id,
  407. # 'app_type': self.app_type,
  408. # 'mid': self.mid,
  409. # 'uid': self.uid,
  410. # 'operation': 'shield_filter',
  411. # 'request_videos': filtered_viewed_videos,
  412. # 'shield_filter_result': filtered_shield_video_ids,
  413. # 'executeTime': (time.time() - st_viewed) * 1000
  414. # })
  415. # # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
  416. # return filtered_shield_video_ids
  417. # else:
  418. # return filtered_viewed_videos
  419. # else:
  420. # return filtered_viewed_videos
  421. def filter_video_previewed(self, video_ids):
  422. """
  423. 预曝光过滤
  424. :param video_ids: 需过滤的视频列表 type-list
  425. :return: filtered_videos 过滤后的列表 type-list
  426. """
  427. pre_time = time.time()
  428. if not self.mid or self.mid == 'null':
  429. # mid为空时,不做预曝光过滤
  430. return video_ids
  431. # 根据Redis缓存中的数据过滤
  432. redis_helper = RedisHelper()
  433. # key拼接
  434. key_name = f"{config_.PREVIEW_KEY_PREFIX}{self.app_type}:{self.mid}"
  435. #print("key_name:", key_name)
  436. pe_videos_list = redis_helper.get_data_from_set(key_name)
  437. #print("pe_videos_list:", pe_videos_list)
  438. # log_.info('****app_type = {}, mid = {}, uid = {}, pe_videos_list = {}'.format(
  439. # self.app_type, self.mid, self.uid, pe_videos_list))
  440. # log_.info('****app_type = {}, mid = {}, uid = {}, video_ids = {}'.format(
  441. # self.app_type, self.mid, self.uid, video_ids))
  442. if not pe_videos_list:
  443. return video_ids
  444. pe_videos = [int(video) for video in pe_videos_list]
  445. #print("pe_videos:", len(pe_videos))
  446. filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
  447. #print(f"pre res: {filtered_videos}\nexecute_time: {(time.time() - pre_time) * 1000}")
  448. return filtered_videos
  449. # def filter_video_status(self, video_ids):
  450. # """
  451. # 对视频状态进行过滤
  452. # :param video_ids: 视频id列表 type-list
  453. # :return: filtered_videos
  454. # """
  455. # if len(video_ids) == 1:
  456. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  457. # "SELECT video_id " \
  458. # "FROM {} " \
  459. # "WHERE audit_status = 5 " \
  460. # "AND applet_rec_status IN (1, -6) " \
  461. # "AND open_status = 1 " \
  462. # "AND payment_status = 0 " \
  463. # "AND encryption_status != 5 " \
  464. # "AND transcoding_status = 3 " \
  465. # "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
  466. # else:
  467. # sql = "set hg_experimental_enable_shard_pruning=off; " \
  468. # "SELECT video_id " \
  469. # "FROM {} " \
  470. # "WHERE audit_status = 5 " \
  471. # "AND applet_rec_status IN (1, -6) " \
  472. # "AND open_status = 1 " \
  473. # "AND payment_status = 0 " \
  474. # "AND encryption_status != 5 " \
  475. # "AND transcoding_status = 3 " \
  476. # "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
  477. #
  478. # hologres_helper = HologresHelper()
  479. # data = hologres_helper.get_data(sql=sql)
  480. # filtered_videos = [int(temp[0]) for temp in data]
  481. # return filtered_videos
  482. def filter_video_viewed(self, video_ids, region_code, types=(1, 6,)):
  483. """
  484. 调用后端接口过滤用户已观看视频
  485. :param video_ids: 视频id列表 type-list
  486. :param types: 过滤参数 type-tuple, 默认(1, )
  487. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  488. :return: filtered_videos
  489. """
  490. # 获取对应端的过滤参数types
  491. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  492. if types is None:
  493. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  494. request_data = {"appType": self.app_type,
  495. "mid": self.mid,
  496. "uid": self.uid,
  497. "types": list(types),
  498. "videoIds": video_ids,
  499. "cityCode": region_code,
  500. "hotSenceType": self.env_dict["hotSenceType"] if "hotSenceType" in self.env_dict else 0
  501. }
  502. # print(request_data)
  503. # 调用http接口
  504. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(3, 3))
  505. # print("result:", result)
  506. if result is None:
  507. # print("result is None")
  508. # log_.info('过滤失败,types: {}'.format(types))
  509. return []
  510. if result['code'] != 0:
  511. # log_.info('过滤失败,types: {}'.format(types))
  512. return []
  513. filtered_videos = result['data']
  514. return filtered_videos
  515. def filter_video_viewed_new(self, video_ids):
  516. """
  517. 调用后端接口过滤用户已观看视频
  518. :param video_ids: 视频id列表 type-list
  519. :param types: 过滤参数 type-tuple, 默认(1, )
  520. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  521. :return: filtered_videos
  522. """
  523. # 获取对应端的过滤参数types
  524. st_time = time.time()
  525. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  526. #print(types)
  527. if types is None:
  528. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  529. if 6 in types:
  530. types = list(types)
  531. types.remove(6)
  532. #print(types)
  533. request_data = {"appType": self.app_type,
  534. "mid": self.mid,
  535. "uid": self.uid,
  536. "types": list(types),
  537. "videoIds": video_ids}
  538. # 调用http接口
  539. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
  540. #print(f"view res: {result}\nexecute_time: {(time.time() - st_time) * 1000}")
  541. if result is None:
  542. # log_.info('过滤失败,types: {}'.format(types))
  543. return []
  544. if result['code'] != 0:
  545. # log_.info('过滤失败,types: {}'.format(types))
  546. return []
  547. filtered_videos = result['data']
  548. return filtered_videos
  549. def filter_shield_video(self, video_ids, shield_key_name_list):
  550. """
  551. 过滤屏蔽视频视频
  552. :param video_ids: 需过滤的视频列表 type-list
  553. :param shield_key_name_list: 过滤视频 redis-key
  554. :return: filtered_videos 过滤后的列表 type-list
  555. """
  556. # print("filter_shield_video:", len(filter_shield_video))
  557. if len(video_ids) == 0:
  558. return video_ids
  559. # 根据Redis缓存中的数据过滤
  560. redis_helper = RedisHelper()
  561. for shield_key_name in shield_key_name_list:
  562. video_ids = [
  563. int(video_id) for video_id in video_ids
  564. if not redis_helper.data_exists_with_set(key_name=shield_key_name, value=video_id)
  565. ]
  566. # shield_videos_list = redis_helper.get_data_from_set(key_name=shield_key_name)
  567. # if not shield_videos_list:
  568. # continue
  569. # shield_videos = [int(video) for video in shield_videos_list]
  570. # video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
  571. # print("video_ids:", len(video_ids))
  572. return video_ids
  573. def new_filter_video(self):
  574. """视频过滤"""
  575. # 1. 预曝光过滤
  576. st_pre = time.time()
  577. #print("new_filter video_ids:", self.video_ids)
  578. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  579. if not filtered_pre_result:
  580. return None
  581. # log_.info({
  582. # 'logTimestamp': int(time.time() * 1000),
  583. # 'request_id': self.request_id,
  584. # 'app_type': self.app_type,
  585. # 'mid': self.mid,
  586. # 'uid': self.uid,
  587. # 'operation': 'preview_filter',
  588. # 'request_videos': self.video_ids,
  589. # 'preview_filter_result': filtered_pre_result,
  590. # 'executeTime': (time.time() - st_pre) * 1000
  591. # })
  592. #2. 视频已曝光过滤
  593. st_viewed = time.time()
  594. #print("---filtered viewed---")
  595. #print("filtered_pre_result:",filtered_pre_result)
  596. filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
  597. if not filtered_viewed_result:
  598. return None
  599. return filtered_viewed_result
  600. def new_flow_video(self, vid_list, flow_vids_set, region_code, shield_config):
  601. flow_video_list = []
  602. normal_video_list = []
  603. for v_id in vid_list:
  604. if v_id in flow_vids_set:
  605. flow_video_list.append(v_id)
  606. else:
  607. normal_video_list.append(v_id)
  608. shield_key_name_list = shield_config.get(region_code, None)
  609. if shield_key_name_list is not None:
  610. filtered_shield_video_ids = self.filter_shield_video(
  611. video_ids=flow_video_list, shield_key_name_list=shield_key_name_list
  612. )
  613. return normal_video_list, filtered_shield_video_ids
  614. else:
  615. return normal_video_list, flow_video_list
  616. def filter_movie_religion_video(self, video_ids):
  617. """过滤白名单视频(影视,宗教)"""
  618. # 影视 + 宗教: rov.filter.movie.{videoId}
  619. # 宗教: rov.filter.religion.{videoId}
  620. st_time = time.time()
  621. if self.app_type not in [config_.APP_TYPE['WAN_NENG_VIDEO'],
  622. config_.APP_TYPE['LAO_HAO_KAN_VIDEO'],
  623. config_.APP_TYPE['ZUI_JING_QI'],
  624. config_.APP_TYPE['H5']]:
  625. # 过滤 影视 + 宗教
  626. keys = [f"rov.filter.movie.{video_id}" for video_id in video_ids]
  627. elif self.app_type in [config_.APP_TYPE['WAN_NENG_VIDEO'],
  628. config_.APP_TYPE['ZUI_JING_QI'],
  629. config_.APP_TYPE['H5']]:
  630. # 过滤 影视 + 宗教
  631. keys = [f"rov.filter.religion.{video_id}" for video_id in video_ids]
  632. else:
  633. #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
  634. return video_ids
  635. redis_helper = RedisHelper(redis_info=config_.REDIS_INFO_FILTER)
  636. filter_videos = []
  637. for i in range(len(keys)//1000 + 1):
  638. video_ids_temp = video_ids[i*1000:(i+1)*1000]
  639. if len(video_ids_temp) == 0:
  640. break
  641. mget_res = redis_helper.mget(keys=keys[i*1000:(i+1)*1000])
  642. filter_videos.extend([int(data) for data in mget_res if data is not None])
  643. if len(filter_videos) > 0:
  644. filtered_videos = set(video_ids) - set(filter_videos)
  645. #print(f"m_r res: {list(filtered_videos)}\nexecute_time: {(time.time() - st_time) * 1000}")
  646. return list(filtered_videos)
  647. else:
  648. #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
  649. return video_ids
  650. def filter_videos_new(self, region_code=None, shield_config=None, flow_set=None):
  651. """视频过滤"""
  652. # 预曝光过滤
  653. st_pre = time.time()
  654. #print("self.video_ids:", len(self.video_ids))
  655. filtered_pre_result = self.filter_video_previewed(self.video_ids)
  656. if not filtered_pre_result:
  657. return None
  658. #print("filtered_pre_result:", len(filtered_pre_result))
  659. #print(filtered_pre_result)
  660. # 视频已曝光过滤/白名单过滤
  661. st_viewed = time.time()
  662. t = [
  663. gevent.spawn(self.filter_video_viewed_new, filtered_pre_result),
  664. gevent.spawn(self.filter_movie_religion_video, filtered_pre_result)]
  665. gevent.joinall(t)
  666. filtered_result_list = [i.get() for i in t]
  667. #print("filtered_result_list1:",filtered_result_list[0])
  668. #print("filtered_result_list2:",filtered_result_list[1])
  669. filtered_viewed_set = set('')
  670. for i in filtered_result_list[0]:
  671. filtered_viewed_set.add(int(i))
  672. filter_video_set =set('')
  673. for j in filtered_result_list[1]:
  674. filter_video_set.add(int(j))
  675. filtered_viewed_result = list(filtered_viewed_set & filter_video_set)
  676. #print(f"view&m_r res: {filtered_viewed_result}\nexecute_time: {(time.time() - st_viewed) * 1000}")
  677. #print("filtered:",len(filtered_viewed_result))
  678. if not filtered_viewed_result:
  679. return None
  680. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  681. #print("result:", filtered_viewed_videos)
  682. if flow_set is None:
  683. return filtered_viewed_videos
  684. else:
  685. # 流量池视频需过滤屏蔽视频
  686. if region_code is None or shield_config is None:
  687. return filtered_viewed_videos
  688. else:
  689. normal_recall_ids = []
  690. left_flow_ids = []
  691. for vid in filtered_viewed_videos:
  692. if vid in flow_set:
  693. left_flow_ids.append(vid)
  694. else:
  695. normal_recall_ids.append(vid)
  696. shield_key_name_list = shield_config.get(region_code, None)
  697. if shield_key_name_list is not None:
  698. filtered_shield_video_ids = self.filter_shield_video(
  699. video_ids=left_flow_ids, shield_key_name_list=shield_key_name_list
  700. )
  701. return normal_recall_ids+filtered_shield_video_ids
  702. else:
  703. return filtered_viewed_videos
  704. def filter_videos_status(self, pool_type='rov', region_code=None, shield_config=None):
  705. """视频过滤"""
  706. # todo: 添加app和region的风险过滤。
  707. st_viewed = time.time()
  708. videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
  709. videos_filtered = self.filter_videos_with_festival(videos_filtered)
  710. # log_.info({
  711. # 'logTimestamp': int(time.time() * 1000),
  712. # 'pool_type': "zhangbo-filter-pool_type",
  713. # 'request_id': self.request_id,
  714. # 'app_type': self.app_type,
  715. # 'mid': "zhangbo-filter_videos_status",
  716. # 'uid': self.uid,
  717. # 'operation': 'shield_filter',
  718. # 'request_videos': self.video_ids,
  719. # 'shield_filter_result': videos_filtered,
  720. # 'executeTime': (time.time() - st_viewed) * 1000
  721. # })
  722. # 预曝光过滤
  723. st_pre = time.time()
  724. filtered_pre_result = self.filter_video_previewed(videos_filtered)
  725. # print("filtered_pre:", (time.time()-st_pre)*1000)
  726. # et_pre = time.time()
  727. # log_.info({
  728. # 'logTimestamp': int(time.time() * 1000),
  729. # 'request_id': self.request_id,
  730. # 'app_type': self.app_type,
  731. # 'mid': self.mid,
  732. # 'uid': self.uid,
  733. # 'operation': 'preview_filter',
  734. # 'request_videos': self.video_ids,
  735. # 'preview_filter_result': filtered_pre_result,
  736. # 'executeTime': (time.time() - st_pre) * 1000
  737. # })
  738. if not filtered_pre_result:
  739. return None
  740. # 视频状态过滤采用离线定时过滤方案
  741. # 视频状态过滤
  742. # st_status = time.time()
  743. # filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
  744. # et_status = time.time()
  745. # log_.info('filter by video status: result = {}, execute time = {}ms'.format(
  746. # filtered_status_result, (et_status - st_status) * 1000))
  747. # if not filtered_status_result:
  748. # return None
  749. # 视频已曝光过滤
  750. st_viewed = time.time()
  751. filtered_viewed_result = self.filter_video_viewed_status(video_ids=filtered_pre_result, region_code=region_code)
  752. # print("filtered_pre:", (time.time() - st_viewed) * 1000)
  753. # et_viewed = time.time()
  754. # log_.info({
  755. # 'logTimestamp': int(time.time() * 1000),
  756. # 'pool_type': pool_type,
  757. # 'request_id': self.request_id,
  758. # 'app_type': self.app_type,
  759. # 'mid': self.mid,
  760. # 'uid': self.uid,
  761. # 'operation': 'view_filter',
  762. # 'request_videos': filtered_pre_result,
  763. # 'view_filter_result': filtered_viewed_result,
  764. # 'executeTime': (time.time() - st_viewed) * 1000
  765. # })
  766. if not filtered_viewed_result:
  767. return None
  768. filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
  769. return filtered_viewed_videos
  770. # if pool_type == 'flow' or pool_type=='normal':
  771. # # 流量池视频需过滤屏蔽视频
  772. # if region_code is None or shield_config is None:
  773. # return filtered_viewed_videos
  774. # else:
  775. # shield_key_name_list = shield_config.get(region_code, None)
  776. # if shield_key_name_list is not None:
  777. # filtered_shield_video_ids = self.filter_shield_video(
  778. # video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
  779. # )
  780. # log_.info({
  781. # 'logTimestamp': int(time.time() * 1000),
  782. # 'pool_type': pool_type,
  783. # 'request_id': self.request_id,
  784. # 'app_type': self.app_type,
  785. # 'mid': self.mid,
  786. # 'uid': self.uid,
  787. # 'operation': 'shield_filter',
  788. # 'request_videos': filtered_viewed_videos,
  789. # 'shield_filter_result': filtered_shield_video_ids,
  790. # 'executeTime': (time.time() - st_viewed) * 1000
  791. # })
  792. # # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
  793. # return filtered_shield_video_ids
  794. # else:
  795. # return filtered_viewed_videos
  796. # else:
  797. # return filtered_viewed_videos
  798. def filter_video_viewed_status(self, video_ids, region_code, types=(1, 6,)):
  799. """
  800. 调用后端接口过滤用户已观看视频
  801. :param video_ids: 视频id列表 type-list
  802. :param types: 过滤参数 type-tuple, 默认(1, )
  803. 1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
  804. :return: filtered_videos
  805. """
  806. # 获取对应端的过滤参数types
  807. types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
  808. if types is None:
  809. types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
  810. types = list(types)
  811. types.append(2)
  812. request_data = {"appType": self.app_type,
  813. "mid": self.mid,
  814. "uid": self.uid,
  815. "types": types,
  816. "videoIds": video_ids,
  817. "cityCode": region_code,
  818. "hotSenceType": self.env_dict["hotSenceType"] if "hotSenceType" in self.env_dict else 0
  819. }
  820. # print(request_data)
  821. # 调用http接口
  822. result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(3, 3))
  823. # print("result:", result)
  824. if result is None:
  825. # print("result is None")
  826. # log_.info('过滤失败,types: {}'.format(types))
  827. return []
  828. if result['code'] != 0:
  829. # log_.info('过滤失败,types: {}'.format(types))
  830. return []
  831. filtered_videos = result['data']
  832. return filtered_videos
  833. def filter_videos_with_risk_video(self, video_ids, app_type, region_code):
  834. # 0 用一个开关控制,是否过滤生效。 便于回滚功能。
  835. risk_filter_flag = self.risk_filter_flag
  836. if not risk_filter_flag:
  837. return self.truncation(video_ids)
  838. # 1 判断是否过滤,不展示的app+区域列表,-1必须过滤
  839. app_region_filtered = self.app_region_filtered
  840. if app_type in app_region_filtered.keys():
  841. if_filtered = False
  842. if region_code in app_region_filtered[app_type]:
  843. if_filtered = True
  844. else:
  845. if_filtered = True
  846. if not if_filtered:
  847. return self.truncation(video_ids)
  848. # 2 确认过滤,获取风险video列表param_update_risk_videos
  849. videos_with_risk = self.videos_with_risk
  850. # 3 过滤 返回结果
  851. video_ids_new = [i for i in video_ids if i not in videos_with_risk]
  852. # print(risk_filter_flag)
  853. # print(app_region_filtered)
  854. # print(video_ids)
  855. # print(app_type)
  856. # print(region_code)
  857. # print(videos_with_risk)
  858. # print(video_ids_new)
  859. # print(len(video_ids))
  860. # print(len(video_ids_new))
  861. return self.truncation(video_ids_new)
  862. def truncation(self, video_ids):
  863. if self.force_truncation is None:
  864. return video_ids
  865. else:
  866. return video_ids[:min(self.force_truncation, len(video_ids))]
  867. def filter_videos_with_festival(self, video_ids: List[int]):
  868. # 1 获取当前时间,判断过滤标准
  869. now_date = datetime.today()
  870. now_dt = datetime.strftime(now_date, '%Y%m%d%H')
  871. now_dt_int = int(now_dt)
  872. filter_fes = []
  873. for k, v1, v2 in FESTIVAL:
  874. if now_dt_int >= v1 and now_dt_int < v2:
  875. filter_fes.append(k)
  876. if len(filter_fes) == 0:
  877. return video_ids
  878. # 2 过滤
  879. redis_keys = ["alg_recsys_video_tags_" + str(id) for id in video_ids]
  880. redis_helper = RedisHelper()
  881. redis_values = redis_helper.get_batch_key(redis_keys)
  882. # print(str(video_ids))
  883. # print(str(redis_values))
  884. if redis_values and len(redis_values) > 0 and len(redis_values) == len(redis_keys):
  885. video_ids_new = []
  886. for id, tags in zip(video_ids, redis_values):
  887. flag = True
  888. if tags and len(tags) > 0:
  889. for t in tags.split(","):
  890. if t in filter_fes:
  891. flag = False
  892. break
  893. if flag:
  894. video_ids_new.append(id)
  895. return video_ids_new
  896. else:
  897. return video_ids
  898. if __name__ == '__main__':
  899. user = [
  900. ('weixin_openid_o0w175fDc8pNnywrYN49E341tKfI', ''),
  901. ('weixin_openid_o0w175YwC3hStzcR5DAQdbgzdMeI', ''),
  902. ('weixin_openid_o0w175ftZDl6VJVDx9la3WVPh7mU', '15900461'),
  903. ('weixin_openid_o0w175SPqpCVRcp7x1XvnX4qpIvI', '19659040'),
  904. ('weixin_openid_o0w175cOnguapyWIrDrHkOWl4oFQ', '31210128'),
  905. ('weixin_openid_o0w175UXYId-o71e1Q3SOheYNteQ', '33099722'),
  906. ('weixin_openid_o0w175QQ5b42AtOe50bchrFgcttA', ''),
  907. ('weixin_openid_o0w175bgaPlfLsp3YLDKWqLWtXX8', '35371534'),
  908. ('weixin_openid_o0w175eRpvbmV6nOhM1VTyyLICWA', '30488803'),
  909. ('weixin_openid_o0w175TZYvG47pQkOjyJFoxQuqsw', '')
  910. ]
  911. video_df = pd.read_csv('./data/videoids.csv')
  912. videoid_list = video_df['videoid'].tolist()
  913. for mid, uid in user:
  914. video_ids = random.sample(videoid_list, 1000)
  915. start_time = time.time()
  916. filter_ = FilterVideos(request_id=f'{mid} - {uid}', app_type=0, mid=mid, uid=uid, video_ids=video_ids)
  917. res = filter_.filter_videos_new()
  918. print(f"res: {res}\nexecute_time: {(time.time() - start_time) * 1000}")
  919. # filter_.filter_video_status(video_ids=[1, 3, 5])
  920. # videos = [{'videoId': 9034659, 'flowPool': '3#11#3#1637824188547'}, {'videoId': 9035052, 'flowPool': '3#11#3#1637824172827'}]
  921. # res = get_videos_remain_view_count(4, videos)
  922. # print(res)
  923. # text = '测试 @李倩'
  924. # send_msg_to_feishu(text)
  925. # update_video_w_h_rate(video_id=113, key_name='')
  926. # mid = "weixin_openid_obHDW5c4g3aULfCWh-68LcUSxCB"
  927. # request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
  928. # res = request_get(request_url=request_url, timeout=100)
  929. # res = get_user_has30day_return(mid=mid)
  930. # print(res, type(res))