# video_recall.py (169 KB)
  1. import time
  2. import traceback
  3. import random
  4. from datetime import date, timedelta, datetime
  5. from log import Log
  6. from db_helper import RedisHelper
  7. from config import set_config
  8. from utils import FilterVideos, get_videos_remain_view_count, get_videos_local_distribute_count, send_msg_to_feishu
  9. import gevent
  10. import json
  # Module-level logger instance; the methods below call log_.info(...) / log_.error(...).
  11. log_ = Log()
  # Configuration object returned by set_config(); read below as config_.<NAME>
  # (redis key prefixes, AB codes, push-from markers, fetch limits).
  12. config_ = set_config()
  13. class PoolRecall(object):
  14. """召回"""
  def __init__(self, request_id, app_type, client_info=None, mid='', uid='', ab_code='',
               rule_key='', data_key='', no_op_flag=False, params=None, rule_key_30day=None, shield_config=None,
               video_id=None, level_weight=None):
      """
      Store the per-request recall context and create the Redis helper.

      :param request_id: request_id
      :param app_type: product identifier, type-int
      :param client_info: user location info, e.g. {"cityCode": "100000"}
      :param mid: mid, type-string
      :param uid: uid, type-string
      :param ab_code: ab_code, type-int
      :param rule_key: rule segment used when building recall-list redis keys
      :param data_key: data segment used when building recall-list redis keys
      :param no_op_flag: when truthy, rov_pool_recall skips top videos and rov-updated videos
      :param params: passed through unchanged to RedisHelper(params=...)
      :param rule_key_30day: stored as-is
      :param shield_config: stored as-is
      :param video_id: stored as-is
      :param level_weight: stored as-is
      """
      # Who is asking.
      self.request_id = request_id
      self.app_type = app_type
      self.mid = mid
      self.uid = uid
      self.client_info = client_info
      self.video_id = video_id

      # Which experiment / rule / data set the request is served from.
      self.ab_code = ab_code
      self.rule_key = rule_key
      self.rule_key_30day = rule_key_30day
      self.data_key = data_key

      # Behaviour switches and weights.
      self.no_op_flag = no_op_flag
      self.shield_config = shield_config
      self.level_weight = level_weight

      # Redis access used by every recall method.
      self.redis_helper = RedisHelper(params=params)
  def copy_redis_zset_data(self, from_key_name, to_key_name):
      """
      Copy a whole zset from one redis key to another.

      :param from_key_name: key of the zset to read (all members, with scores)
      :param to_key_name: key to overwrite; written with a one-hour expiry
      :return: False when the read returned None, True otherwise
      """
      source_rows = self.redis_helper.get_data_zset_with_index(
          key_name=from_key_name, start=0, end=-1, with_scores=True)
      if source_rows is None:
          return False

      # Members are video ids; store them as ints mapped to their score.
      members = {int(member): member_score for member, member_score in source_rows}

      # Rewrite the destination from scratch.
      if self.redis_helper.key_exists(to_key_name):
          self.redis_helper.del_keys(key_name=to_key_name)
      self.redis_helper.add_data_with_zset(key_name=to_key_name, data=members, expire_time=3600)
      return True
  def update_mid_data(self, h_recall_mid_key, h_record_key, key_prefix):
      """
      Refresh the per-mid copy of the hourly recall list and its time record.

      The current hour's list is copied when its key exists; otherwise the
      previous hour's list is tried. The time record is only written when the
      copy succeeded.

      :param h_recall_mid_key: destination key of the per-mid list
      :param h_record_key: key that stores str({'date': 'YYYYmmdd', 'h': hour}) for the copied list
      :param key_prefix: prefix of the source hourly list keys
      :return: None
      """
      def list_key(moment):
          # Source list key for the hour that `moment` falls in (hour is not zero-padded).
          return (f"{key_prefix}{self.app_type}.{self.data_key}.{self.rule_key}."
                  f"{moment.strftime('%Y%m%d')}.{moment.hour}")

      # Read the clock once: taking the date and the hour from two separate
      # reads can straddle an hour/midnight boundary and yield a mismatched key.
      source_time = datetime.now()
      if not self.redis_helper.key_exists(key_name=list_key(source_time)):
          # Current hour not published yet: fall back to the previous hour
          # (this rolls back to 23h of the previous day at midnight).
          source_time -= timedelta(hours=1)

      copied = self.copy_redis_zset_data(from_key_name=list_key(source_time), to_key_name=h_recall_mid_key)
      if copied:
          value = {'date': source_time.strftime('%Y%m%d'), 'h': source_time.hour}
          self.redis_helper.set_data_to_redis(key_name=h_record_key, value=str(value), expire_time=1*3600)
  def get_mid_h_key(self, province_code, key_flag=''):
      """
      Return the redis key of this mid's hourly recall list, refreshing it first if needed.

      The per-mid list is a copy of a shared hourly list. A companion record key
      stores which date/hour was copied; that record decides whether the copy
      is current, one hour behind, or has to be rebuilt.

      :param province_code: province code inserted into the region key prefixes
      :param key_flag: '' (hourly), '24h', 'day_24h' or 'region_24h'; selects the key families
      :return: redis key of the per-mid list (returned whether or not a refresh happened)
      """
      import ast  # function-scope import: only needed to parse the stored record

      # Per-mid list key and its time-record key.
      if key_flag == 'region_24h':
          # region grouping, relative 24h
          recall_prefix = config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX_REGION_24H
          record_prefix = config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX_REGION_24H
      elif key_flag in ('24h', 'day_24h'):
          # relative 24h
          recall_prefix = config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX_24H
          record_prefix = config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX_24H
      else:
          recall_prefix = config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX
          record_prefix = config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX
      h_recall_mid_key = f"{recall_prefix}{self.app_type}.{self.mid}"
      h_record_key = f"{record_prefix}{self.app_type}.{self.mid}"

      # Prefix of the shared source lists.
      if self.ab_code in config_.AB_CODE['region_rank_by_h'].values():
          if key_flag == 'region_24h':
              if self.ab_code == config_.AB_CODE['region_rank_by_h'].get('region_rule_rank2'):
                  key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{province_code}."
              else:
                  key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{province_code}."
          elif key_flag == 'day_24h':
              key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{province_code}."
          else:
              key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{province_code}."
      elif self.ab_code in config_.AB_CODE['rank_by_24h'].values():
          key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_24H
      elif key_flag == '24h':
          key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_24H_H
      else:
          key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_H

      if not self.redis_helper.key_exists(key_name=h_record_key):
          # No record: copy the list and write the record.
          self.update_mid_data(h_recall_mid_key=h_recall_mid_key, h_record_key=h_record_key,
                               key_prefix=key_prefix)
          return h_recall_mid_key

      # The record is written by this class as str({'date': ..., 'h': ...}).
      # It is parsed with ast.literal_eval instead of eval() so that a value
      # coming back from redis can never execute code. A record that expired
      # between the existence check and this read, or that cannot be parsed,
      # is treated as stale and rebuilt instead of raising.
      raw_record = self.redis_helper.get_data_from_redis(key_name=h_record_key)
      record_dt = record_h = None
      try:
          if isinstance(raw_record, bytes):
              raw_record = raw_record.decode('utf-8')
          parsed = ast.literal_eval(raw_record)
          record_dt = parsed.get('date')
          record_h = int(parsed.get('h'))
      except (AttributeError, SyntaxError, TypeError, ValueError):
          record_h = None

      # Single clock read so the date and the hour belong to the same instant.
      now = datetime.now()
      now_dt = now.strftime('%Y%m%d')
      h = now.hour

      if record_h is None:
          self.update_mid_data(h_recall_mid_key=h_recall_mid_key, h_record_key=h_record_key,
                               key_prefix=key_prefix)
      elif record_dt == now_dt and record_h == h:
          # Already holding the current hour's data.
          pass
      elif (record_dt == now_dt and h - record_h == 1) or (h == 0 and record_h == 23):
          # Record is one hour behind: switch only if the current hour's list is published.
          now_h_recall_key = f"{key_prefix}{self.app_type}.{self.data_key}.{self.rule_key}.{now_dt}.{h}"
          if self.redis_helper.key_exists(key_name=now_h_recall_key):
              flag = self.copy_redis_zset_data(from_key_name=now_h_recall_key, to_key_name=h_recall_mid_key)
              if flag:
                  new_record = {'date': now_dt, 'h': h}
                  self.redis_helper.set_data_to_redis(key_name=h_record_key, value=str(new_record),
                                                      expire_time=2*3600)
      else:
          # Record is older than one hour: rebuild.
          self.update_mid_data(h_recall_mid_key=h_recall_mid_key, h_record_key=h_record_key,
                               key_prefix=key_prefix)
      return h_recall_mid_key
  def rov_pool_recall_by_h(self, size=10, expire_time=24*3600):
      """
      Fetch videos from the hourly-updated ROV recall list.

      Falls back to rov_pool_recall when the per-mid hourly list does not exist,
      and tops the result up from rov_pool_recall when the hourly list yields
      fewer than `size` videos.

      :param size: number of videos to return
      :param expire_time: passed on to rov_pool_recall (expiry of its last-position record)
      :return: list of at most `size` dicts with videoId / rovScore / pushFrom / abCode
      """
      start_time = time.time()
      # client_info defaults to None in __init__; treat that as "no location"
      # instead of raising AttributeError on .get().
      client_info = self.client_info or {}
      province_code = client_info.get('provinceCode', '-1')
      if province_code == '':
          province_code = '-1'

      if self.ab_code in config_.AB_CODE['rank_by_24h'].values():
          push_from = config_.PUSH_FROM['rov_recall_24h']
      else:
          push_from = config_.PUSH_FROM['rov_recall_h']

      # Redis key of the hourly list belonging to this mid.
      h_recall_mid_key = self.get_mid_h_key(province_code=province_code)
      if not self.redis_helper.key_exists(h_recall_mid_key):
          recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
      else:
          fil_video_ids = []  # videos rejected by the filter, removed from the list afterwards
          recall_result = []
          get_size = size * 5  # videos fetched per round
          freq = 0  # number of fetch rounds so far
          while len(recall_result) < size:
              freq += 1
              if freq > config_.MAX_FREQ_FROM_ROV_POOL:
                  break
              data = self.redis_helper.get_data_zset_with_index(key_name=h_recall_mid_key,
                                                                start=(freq - 1) * get_size,
                                                                end=freq * get_size - 1,
                                                                with_scores=True)
              if not data:
                  # hourly list exhausted
                  break
              # Video ids as int, plus a {videoId: score} lookup.
              video_ids = []
              video_score = {}
              for value in data:
                  video_id = int(value[0])
                  video_ids.append(video_id)
                  video_score[video_id] = value[1]
              # Filter.
              filter_ = FilterVideos(request_id=self.request_id,
                                     app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
              ge = gevent.spawn(filter_.filter_videos_h, self.rule_key, self.ab_code, province_code)
              ge.join()
              filtered_result = ge.get()
              if filtered_result:
                  # Attach the source markers pushFrom / abCode.
                  temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
                                  'pushFrom': push_from, 'abCode': self.ab_code}
                                 for item in filtered_result if video_score.get(int(item)) is not None]
                  recall_result.extend(temp_result)
                  kept_ids = {item['videoId'] for item in temp_result}
                  fil_video_ids.extend(vid for vid in video_ids if vid not in kept_ids)
              else:
                  fil_video_ids.extend(video_ids)

          # Drop the rejected videos from this mid's list.
          for value in fil_video_ids:
              self.redis_helper.remove_value_from_zset(key_name=h_recall_mid_key, value=value)

          # Not enough hourly videos: top up from the ROV pool, skipping duplicates.
          if len(recall_result) < size:
              rov_recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
              seen_video_ids = {item.get('videoId') for item in recall_result}
              for video in rov_recall_result:
                  vid = video.get('videoId')
                  if vid in seen_video_ids:
                      continue
                  recall_result.append(video)
                  seen_video_ids.add(vid)
                  if len(recall_result) >= size:
                      break

      log_.info({
          'logTimestamp': int(time.time() * 1000),
          'request_id': self.request_id,
          'operation': 'rov_pool_recall_by_h',
          'executeTime': (time.time() - start_time) * 1000
      })
      return recall_result[:size]
  def rov_pool_recall_by_day(self, size=4, expire_time=24*3600):
      """
      Fetch videos from the daily rule-updated list.

      Uses rov_pool_recall alone when no daily list key is available, and as a
      top-up when the daily list yields fewer than `size` videos.

      :param size: number of videos to return
      :param expire_time: redis expiry of the last-position record
      :return: list of at most `size` video dicts
      """
      began_at = time.time()
      # Daily list key, the key of the last-position record, and the start index.
      rule_key_name, last_rule_day_recall_key, cursor = self.get_video_last_idx_day()

      if rule_key_name:
          recall_result = []
          batch_size = size * 5  # videos fetched per round
          rounds = 0
          while len(recall_result) < size:
              rounds += 1
              if rounds > config_.MAX_FREQ_FROM_ROV_POOL:
                  break
              batch = self.redis_helper.get_data_zset_with_index(key_name=rule_key_name,
                                                                 start=cursor, end=cursor + batch_size - 1,
                                                                 with_scores=True)
              if not batch:
                  # daily list exhausted
                  break
              # Video ids as int, plus a {videoId: score} lookup.
              video_ids = [int(entry[0]) for entry in batch]
              video_score = {int(entry[0]): entry[1] for entry in batch}
              # Filter.
              filter_ = FilterVideos(request_id=self.request_id,
                                     app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
              job = gevent.spawn(filter_.filter_videos)
              job.join()
              passed = job.get()
              if passed:
                  # Attach the source markers pushFrom / abCode.
                  recall_result.extend(
                      {'videoId': int(item), 'rovScore': video_score[int(item)],
                       'pushFrom': config_.PUSH_FROM['rov_recall_day'], 'abCode': self.ab_code}
                      for item in passed if video_score.get(int(item)) is not None)
              else:
                  # Nothing survived: remember the last fetched video id so the
                  # next request can resume from this position.
                  self.redis_helper.set_data_to_redis(key_name=last_rule_day_recall_key, value=batch[-1][0],
                                                      expire_time=expire_time)
              cursor += batch_size

          # Not enough daily videos: top up from the ROV pool, skipping duplicates.
          if len(recall_result) < size:
              supplement = self.rov_pool_recall(size=size, expire_time=expire_time)
              now_video_ids = [item.get('videoId') for item in recall_result]
              for video in supplement:
                  vid = video.get('videoId')
                  if vid in now_video_ids:
                      continue
                  recall_result.append(video)
                  now_video_ids.append(vid)
                  if len(recall_result) >= size:
                      break
      else:
          # No daily list available.
          recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)

      log_.info({
          'logTimestamp': int(time.time() * 1000),
          'request_id': self.request_id,
          'operation': 'rov_pool_recall_by_day',
          'executeTime': (time.time() - began_at) * 1000
      })
      return recall_result[:size]
  def rov_pool_recall(self, size=10, expire_time=24*3600, video_type='', push_from=config_.PUSH_FROM['rov_recall']):
      """
      Fetch videos from the ROV recall pool.

      Active top videos and videos with a modified rov are merged with the pool
      videos and the merged list is sorted by rovScore, highest first.

      :param size: number of videos to return
      :param expire_time: redis expiry of the last-position record
      :param video_type: list category, passed to get_video_last_idx
      :param push_from: source marker written into every pool video
      :return: list of at most `size` video dicts
      """
      # Active top videos: skipped for no-op requests and when there is no location info.
      if self.no_op_flag or self.client_info is None:
          top_video_ids, top_video_result = [], []
      else:
          top_video_ids, top_video_result = self.get_top_videos()

      # Videos whose rov was modified.
      if self.no_op_flag:
          update_rov_result = []
      else:
          _, update_rov_result = self.get_update_rov_videos()

      # De-duplicate the rov-updated videos against the top videos.
      update_rov_dup_result = [item for item in update_rov_result
                               if item['videoId'] not in top_video_ids]

      # Pool key, key of the last-position record, and the start index.
      rov_pool_key, last_rov_recall_key, idx = self.get_video_last_idx(video_type=video_type)
      if not rov_pool_key:
          # No pool: return only top + rov-updated videos. list.extend() returns
          # None, so the lists are concatenated; assigning the result of extend()
          # made the following sort() raise AttributeError.
          merged = top_video_result + update_rov_dup_result
          merged.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
          return merged[:size]

      # Ids already present through the top / rov-updated lists.
      skip_ids = set(top_video_ids)
      skip_ids.update(item['videoId'] for item in update_rov_dup_result)

      rov_pool_recall_result = []
      get_size = size * 5  # videos fetched per round
      freq = 0  # number of fetch rounds so far
      while len(rov_pool_recall_result) < size:
          freq += 1
          if freq > config_.MAX_FREQ_FROM_ROV_POOL:
              break
          data = self.redis_helper.get_data_zset_with_index(key_name=rov_pool_key,
                                                            start=idx, end=idx + get_size - 1,
                                                            with_scores=True)
          if not data:
              # pool exhausted
              break
          # Video ids as int, plus a {videoId: score} lookup.
          video_ids = []
          video_score = {}
          for value in data:
              video_id = int(value[0])
              if video_id in skip_ids:
                  continue
              video_ids.append(video_id)
              video_score[video_id] = value[1]
          # Filter.
          filter_ = FilterVideos(request_id=self.request_id,
                                 app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
          ge = gevent.spawn(filter_.filter_videos)
          ge.join()
          filtered_result = ge.get()
          if filtered_result:
              # Attach the source markers pushFrom / abCode.
              temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
                              'pushFrom': push_from, 'abCode': self.ab_code}
                             for item in filtered_result if video_score.get(int(item)) is not None]
              rov_pool_recall_result.extend(temp_result)
          elif self.mid:
              # Nothing survived: remember the last fetched video id so the next
              # request can resume from here. Not recorded when mid is empty.
              self.redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=data[-1][0],
                                                  expire_time=expire_time)
          idx += get_size

      # Merge top videos, rov-updated videos and pool videos, best score first.
      if top_video_result:
          rov_pool_recall_result.extend(top_video_result)
      if update_rov_dup_result:
          rov_pool_recall_result.extend(update_rov_dup_result)
      rov_pool_recall_result.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
      return rov_pool_recall_result[:size]
  def flow_pool_recall(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
      """Recall videos from the flow pool, reading it page by page with scores.

      Each page holds ``size * 5`` members fetched through
      ``redis_helper.get_data_zset_with_index``. A page is passed through
      ``FilterVideos.filter_videos`` and then ``check_video_counts``; the
      survivors are collected until ``size`` distinct videos are found, a page
      comes back empty, or ``config_.MAX_FREQ_FROM_FLOW_POOL`` pages were read.

      :param size: maximum number of videos to return
      :param flow_pool_id: flow pool id, forwarded to ``get_pool_redis_key``
      :param flow_pool_abtest_group: copied unchanged into every result item
      :return: ``(videos, process)``; ``videos`` is a list of at most ``size``
          result dicts, ``process`` is a dict with the pool key and the
          intermediate data of the last page read (used for recall logging)
      """
      # Trace of the recall steps, handed back to the caller for logging.
      flow_pool_recall_process = {}

      # Use the city code when it is one of the configured city codes,
      # otherwise fall back to the province code; '' is normalised to '-1'.
      province_code = self.client_info.get('provinceCode', '-1')
      city_code = self.client_info.get('cityCode', '-1')
      region_code = city_code if city_code in config_.CITY_CODE.values() else province_code
      if region_code == '':
          region_code = '-1'

      flow_pool_key = self.get_pool_redis_key('flow', flow_pool_id=flow_pool_id)
      flow_pool_recall_process['flow_pool_key'] = flow_pool_key

      flow_pool_recall_result = []
      recalled_ids = set()  # video ids already placed in the result
      get_size = size * 5   # members fetched per page
      freq = 0              # pages read so far
      idx = 0               # start index of the next page
      while len(flow_pool_recall_result) < size:
          freq += 1
          if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
              break
          data = self.redis_helper.get_data_zset_with_index(
              key_name=flow_pool_key, start=idx, end=idx + get_size - 1, with_scores=True)
          flow_pool_recall_process['initial_data'] = data
          if not data:
              # Nothing left in the pool.
              break

          # Each entry is (member, score) with member "<video_id>-<flow_pool>".
          video_ids = []
          video_mapping = {}
          video_score = {}
          for value in data:
              try:
                  raw_id, flow_pool = value[0].split('-')
                  # Parsed inside the try so a non-numeric id is logged and
                  # skipped instead of aborting the whole recall.
                  video_id = int(raw_id)
              except Exception:
                  # Deliberately broad: one malformed member must not break recall.
                  log_.error({
                      'request_id': self.request_id,
                      'app_type': self.app_type,
                      'flow_pool_value': value
                  })
                  continue
              if video_id not in video_score:
                  # The score of the first member seen for a video is kept.
                  video_ids.append(video_id)
                  video_score[video_id] = value[1]
              video_mapping.setdefault(video_id, []).append(flow_pool)

          # Filter the candidates.
          filter_ = FilterVideos(request_id=self.request_id, app_type=self.app_type,
                                 mid=self.mid, uid=self.uid, video_ids=video_ids)
          ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                            region_code=region_code, shield_config=self.shield_config)
          ge.join()
          filtered_result = ge.get()
          flow_pool_recall_process['filtered_data'] = filtered_result

          # Check the remaining distributable count of what passed the filter.
          if filtered_result:
              ge = gevent.spawn(self.check_video_counts,
                                video_ids=filtered_result, flow_pool_mapping=video_mapping)
              ge.join()
              check_result = ge.get()
              flow_pool_recall_process['check_counts_data'] = check_result
              for item in check_result:
                  video_id = int(item[0])
                  flow_pool = item[1]
                  if video_id in recalled_ids:
                      # Only one flow pool per video goes into the result.
                      continue
                  recalled_ids.add(video_id)
                  # pushFrom / abCode describe the source of the video.
                  flow_pool_recall_result.append(
                      {'videoId': video_id, 'flowPool': flow_pool,
                       'rovScore': video_score[video_id], 'pushFrom': config_.PUSH_FROM['flow_recall'],
                       'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
                  )
          idx += get_size
      return flow_pool_recall_result[:size], flow_pool_recall_process
  def flow_pool_recall_new(self, size=10, flow_pool_id=None):
      """Recall videos from the flow pool read through the set helper.

      Up to ``config_.MAX_FREQ_FROM_FLOW_POOL`` times, ``size * 5`` members
      are requested with ``redis_helper.get_data_with_count_from_set``,
      passed through ``FilterVideos.filter_videos`` and then
      ``check_video_counts_new``. Survivors are collected until ``size``
      distinct videos are found or a read comes back empty. Every result
      gets a random ``rovScore`` in ``[0, 100]``.

      :param size: maximum number of videos to return
      :param flow_pool_id: flow pool id, forwarded to ``get_pool_redis_key``
      :return: list of at most ``size`` result dicts
      """
      # Use the city code when it is one of the configured city codes,
      # otherwise fall back to the province code; '' is normalised to '-1'.
      province_code = self.client_info.get('provinceCode', '-1')
      city_code = self.client_info.get('cityCode', '-1')
      region_code = city_code if city_code in config_.CITY_CODE.values() else province_code
      if region_code == '':
          region_code = '-1'

      flow_pool_key = self.get_pool_redis_key('flow_set', flow_pool_id=flow_pool_id)

      flow_pool_recall_result = []
      recalled_ids = set()  # video ids already placed in the result
      get_size = size * 5   # members requested per read
      freq = 0              # reads done so far
      # The set read takes no offset, so no index cursor is kept here.
      while len(flow_pool_recall_result) < size:
          freq += 1
          if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
              break
          data = self.redis_helper.get_data_with_count_from_set(key_name=flow_pool_key, count=get_size)
          if not data:
              # Nothing left in the pool.
              break

          # Each member has the form "<video_id>-<flow_pool>".
          video_ids = []
          video_mapping = {}
          for value in data:
              try:
                  raw_id, flow_pool = value.split('-')
                  # Parsed inside the try so a non-numeric id is logged and
                  # skipped instead of aborting the whole recall.
                  video_id = int(raw_id)
              except Exception:
                  # Deliberately broad: one malformed member must not break recall.
                  log_.error({
                      'request_id': self.request_id,
                      'app_type': self.app_type,
                      'flow_pool_value': value
                  })
                  continue
              if video_id not in video_mapping:
                  video_ids.append(video_id)
              video_mapping.setdefault(video_id, []).append(flow_pool)

          # Filter the candidates.
          filter_ = FilterVideos(request_id=self.request_id, app_type=self.app_type,
                                 mid=self.mid, uid=self.uid, video_ids=video_ids)
          ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                            region_code=region_code, shield_config=self.shield_config)
          ge.join()
          filtered_result = ge.get()

          # Check the remaining distributable count of what passed the filter.
          if filtered_result:
              ge = gevent.spawn(self.check_video_counts_new,
                                video_ids=filtered_result, flow_pool_mapping=video_mapping)
              ge.join()
              check_result = ge.get()
              for item in check_result:
                  video_id = int(item[0])
                  flow_pool = item[1]
                  if video_id in recalled_ids:
                      # Only one flow pool per video goes into the result.
                      continue
                  recalled_ids.add(video_id)
                  # pushFrom / abCode describe the source of the video.
                  flow_pool_recall_result.append(
                      {'videoId': video_id, 'flowPool': flow_pool,
                       'rovScore': random.uniform(0, 100), 'pushFrom': config_.PUSH_FROM['flow_recall'],
                       'abCode': self.ab_code}
                  )
      return flow_pool_recall_result[:size]
  def flow_pool_recall_new_with_level(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
      """Recall videos from one level of the flow pool (set helper).

      ``get_pool_redis_key('flow_set_level', ...)`` supplies the pool key and
      its level; when the key is ``None`` nothing is recalled. Otherwise, up
      to ``config_.MAX_FREQ_FROM_FLOW_POOL`` times, ``size * 5`` members are
      requested with ``redis_helper.get_data_with_count_from_set``, passed
      through ``FilterVideos.filter_videos`` and then
      ``check_video_counts_new_with_level``. Every result gets a random
      ``rovScore`` in ``[0, 100]``.

      :param size: maximum number of videos to return
      :param flow_pool_id: flow pool id, forwarded to ``get_pool_redis_key``
      :param flow_pool_abtest_group: copied unchanged into every result item
      :return: ``(videos, process)``; ``videos`` is a list of at most ``size``
          result dicts, ``process`` is a dict with the pool key, the level and
          the intermediate data of the last read (used for recall logging)
      """
      # Trace of the recall steps, handed back to the caller for logging.
      flow_pool_recall_process = {}
      flow_pool_key, level = self.get_pool_redis_key('flow_set_level', flow_pool_id=flow_pool_id)
      flow_pool_recall_process['flow_pool_key'] = flow_pool_key
      flow_pool_recall_process['level'] = level
      if flow_pool_key is None:
          # No pool to read from; the region is not needed on this path.
          return [], flow_pool_recall_process

      # Use the city code when it is one of the configured city codes,
      # otherwise fall back to the province code; '' is normalised to '-1'.
      province_code = self.client_info.get('provinceCode', '-1')
      city_code = self.client_info.get('cityCode', '-1')
      region_code = city_code if city_code in config_.CITY_CODE.values() else province_code
      if region_code == '':
          region_code = '-1'

      flow_pool_recall_result = []
      recalled_ids = set()  # video ids already placed in the result
      get_size = size * 5   # members requested per read
      freq = 0              # reads done so far
      # The set read takes no offset, so no index cursor is kept here.
      while len(flow_pool_recall_result) < size:
          freq += 1
          if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
              break
          data = self.redis_helper.get_data_with_count_from_set(key_name=flow_pool_key, count=get_size)
          flow_pool_recall_process['initial_data'] = data
          if not data:
              # Nothing left in the pool.
              break

          # Each member has the form "<video_id>-<flow_pool>".
          video_ids = []
          video_mapping = {}
          for value in data:
              try:
                  raw_id, flow_pool = value.split('-')
                  # Parsed inside the try so a non-numeric id is logged and
                  # skipped instead of aborting the whole recall.
                  video_id = int(raw_id)
              except Exception:
                  # Deliberately broad: one malformed member must not break recall.
                  log_.error({
                      'request_id': self.request_id,
                      'app_type': self.app_type,
                      'flow_pool_value': value
                  })
                  continue
              if video_id not in video_mapping:
                  video_ids.append(video_id)
              video_mapping.setdefault(video_id, []).append(flow_pool)

          # Filter the candidates.
          filter_ = FilterVideos(request_id=self.request_id, app_type=self.app_type,
                                 mid=self.mid, uid=self.uid, video_ids=video_ids)
          ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                            region_code=region_code, shield_config=self.shield_config)
          ge.join()
          filtered_result = ge.get()
          flow_pool_recall_process['filtered_data'] = filtered_result

          # Check the remaining distributable count of what passed the filter.
          if filtered_result:
              ge = gevent.spawn(self.check_video_counts_new_with_level,
                                video_ids=filtered_result, flow_pool_mapping=video_mapping)
              ge.join()
              check_result = ge.get()
              flow_pool_recall_process['check_counts_data'] = check_result
              for item in check_result:
                  video_id = int(item[0])
                  flow_pool = item[1]
                  if video_id in recalled_ids:
                      # Only one flow pool per video goes into the result.
                      continue
                  recalled_ids.add(video_id)
                  # pushFrom / abCode describe the source of the video.
                  flow_pool_recall_result.append(
                      {'videoId': video_id, 'flowPool': flow_pool, 'level': level,
                       'rovScore': random.uniform(0, 100), 'pushFrom': config_.PUSH_FROM['flow_recall'],
                       'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
                  )
      return flow_pool_recall_result[:size], flow_pool_recall_process
  def flow_pool_recall_new_with_level_score(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
      """Recall videos from one level of the flow pool, page by page with scores.

      ``get_pool_redis_key('flow_set_level_score', ...)`` supplies the pool
      key and its level; when the key is ``None`` nothing is recalled.
      Otherwise pages of ``size * 5`` members are fetched through
      ``redis_helper.get_data_zset_with_index``, passed through
      ``FilterVideos.filter_videos`` and then
      ``check_video_counts_new_with_level_score`` until ``size`` distinct
      videos are found, a page comes back empty, or
      ``config_.MAX_FREQ_FROM_FLOW_POOL`` pages were read. ``rovScore`` is the
      score stored with the recalled ``(video, flow pool)`` member.

      :param size: maximum number of videos to return
      :param flow_pool_id: flow pool id, forwarded to ``get_pool_redis_key``
      :param flow_pool_abtest_group: copied unchanged into every result item
      :return: ``(videos, process)``; ``videos`` is a list of at most ``size``
          result dicts, ``process`` is a dict with the pool key, the level and
          the intermediate data of the last page read (used for recall logging)
      """
      # Trace of the recall steps, handed back to the caller for logging.
      flow_pool_recall_process = {}
      flow_pool_key, level = self.get_pool_redis_key('flow_set_level_score', flow_pool_id=flow_pool_id)
      flow_pool_recall_process['flow_pool_key'] = flow_pool_key
      flow_pool_recall_process['level'] = level
      if flow_pool_key is None:
          # No pool to read from; the region is not needed on this path.
          return [], flow_pool_recall_process

      # Use the city code when it is one of the configured city codes,
      # otherwise fall back to the province code; '' is normalised to '-1'.
      province_code = self.client_info.get('provinceCode', '-1')
      city_code = self.client_info.get('cityCode', '-1')
      region_code = city_code if city_code in config_.CITY_CODE.values() else province_code
      if region_code == '':
          region_code = '-1'

      flow_pool_recall_result = []
      recalled_ids = set()  # video ids already placed in the result
      get_size = size * 5   # members fetched per page
      freq = 0              # pages read so far
      idx = 0               # start index of the next page
      while len(flow_pool_recall_result) < size:
          freq += 1
          if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
              break
          data = self.redis_helper.get_data_zset_with_index(
              key_name=flow_pool_key, start=idx, end=idx + get_size - 1, with_scores=True)
          flow_pool_recall_process['initial_data'] = data
          if not data:
              # Nothing left in the pool.
              break

          # Each entry is (member, score) with member "<video_id>-<flow_pool>".
          video_ids = []
          video_mapping = {}
          video_score = {}
          for value in data:
              try:
                  raw_id, flow_pool = value[0].split('-')
                  # Parsed inside the try so a non-numeric id is logged and
                  # skipped instead of aborting the whole recall.
                  video_id = int(raw_id)
              except Exception:
                  # Deliberately broad: one malformed member must not break recall.
                  log_.error({
                      'request_id': self.request_id,
                      'app_type': self.app_type,
                      'flow_pool_value': value
                  })
                  continue
              # Keyed by the parsed pair, not the raw member text, so the
              # lookup below cannot miss when the id is written
              # non-canonically (for example with leading zeros).
              video_score[(video_id, flow_pool)] = value[1]
              if video_id not in video_mapping:
                  video_ids.append(video_id)
              video_mapping.setdefault(video_id, []).append(flow_pool)

          # Filter the candidates.
          filter_ = FilterVideos(request_id=self.request_id, app_type=self.app_type,
                                 mid=self.mid, uid=self.uid, video_ids=video_ids)
          ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                            region_code=region_code, shield_config=self.shield_config)
          ge.join()
          filtered_result = ge.get()
          flow_pool_recall_process['filtered_data'] = filtered_result

          # Check the remaining distributable count of what passed the filter.
          if filtered_result:
              ge = gevent.spawn(self.check_video_counts_new_with_level_score,
                                video_ids=filtered_result, flow_pool_mapping=video_mapping)
              ge.join()
              check_result = ge.get()
              flow_pool_recall_process['check_counts_data'] = check_result
              for item in check_result:
                  video_id = int(item[0])
                  flow_pool = item[1]
                  if video_id in recalled_ids:
                      # Only one flow pool per video goes into the result.
                      continue
                  recalled_ids.add(video_id)
                  # pushFrom / abCode describe the source of the video.
                  flow_pool_recall_result.append(
                      {'videoId': video_id, 'flowPool': flow_pool, 'level': level,
                       'rovScore': video_score[(video_id, flow_pool)],
                       'pushFrom': config_.PUSH_FROM['flow_recall'],
                       'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
                  )
          idx += get_size
      return flow_pool_recall_result[:size], flow_pool_recall_process
  def flow_pool_recall_new_with_level_score2(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
      """Recall videos from one level of the flow pool with a single read.

      Unlike ``flow_pool_recall_new_with_level_score`` the pool is read once
      (the first ``size * 25`` members) and the steps run in the opposite
      order: ``check_video_counts_new_with_level_score`` first, then only the
      ``size * 5`` best-scored distinct videos go through
      ``FilterVideos.filter_videos``.

      A video listed under several flow pools is reported once, with the
      flow pool and score of its best-scored member.

      :param size: maximum number of videos to return
      :param flow_pool_id: flow pool id, forwarded to ``get_pool_redis_key``
      :param flow_pool_abtest_group: copied unchanged into every result item
      :return: ``(videos, process)``; ``videos`` is a list of at most ``size``
          result dicts, ``process`` is a dict with the pool key, the level and
          the intermediate data (used for recall logging)
      """
      # Trace of the recall steps, handed back to the caller for logging.
      flow_pool_recall_process = {}
      flow_pool_key, level = self.get_pool_redis_key('flow_set_level_score', flow_pool_id=flow_pool_id)
      flow_pool_recall_process['flow_pool_key'] = flow_pool_key
      flow_pool_recall_process['level'] = level
      if flow_pool_key is None:
          return [], flow_pool_recall_process
      if size <= 0:
          # Nothing was asked for. Without this guard the end index below
          # would be -1. NOTE(review): Redis range commands treat -1 as the
          # last member, so that would presumably read the whole pool -
          # confirm against RedisHelper.
          return [], flow_pool_recall_process

      # Use the city code when it is one of the configured city codes,
      # otherwise fall back to the province code; '' is normalised to '-1'.
      province_code = self.client_info.get('provinceCode', '-1')
      city_code = self.client_info.get('cityCode', '-1')
      region_code = city_code if city_code in config_.CITY_CODE.values() else province_code
      if region_code == '':
          region_code = '-1'

      flow_pool_recall_result = []
      get_size = size * 5  # number of videos sent to the filter

      # Single read of the first get_size * 5 members, with their scores.
      data = self.redis_helper.get_data_zset_with_index(
          key_name=flow_pool_key, start=0, end=get_size * 5 - 1, with_scores=True)
      flow_pool_recall_process['initial_data'] = data
      if not data:
          return [], flow_pool_recall_process

      # Each entry is (member, score) with member "<video_id>-<flow_pool>".
      video_ids = []
      video_mapping = {}
      video_score = {}
      for value in data:
          try:
              raw_id, flow_pool = value[0].split('-')
              # Parsed inside the try so a non-numeric id is logged and
              # skipped instead of aborting the whole recall.
              video_id = int(raw_id)
          except Exception:
              # Deliberately broad: one malformed member must not break recall.
              log_.error({
                  'request_id': self.request_id,
                  'app_type': self.app_type,
                  'flow_pool_value': value
              })
              continue
          # Keyed by the parsed pair, not the raw member text, so the lookup
          # below cannot miss when the id is written non-canonically.
          video_score[(video_id, flow_pool)] = value[1]
          if video_id not in video_mapping:
              video_ids.append(video_id)
          video_mapping.setdefault(video_id, []).append(flow_pool)

      # Check the remaining distributable count first.
      ge = gevent.spawn(self.check_video_counts_new_with_level_score,
                        video_ids=video_ids, flow_pool_mapping=video_mapping)
      ge.join()
      check_result = ge.get()
      flow_pool_recall_process['check_counts_data'] = check_result
      if not check_result:
          return [], flow_pool_recall_process

      # Best score first (stable for ties), then keep the first - i.e. the
      # best-scored - member of each video until get_size videos are chosen.
      ranked_pairs = sorted(((int(item[0]), item[1]) for item in check_result),
                            key=lambda pair: video_score[pair], reverse=True)
      check_result_mapping = {}
      to_filter_videos = []
      for video_id, flow_pool in ranked_pairs:
          if video_id in check_result_mapping:
              continue
          check_result_mapping[video_id] = [flow_pool, video_score[(video_id, flow_pool)]]
          to_filter_videos.append(video_id)
          if len(to_filter_videos) >= get_size:
              break

      # Filter only the selected top videos.
      filter_ = FilterVideos(request_id=self.request_id, app_type=self.app_type,
                             mid=self.mid, uid=self.uid, video_ids=to_filter_videos)
      ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                        region_code=region_code, shield_config=self.shield_config)
      ge.join()
      filtered_result = ge.get()
      flow_pool_recall_process['filtered_data'] = filtered_result

      # An empty / None filter result simply yields no videos.
      for item in filtered_result or []:
          video_id = int(item)
          flow_pool, score = check_result_mapping[video_id]
          # pushFrom / abCode describe the source of the video.
          flow_pool_recall_result.append(
              {'videoId': video_id, 'flowPool': flow_pool, 'level': level,
               'rovScore': score,
               'pushFrom': config_.PUSH_FROM['flow_recall'],
               'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
          )
      return flow_pool_recall_result[:size], flow_pool_recall_process
  def check_video_counts(self, video_ids, flow_pool_mapping):
      """Keep the (video, flow pool) pairs that can still be distributed.

      For every flow pool listed for a video in ``flow_pool_mapping`` the
      local distribute count is read with
      ``get_videos_local_distribute_count``:

      * ``None`` (no local record): the pair is skipped;
      * greater than 0: the pair is kept;
      * otherwise: the local count key is deleted and the member
        ``"<video_id>-<flow_pool>"`` is removed from the flow pool and the
        quick flow pool of every app type in ``config_.APP_TYPE``.

      :param video_ids: video ids, type-list
      :param flow_pool_mapping: video id -> list of flow pool tags, type-dict
      :return: list of ``(video_id, flow_pool)`` tuples
      """
      distributable = []
      for raw_id in video_ids:
          vid = int(raw_id)
          for pool in flow_pool_mapping.get(vid, []):
              remaining = get_videos_local_distribute_count(video_id=vid, flow_pool=pool)
              if remaining is None:
                  # No local distribute record for this pair.
                  continue
              if remaining > 0:
                  distributable.append((vid, pool))
                  continue

              # Count used up: drop the local record and remove the member
              # from every flow pool / quick flow pool.
              self.redis_helper.del_keys(f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{vid}:{pool}")
              member = f"{vid}-{pool}"
              removed_any = False
              for app_type in config_.APP_TYPE.values():
                  pool_keys = (
                      f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type}",
                      f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}",
                  )
                  for pool_key in pool_keys:
                      if self.redis_helper.remove_value_from_zset(key_name=pool_key, value=member) > 0:
                          removed_any = True
              if removed_any:
                  # Logged only when a member was actually removed somewhere.
                  log_.info({'tag': 'remove video_id from flow_pool', 'video_id': vid, 'flow_pool': pool})
      return distributable
  def check_video_counts_new(self, video_ids, flow_pool_mapping):
      """Keep the (video, flow pool) pairs that can still be distributed.

      Counterpart of ``check_video_counts`` for the pools addressed with the
      ``*_PREFIX_SET`` keys. For every flow pool listed for a video in
      ``flow_pool_mapping`` the local distribute count is read with
      ``get_videos_local_distribute_count``:

      * ``None`` (no local record): the pair is skipped;
      * greater than 0: the pair is kept;
      * otherwise: the local count key is deleted and the member
        ``"<video_id>-<flow_pool>"`` is removed (``remove_value_from_set``)
        from the flow pool and the quick flow pool of every app type in
        ``config_.APP_TYPE``.

      :param video_ids: video ids, type-list
      :param flow_pool_mapping: video id -> list of flow pool tags, type-dict
      :return: list of ``(video_id, flow_pool)`` tuples
      """
      distributable = []
      for raw_id in video_ids:
          vid = int(raw_id)
          for pool in flow_pool_mapping.get(vid, []):
              remaining = get_videos_local_distribute_count(video_id=vid, flow_pool=pool)
              if remaining is None:
                  # No local distribute record for this pair.
                  continue
              if remaining > 0:
                  distributable.append((vid, pool))
                  continue

              # Count used up: drop the local record and remove the member
              # from every flow pool / quick flow pool.
              self.redis_helper.del_keys(f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{vid}:{pool}")
              member = f"{vid}-{pool}"
              removed_any = False
              for app_type in config_.APP_TYPE.values():
                  pool_keys = (
                      f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}",
                      f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}:{config_.QUICK_FLOW_POOL_ID}",
                  )
                  for pool_key in pool_keys:
                      if self.redis_helper.remove_value_from_set(key_name=pool_key, values=(member, )) > 0:
                          removed_any = True
              if removed_any:
                  # Logged only when a member was actually removed somewhere.
                  log_.info({'tag': 'remove video_id from flow_pool', 'video_id': vid, 'flow_pool': pool})
      return distributable
  def check_video_counts_new_with_level(self, video_ids, flow_pool_mapping):
      """Keep the (video, flow pool) pairs that can still be distributed.

      Level-aware counterpart of ``check_video_counts_new``. The levels are
      the keys of ``self.level_weight`` (levels '1'..'6' when it is ``None``).
      For every flow pool listed for a video in ``flow_pool_mapping`` the
      local distribute count is read with
      ``get_videos_local_distribute_count``:

      * ``None`` (no local record): the pair is skipped;
      * greater than 0: the pair is kept;
      * otherwise: the local count key is deleted and the member
        ``"<video_id>-<flow_pool>"`` is removed (``remove_value_from_set``)
        from every level pool and the quick flow pool of every app type in
        ``config_.APP_TYPE``.

      :param video_ids: video ids, type-list
      :param flow_pool_mapping: video id -> list of flow pool tags, type-dict
      :return: list of ``(video_id, flow_pool)`` tuples
      """
      weights = self.level_weight
      if weights is None:
          weights = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
      levels = list(weights)

      distributable = []
      for raw_id in video_ids:
          vid = int(raw_id)
          for pool in flow_pool_mapping.get(vid, []):
              remaining = get_videos_local_distribute_count(video_id=vid, flow_pool=pool)
              if remaining is None:
                  # No local distribute record for this pair.
                  continue
              if remaining > 0:
                  distributable.append((vid, pool))
                  continue

              # Count used up: drop the local record and remove the member
              # from every level pool / quick flow pool.
              self.redis_helper.del_keys(f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{vid}:{pool}")
              member = f"{vid}-{pool}"
              removed_any = False
              for app_type in config_.APP_TYPE.values():
                  pool_keys = [f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL}{app_type}:{lvl}" for lvl in levels]
                  pool_keys.append(
                      f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}:{config_.QUICK_FLOW_POOL_ID}")
                  for pool_key in pool_keys:
                      if self.redis_helper.remove_value_from_set(key_name=pool_key, values=(member, )) > 0:
                          removed_any = True
              if removed_any:
                  # Logged only when a member was actually removed somewhere.
                  log_.info({'tag': 'remove video_id from flow_pool', 'video_id': vid, 'flow_pool': pool})
      return distributable
  def check_video_counts_new_with_level_score(self, video_ids, flow_pool_mapping):
      """Keep the (video, flow pool) pairs that can still be distributed.

      Counterpart of ``check_video_counts_new_with_level`` for the pools
      addressed with ``FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE``. The levels
      are the keys of ``self.level_weight`` (levels '1'..'6' when it is
      ``None``). For every flow pool listed for a video in
      ``flow_pool_mapping`` the local distribute count is read with
      ``get_videos_local_distribute_count``:

      * ``None`` (no local record): the pair is skipped;
      * greater than 0: the pair is kept;
      * otherwise: the local count key is deleted and the member
        ``"<video_id>-<flow_pool>"`` is removed (``remove_value_from_zset``)
        from every level pool and the quick flow pool of every app type in
        ``config_.APP_TYPE``.

      :param video_ids: video ids, type-list
      :param flow_pool_mapping: video id -> list of flow pool tags, type-dict
      :return: list of ``(video_id, flow_pool)`` tuples
      """
      weights = self.level_weight
      if weights is None:
          weights = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
      levels = list(weights)

      distributable = []
      for raw_id in video_ids:
          vid = int(raw_id)
          for pool in flow_pool_mapping.get(vid, []):
              remaining = get_videos_local_distribute_count(video_id=vid, flow_pool=pool)
              if remaining is None:
                  # No local distribute record for this pair.
                  continue
              if remaining > 0:
                  distributable.append((vid, pool))
                  continue

              # Count used up: drop the local record and remove the member
              # from every level pool / quick flow pool.
              self.redis_helper.del_keys(f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{vid}:{pool}")
              member = f"{vid}-{pool}"
              removed_any = False
              for app_type in config_.APP_TYPE.values():
                  pool_keys = [
                      f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{app_type}:{lvl}" for lvl in levels]
                  pool_keys.append(
                      f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}")
                  for pool_key in pool_keys:
                      if self.redis_helper.remove_value_from_zset(key_name=pool_key, value=member) > 0:
                          removed_any = True
              if removed_any:
                  # Logged only when a member was actually removed somewhere.
                  log_.info({'tag': 'remove video_id from flow_pool', 'video_id': vid, 'flow_pool': pool})
      return distributable
  1168. """
  1169. # 本次视频都有本地记录
  1170. if len(videos) == 0:
  1171. error_flag = False
  1172. return check_result, error_flag
  1173. # 本地无记录视频,检查实时分发数
  1174. st_remain_view_count = time.time()
  1175. view_count_result, error_flag = get_videos_remain_view_count(app_type=self.app_type, videos=videos)
  1176. log_.info({
  1177. 'logTimestamp': int(time.time() * 1000),
  1178. 'request_id': self.request_id,
  1179. 'app_type': self.app_type,
  1180. 'mid': self.mid,
  1181. 'uid': self.uid,
  1182. 'operation': 'remainViewCount',
  1183. 'executeTime': (time.time() - st_remain_view_count) * 1000
  1184. })
  1185. # 判断返回的错误标记,True为错误
  1186. if error_flag:
  1187. return check_result, error_flag
  1188. # 从流量召回池移除视频videos
  1189. # for item in videos:
  1190. # value = '{}-{}'.format(item['videoId'], item['flowPool'])
  1191. # self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
  1192. # redis_helper = RedisHelper()
  1193. for item in view_count_result:
  1194. try:
  1195. # 接口超时,item[2]可能为None
  1196. remain_count = int(item[2])
  1197. except Exception as e:
  1198. # log_.error('remain_count type error...')
  1199. log_.error(traceback.format_exc())
  1200. continue
  1201. if remain_count > 0:
  1202. # viewCount > 0
  1203. check_result.append(item)
  1204. # 将分发数更新到本地记录
  1205. key_name = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{item[0]}:{item[1]}"
  1206. self.redis_helper.setnx_key(key_name=key_name, value=remain_count, expire_time=5 * 60)
  1207. else:
  1208. # viewCount <= 0
  1209. # 从流量召回池移除
  1210. value = '{}-{}'.format(item[0], item[1])
  1211. for item in config_.APP_TYPE:
  1212. flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{config_.APP_TYPE.get(item)}"
  1213. self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
  1214. return check_result, error_flag
  1215. """
  def get_pool_redis_key(self, pool_type, flow_pool_id=None):
      """
      Build the redis key for the requested pool.

      :param pool_type: type-string, one of 'rov' (rov recall pool), 'flow' (flow pool),
          'flow_set', 'flow_set_level', 'flow_set_level_score', 'special'
      :param flow_pool_id: flow pool id; when it equals config_.QUICK_FLOW_POOL_ID the
          quick flow pool key is returned for the 'flow*' pool types
      :return: depends on pool_type
          'rov' / 'special': (key_name, redis_date)
          'flow' / 'flow_set': key_name (a single string, not a tuple)
          'flow_set_level' / 'flow_set_level_score': (key_name, level); level is None
              for the quick flow pool, and (None, None) is returned when no level
              can be served
          anything else: (None, None) after logging an error
      """
      if pool_type == 'rov':
          # appType = 13 (the video app) reads its own recall list
          if self.app_type == config_.APP_TYPE['APP']:
              key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_APP
          else:
              key_name_prefix = config_.RECALL_KEY_NAME_PREFIX
          return self._daily_key_with_fallback(key_name_prefix)

      if pool_type == 'special':
          return self._daily_key_with_fallback(config_.KEY_NAME_PREFIX_SPECIAL_VIDEOS)

      if pool_type in ('flow', 'flow_set', 'flow_set_level', 'flow_set_level_score'):
          is_quick_pool = flow_pool_id == config_.QUICK_FLOW_POOL_ID
          if pool_type == 'flow':
              if is_quick_pool:
                  return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
              return f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}"
          if pool_type == 'flow_set':
              if is_quick_pool:
                  return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
              return f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET}{self.app_type}"
          if pool_type == 'flow_set_level':
              if is_quick_pool:
                  return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}", None
              return self._pick_flow_level_key(config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL)
          # pool_type == 'flow_set_level_score'
          # NOTE: the quick pool key here uses QUICK_FLOWPOOL_KEY_NAME_PREFIX
          # (not the *_SET prefix), exactly as before.
          if is_quick_pool:
              return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}", None
          return self._pick_flow_level_key(config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE)

      log_.error('pool type error')
      return None, None

  def _daily_key_with_fallback(self, key_name_prefix):
      """
      Return (key_name, date_str) for today's list, or yesterday's when today's
      key does not exist yet (the list has not been refreshed).

      The date is read once so both the probe and the fallback are derived from
      the same calendar day, even when called around midnight.
      """
      today = date.today()
      today_str = today.strftime('%Y%m%d')
      key_name = f"{key_name_prefix}{today_str}"
      if self.redis_helper.key_exists(key_name):
          return key_name, today_str
      yesterday_str = (today - timedelta(days=1)).strftime('%Y%m%d')
      return f"{key_name_prefix}{yesterday_str}", yesterday_str

  def _pick_flow_level_key(self, level_key_prefix):
      """
      Pick one flow pool level at random, proportionally to its weight.

      Only levels whose redis key currently exists take part in the draw.
      Weights come from self.level_weight (all levels weigh 1 when it is None).

      Compared with the previous inline implementation:
        * weights are coerced once with float(), so the total and the per-level
          share always use the same numbers (previously int() was used for the
          total only, which could raise ZeroDivisionError / TypeError);
        * interval bounds are no longer rounded to 2 decimals, so levels with a
          very small share can still be selected.

      :param level_key_prefix: key prefix; the key is prefix + app_type + ':' + level
      :return: (level_key, level), or (None, None) when no level is available
      """
      level_weight = self.level_weight
      if level_weight is None:
          level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}

      candidates = []
      for level, weight in level_weight.items():
          weight = float(weight)
          # A non-positive weight can never be drawn; skip it (and its redis probe).
          if weight <= 0:
              continue
          level_key = f"{level_key_prefix}{self.app_type}:{level}"
          if self.redis_helper.key_exists(key_name=level_key):
              candidates.append((level, level_key, weight))

      if not candidates:
          return None, None

      weights = [weight for _, _, weight in candidates]
      level, level_key, _ = random.choices(candidates, weights=weights, k=1)[0]
      return level_key, level
  def get_video_last_idx(self, video_type=''):
      """
      Locate where this user should resume reading the rov recall pool.

      :param video_type: list selector; 'whole_movies' forces the hourly
          whole-movies list and its own "last video" key prefix
      :return: (rov_pool_key, last_rov_recall_key, idx), or (None, None, None)
          when no pool key could be resolved. idx is 0 when there is no stored
          last video or it is no longer in the pool, else its position + 1.
      """
      ab = config_.AB_CODE
      hourly_codes = (
          list(ab['rank_by_h'].values())
          + list(ab['region_rank_by_h'].values())
          + [ab['rov_rank_appType_18_19'], ab['rov_rank_appType_19'],
             ab['top_video_relevant_appType_19']]
          + list(ab['rank_by_24h'].values())
      )
      is_rank_by_day = self.ab_code in ab['rank_by_day'].values()

      if self.ab_code in hourly_codes or video_type == 'whole_movies':
          rov_pool_key, redis_date = self.get_pool_redis_key_with_h('rov', video_type=video_type)
      elif is_rank_by_day:
          rov_pool_key, redis_date = self.get_pool_redis_key_with_day('dup')
      else:
          rov_pool_key, redis_date = self.get_pool_redis_key('rov')
      if not rov_pool_key:
          return None, None, None

      if is_rank_by_day:
          # Before 07:00 the "previous" key prefix is used, afterwards the "now" one.
          if datetime.now().hour < 7:
              last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_PRE_PREFIX
          else:
              last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_NOW_PREFIX
      elif video_type == 'whole_movies':
          last_key_prefix = config_.LAST_VIDEO_FROM_WHOLE_MOVIES_PREFIX
      else:
          last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_PREFIX

      # NOTE: on the hourly path redis_date holds the hour returned by
      # get_pool_redis_key_with_h, not a date string.
      last_rov_recall_key = f'{last_key_prefix}{self.app_type}:{self.mid}:{redis_date}'
      value = self.redis_helper.get_data_from_redis(last_rov_recall_key)

      idx = 0
      if value:
          last_idx = self.redis_helper.get_index_with_data(rov_pool_key, value)
          # Position 0 is a valid hit and must advance to 1; only a missing
          # member restarts from 0.
          # NOTE(review): assumes get_index_with_data returns None when the value
          # is absent (redis rank semantics) - confirm against RedisHelper.
          if last_idx is not None:
              idx = last_idx + 1
      return rov_pool_key, last_rov_recall_key, idx
  def get_update_rov_videos(self):
      """
      Fetch the videos whose ROV was manually modified, filtered for this user.

      Any exception is logged and turned into an empty result.

      :return: update_rov_video_ids, update_rov_result
      """
      try:
          # The video app (APP_TYPE['APP']) keeps its own zset of modified videos.
          key_name = (config_.UPDATE_ROV_KEY_NAME_APP
                      if self.app_type == config_.APP_TYPE['APP']
                      else config_.UPDATE_ROV_KEY_NAME)
          zset_rows = self.redis_helper.get_data_zset_with_index(
              key_name=key_name, start=0, end=-1, with_scores=True)
          if zset_rows is None:
              return [], []

          # Each row is indexed as (member, score); keep ids as int and remember the score.
          id_score_pairs = [(int(row[0]), row[1]) for row in zset_rows]
          candidate_ids = [video_id for video_id, _ in id_score_pairs]
          score_by_id = dict(id_score_pairs)

          # Run the filter in a greenlet and wait for its result.
          video_filter = FilterVideos(request_id=self.request_id,
                                      app_type=self.app_type, video_ids=candidate_ids,
                                      mid=self.mid, uid=self.uid)
          filter_job = gevent.spawn(video_filter.filter_videos)
          filter_job.join()
          survivors = filter_job.get()
          if not survivors:
              return [], []

          # Attach the source fields pushFrom / abCode to every surviving video.
          kept_ids, kept_result = [], []
          for raw_id in survivors:
              video_id = int(raw_id)
              rov_score = score_by_id.get(video_id)
              if rov_score is not None:
                  kept_ids.append(video_id)
                  kept_result.append({'videoId': video_id, 'rovScore': rov_score,
                                      'pushFrom': config_.PUSH_FROM['rov_recall'],
                                      'abCode': self.ab_code})
          return kept_ids, kept_result
      except Exception:
          log_.error(traceback.format_exc())
          return [], []
  def get_top_videos(self):
      """
      Fetch the pinned ("top") videos that apply to this user's city.

      An entry applies when its comma-separated cityCode list contains the
      client's cityCode or config_.ALL_AREA_CODE. When a videoId appears more
      than once, the entry with the larger score wins. Any exception is logged
      and turned into an empty result.

      :return: top_video_ids, top_video_result
      """
      try:
          # The video app (APP_TYPE['APP']) keeps its own pinned list.
          if self.app_type == config_.APP_TYPE['APP']:
              key_name = config_.TOP_VIDEO_LIST_KEY_NAME_APP
          else:
              key_name = config_.TOP_VIDEO_LIST_KEY_NAME
          data = self.redis_helper.get_data_from_redis(key_name=key_name)
          if data is None:
              return [], []

          video_ids = []    # first-seen order, passed to the filter
          video_info = {}   # videoId -> {'score': ..., 'cityCode': [...]}
          # SECURITY NOTE(review): eval() executes whatever string is stored under
          # this redis key. If the payload is a plain literal (list of dicts),
          # ast.literal_eval / json.loads would be a safer parser - confirm the
          # writer's format before switching.
          for item in eval(data):
              video_id = int(item['videoId'])
              # City-level match between the video's target area and the user's location.
              city_code_list = item['cityCode'].split(',')
              if (self.client_info.get('cityCode') not in city_code_list
                      and config_.ALL_AREA_CODE not in city_code_list):
                  continue
              known = video_info.get(video_id)
              if known is None:
                  video_ids.append(video_id)
                  video_info[video_id] = {'score': item['score'], 'cityCode': city_code_list}
              elif item['score'] > known.get('score'):
                  # Duplicate videoId: keep the larger score.
                  video_info[video_id] = {'score': item['score'], 'cityCode': city_code_list}

          # Run the filter in a greenlet and wait for its result.
          filter_ = FilterVideos(request_id=self.request_id,
                                 app_type=self.app_type, video_ids=video_ids,
                                 mid=self.mid, uid=self.uid)
          ge = gevent.spawn(filter_.filter_videos)
          ge.join()
          filtered_result = ge.get()

          top_video_ids, top_video_result = [], []
          if not filtered_result:
              return top_video_ids, top_video_result
          # Attach the source fields pushFrom / abCode to every surviving video.
          for item in filtered_result:
              video_id = int(item)
              item_info = video_info.get(video_id)
              if item_info is None:
                  continue
              top_video_ids.append(video_id)
              top_video_result.append({'videoId': video_id, 'rovScore': item_info.get('score'),
                                       'pushFrom': config_.PUSH_FROM['top'], 'abCode': self.ab_code})
          return top_video_ids, top_video_result
      except Exception:
          log_.error(traceback.format_exc())
          return [], []
  def get_pool_redis_key_with_h(self, pool_type, video_type=''):
      """
      Build the redis key of an hourly-refreshed video list.

      For pool_type 'rov' the key of the current hour is returned when it
      exists; otherwise the key of the previous hour is returned (without
      checking that it exists).

      :param pool_type: type-string {'rov': rov recall pool, 'flow': flow pool}
      :param video_type: list selector; 'whole_movies' selects the whole-movies list
      :return: 'rov': (key_name, hour) where hour is the hour the key refers to;
          'flow': key_name (a single string, not a tuple);
          anything else: (None, None)
      """
      if pool_type == 'flow':
          return config_.FLOW_POOL_KEY_NAME_PREFIX + str(self.app_type)
      if pool_type != 'rov':
          return None, None

      # One clock read, so the date and the hour always belong to the same instant.
      now = datetime.now()
      ab = config_.AB_CODE

      if self.ab_code in [ab['rov_rank_appType_18_19'], ab['rov_rank_appType_19'],
                          ab['top_video_relevant_appType_19']]:
          # Per-appType list: <prefix><appType>:<date>:<hour>
          return self._hourly_key_with_fallback(
              f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{self.app_type}:", ':', now)

      if video_type == 'whole_movies':
          # Whole-movies list: <prefix><date>.<hour>
          return self._hourly_key_with_fallback(
              f"{config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES}", '.', now)

      if self.ab_code in ab['region_rank_by_h'].values():
          # Use the city code when city-level data exists for it, else the province code.
          city_code_list = list(config_.CITY_CODE.values())
          province_code = self.client_info.get('provinceCode', '-1')
          city_code = self.client_info.get('cityCode', '-1')
          region_code = city_code if city_code in city_code_list else province_code
          if region_code == '':
              region_code = '-1'
          if self.ab_code == ab['region_rank_by_h'].get('region_rule_rank2'):
              key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_24H}{region_code}."
          else:
              key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}{region_code}:"
      elif self.ab_code in ab['rank_by_24h'].values():
          key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_24H
      else:
          key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_H

      # Rule list: <prefix><data_key>:<rule_key>:<date>:<hour>
      return self._hourly_key_with_fallback(
          f"{key_prefix}{self.data_key}:{self.rule_key}:", ':', now)

  def _hourly_key_with_fallback(self, key_prefix, hour_sep, now):
      """
      Return (key_name, hour) for the hour of `now`, or for the hour before it
      when the current key does not exist yet.

      :param key_prefix: everything that precedes the date in the key
      :param hour_sep: separator between the date and the hour (':' or '.')
      :param now: datetime the key is derived from
      """
      key_name = f"{key_prefix}{now.strftime('%Y%m%d')}{hour_sep}{now.hour}"
      if self.redis_helper.key_exists(key_name):
          return key_name, now.hour
      # Subtracting one hour also rolls 00:xx back to 23:xx of the previous day.
      previous = now - timedelta(hours=1)
      key_name = f"{key_prefix}{previous.strftime('%Y%m%d')}{hour_sep}{previous.hour}"
      return key_name, previous.hour
  def flow_pool_recall_18_19(self, size=4, push_from=config_.PUSH_FROM['flow_recall']):
      """
      Recall videos from the flow pool zset.

      The zset is read page by page (size * 5 entries per page); every page is
      filtered and the survivors are collected until `size` videos are available,
      the pool is exhausted, or config_.MAX_FREQ_FROM_FLOW_POOL_18_19 pages
      have been read.

      :param size: number of videos wanted
      :param push_from: value stored in each result's 'pushFrom' field
      :return: list of at most `size` result dicts
      """
      started_at = time.time()
      pool_key = self.get_pool_redis_key('flow')
      recalled = []
      page_size = size * 5
      pages_read = 0
      offset = 0
      while len(recalled) < size and pages_read < config_.MAX_FREQ_FROM_FLOW_POOL_18_19:
          pages_read += 1
          page = self.redis_helper.get_data_zset_with_index(
              key_name=pool_key, start=offset, end=offset + page_size - 1, with_scores=True)
          if not page:
              # Nothing left in the flow pool.
              break

          # Map video id -> score for this page.
          page_ids = []
          score_by_id = {}
          for row in page:
              page_video_id = int(row[0])
              page_ids.append(page_video_id)
              score_by_id[page_video_id] = row[1]

          # Run the filter in a greenlet and wait for its result.
          video_filter = FilterVideos(request_id=self.request_id,
                                      app_type=self.app_type, mid=self.mid, uid=self.uid,
                                      video_ids=page_ids)
          filter_job = gevent.spawn(video_filter.filter_videos)
          filter_job.join()
          survivors = filter_job.get()

          if survivors:
              # Attach the source fields pushFrom / abCode.
              recalled.extend(
                  {'videoId': int(raw_id), 'rovScore': score_by_id[int(raw_id)],
                   'pushFrom': push_from, 'abCode': self.ab_code}
                  for raw_id in survivors
                  if score_by_id.get(int(raw_id)) is not None
              )
          offset += page_size

      log_.info({
          'logTimestamp': int(time.time() * 1000),
          'request_id': self.request_id,
          'operation': 'flow_pool_recall_18_19',
          'executeTime': (time.time() - started_at) * 1000
      })
      return recalled[:size]
  def get_pool_redis_key_with_day(self, pool_type):
      """
      Build the redis key of a video list that is refreshed by daily rules.

      For 'rov' and 'dup' today's key is returned when it exists; otherwise
      yesterday's key is returned (without checking that it exists).

      :param pool_type: type-string; 'rov' (rov recall pool), 'dup', or 'flow' (flow pool)
      :return: 'rov' / 'dup': (key_name, date_str);
          'flow': key_name (a single string, not a tuple);
          anything else: (None, None)
      """
      today = datetime.today()
      today_str = datetime.strftime(today, '%Y%m%d')

      if pool_type == 'flow':
          return config_.FLOW_POOL_KEY_NAME_PREFIX + str(self.app_type)

      if pool_type == 'rov':
          prefix = config_.RECALL_KEY_NAME_PREFIX_BY_DAY
      elif pool_type == 'dup':
          # Before 07:00 the "pre" list is used, afterwards the "now" list.
          prefix = (config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE
                    if datetime.now().hour < 7
                    else config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW)
      else:
          return None, None

      todays_key = f'{prefix}{self.rule_key}.{today_str}'
      if self.redis_helper.key_exists(key_name=todays_key):
          return todays_key, today_str

      # Today's list is not there yet: fall back to yesterday's.
      yesterday_str = datetime.strftime((today - timedelta(days=1)), '%Y%m%d')
      return f'{prefix}{self.rule_key}.{yesterday_str}', yesterday_str
  def get_video_last_idx_day(self):
      """
      Locate where this user should resume reading the daily-rule list.

      :return: (rule_key_name, last_rule_day_recall_key, idx), or
          (None, None, None) when no list key could be resolved. idx is 0 when
          there is no stored last video or it is no longer in the list, else its
          position + 1.
      """
      rule_key_name, redis_dt = self.get_pool_redis_key_with_day('rov')
      if not rule_key_name:
          return None, None, None

      last_rule_day_recall_key = \
          f'{config_.LAST_VIDEO_FROM_RULE_DAY_POOL_PREFIX}{self.app_type}.{self.mid}.{redis_dt}'
      value = self.redis_helper.get_data_from_redis(last_rule_day_recall_key)

      idx = 0
      if value:
          last_idx = self.redis_helper.get_index_with_data(rule_key_name, value)
          # Position 0 is a valid hit and must advance to 1; only a missing
          # member restarts from 0.
          # NOTE(review): assumes get_index_with_data returns None when the value
          # is absent (redis rank semantics) - confirm against RedisHelper.
          if last_idx is not None:
              idx = last_idx + 1
      return rule_key_name, last_rule_day_recall_key, idx
  def old_videos_recall(self, size):
      """
      Recall old videos from today's old-video set.

      The set members are filtered, shuffled, and the first size + 1 entries are
      returned (note: one more than `size`).

      :param size: base number of videos wanted
      :return: list of result dicts with rovScore 0; [] when the set is empty or
          nothing survives the filter
      """
      today_str = datetime.strftime(datetime.today(), '%Y%m%d')
      key_name = f'{config_.RECALL_KEY_NAME_PREFIX_OLD_VIDEOS}{today_str}'
      stored_ids = self.redis_helper.get_data_from_set(key_name=key_name)
      if not stored_ids:
          return []

      # Run the filter in a greenlet and wait for its result.
      candidate_ids = [int(raw_id) for raw_id in stored_ids]
      video_filter = FilterVideos(request_id=self.request_id,
                                  app_type=self.app_type, video_ids=candidate_ids,
                                  mid=self.mid, uid=self.uid)
      filter_job = gevent.spawn(video_filter.filter_videos)
      filter_job.join()
      survivors = filter_job.get()
      if not survivors:
          return []

      # Attach the source fields pushFrom / abCode.
      candidates = []
      for raw_id in survivors:
          candidates.append({'videoId': int(raw_id), 'rovScore': 0,
                             'pushFrom': config_.PUSH_FROM['old_video'],
                             'abCode': self.ab_code})

      # Random pick: shuffle in place, then keep the first size + 1 entries.
      random.shuffle(candidates)
      return candidates[:size + 1]
  def get_region_dup_video_last_idx_h(self, province_code, region_dup=None):
      """
      Locate where this user should resume reading a region de-duplicated list.

      The list key of the current hour is used when it exists, otherwise the key
      of the previous hour; the "last video" key carries the same hour.

      :param province_code: region code embedded in the list key
      :param region_dup: which de-duplicated list to read
          1: region daily result de-duplicated against the region hourly result
          2: daily result de-duplicated against the region results (the 24h
             variant when ab_code is the 'region_rule_rank2' experiment)
          anything else: an empty key prefix is used
      :return: (key_name, last_region_dup_key, idx). idx is 0 when there is no
          stored last video or it is no longer in the list, else its position + 1.
      """
      if region_dup == 1:
          key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_DAY_H
      elif region_dup == 2:
          if self.ab_code == config_.AB_CODE['region_rank_by_h'].get('region_rule_rank2'):
              key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_DAY_24H
          else:
              key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_DAY_H
      else:
          key_name_prefix = ''

      # One clock read, so the date and the hour always belong to the same instant.
      slot = datetime.now()
      key_name = f"{key_name_prefix}{province_code}.{self.rule_key}.{slot.strftime('%Y%m%d')}.{slot.hour}"
      if not self.redis_helper.key_exists(key_name=key_name):
          # Not refreshed yet: use the previous hour (00:xx rolls back to 23:xx
          # of the previous day).
          slot = slot - timedelta(hours=1)
          key_name = f"{key_name_prefix}{province_code}.{self.rule_key}.{slot.strftime('%Y%m%d')}.{slot.hour}"
      last_region_dup_key = \
          f"{config_.LAST_VIDEO_FROM_REGION_DUP_PREFIX}{region_dup}.{self.app_type}.{self.mid}.{slot.hour}"

      value = self.redis_helper.get_data_from_redis(last_region_dup_key)
      idx = 0
      if value:
          last_idx = self.redis_helper.get_index_with_data(key_name, value)
          # Position 0 is a valid hit and must advance to 1; only a missing
          # member restarts from 0.
          # NOTE(review): assumes get_index_with_data returns None when the value
          # is absent (redis rank semantics) - confirm against RedisHelper.
          if last_idx is not None:
              idx = last_idx + 1
      return key_name, last_region_dup_key, idx
  def rov_pool_recall_with_region_process(self, size=4, expire_time=24*3600):
      """
      Recall videos from the region-grouped lists.

      Several lists are fetched concurrently through recall_region_dup_24h and
      merged in list order, keeping the first occurrence of every videoId.

      :param size: number of videos wanted (expected to be >= 0)
      :param expire_time: redis expiry for the "last video" record, passed
          through to recall_region_dup_24h
      :return: list of at most `size` video dicts
      """
      # Use the city code when city-level data exists for it, else the province code.
      city_code_list = list(config_.CITY_CODE.values())
      province_code = self.client_info.get('provinceCode', '-1')
      city_code = self.client_info.get('cityCode', '-1')
      region_code = city_code if city_code in city_code_list else province_code
      if region_code == '':
          region_code = '-1'

      # Without a usable region only the two 24h de-duplicated lists are read.
      if region_code == '-1':
          list_types = ['24h_dup2', '24h_dup3']
      else:
          list_types = ['region_h', 'region_24h', '24h_dup2', '24h_dup3']

      t = [gevent.spawn(self.recall_region_dup_24h, region_code, size, list_type, expire_time)
           for list_type in list_types]
      gevent.joinall(t)
      region_recall_result_list = [i.get() for i in t]

      # Merge in order, dropping duplicates; stop as soon as enough are collected.
      recall_num = size
      seen_video_ids = set()
      recall_result = []
      for region_result in region_recall_result_list:
          if len(recall_result) >= recall_num:
              break
          for video in region_result:
              video_id = video.get('videoId')
              if video_id in seen_video_ids:
                  continue
              seen_video_ids.add(video_id)
              recall_result.append(video)
              if len(recall_result) >= recall_num:
                  break
      return recall_result[:recall_num]
  def rov_pool_recall_with_region(self, size=4, expire_time=24*3600):
      """
      Recall videos from the region lists and tag those found in a flow pool.

      Every recalled video is returned; a video for which get_video_flow_pool
      yields a non-empty tag (quick flow pool looked up first, then the regular
      one) additionally gets 'flowPool' and 'isInFlowPool' = 1.

      :param size: number of videos wanted
      :param expire_time: passed through to rov_pool_recall_with_region_process
      :return: list of video dicts
      """
      recalled = self.rov_pool_recall_with_region_process(size=size, expire_time=expire_time)
      tagged = []
      for video in recalled:
          video_id = video['videoId']
          # Look up the quick flow pool (True) and the regular one (False) concurrently.
          lookups = [gevent.spawn(self.get_video_flow_pool, video_id, is_quick)
                     for is_quick in (True, False)]
          gevent.joinall(lookups)
          found_pools = [pool for pool in (job.get() for job in lookups) if pool != '']
          if found_pools:
              video['flowPool'] = found_pools[0]
              video['isInFlowPool'] = 1
          tagged.append(video)
      return tagged
  def get_video_flow_pool(self, video_id, quick_flow_pool=False):
      """
      Return one flowPool tag recorded for the given video, or '' when none.

      :param video_id: videoId
      :param quick_flow_pool: True to look in the quick-exposure flow pool,
          otherwise (default False) the regular flow pool
      :return: flow_pool tag, '' when the video is not in the pool or has no tag
      """
      if quick_flow_pool is True:
          membership_key = \
              f"{config_.QUICK_FLOWPOOL_VIDEO_ID_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
          tags_key = \
              f"{config_.QUICK_FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{self.app_type}:" \
              f"{config_.QUICK_FLOW_POOL_ID}:{video_id}"
      else:
          membership_key = f"{config_.FLOWPOOL_VIDEO_ID_KEY_NAME_PREFIX}{self.app_type}"
          tags_key = f"{config_.FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{self.app_type}:{video_id}"

      # Not a member of the pool's video-id set: no tag.
      if not self.redis_helper.data_exists_with_set(key_name=membership_key, value=video_id):
          return ''

      # Ask for a single tag from the video's tag set.
      tags = self.redis_helper.get_data_with_count_from_set(key_name=tags_key, count=1)
      if len(tags) > 0:
          return tags[0]
      return ''
  def get_flow_pool_videos(self):
      """
      Collect the flow-pool videos that can currently be distributed, with their marks.

      Reads the quick-exposure pool zset first and the regular pool zset second.
      Each member is a ``"<videoId>-<flowPool>"`` string. Members without a
      local distribute-count record are skipped; members whose count is <= 0
      are purged from Redis and skipped.

      :return: ``{'video_id_list': [videoId, ...], 'videos_flow_pool': {videoId: [flowPool, ...]}}``
          with ``video_id_list`` in first-seen order
      """
      video_id_list = []
      videos_flow_pool = {}
      # Quick-exposure flow pool
      key_name_quick = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
      # All other flow pools
      key_name_other = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}"
      for key_name in [key_name_quick, key_name_other]:
          data = self.redis_helper.get_all_data_from_zset(key_name=key_name, desc=True, with_scores=False)
          if data is None or len(data) == 0:
              continue
          for item in data:
              video_id, flow_pool = item.split('-')
              video_id = int(video_id)
              # Check the remaining local distribute count for this (video, flow pool).
              cur_count = get_videos_local_distribute_count(video_id=video_id, flow_pool=flow_pool)
              if cur_count is None:
                  # No record: nothing to distribute.
                  continue
              if cur_count <= 0:
                  # Exhausted: drop the local count key and remove the member from
                  # the regular and quick-exposure pools of every app type.
                  remain_count_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
                  self.redis_helper.del_keys(remain_count_key)
                  for app_name in config_.APP_TYPE:
                      flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{config_.APP_TYPE.get(app_name)}"
                      quick_flow_pool_key = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}" \
                                            f"{config_.APP_TYPE.get(app_name)}:{config_.QUICK_FLOW_POOL_ID}"
                      self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=item)
                      self.redis_helper.remove_value_from_zset(key_name=quick_flow_pool_key, value=item)
                  continue
              # cur_count > 0. The dict is filled in lockstep with video_id_list,
              # so test membership on the dict (O(1)) instead of scanning the list.
              if video_id in videos_flow_pool:
                  videos_flow_pool[video_id].append(flow_pool)
              else:
                  videos_flow_pool[video_id] = [flow_pool]
                  video_id_list.append(video_id)
      return {'video_id_list': video_id_list, 'videos_flow_pool': videos_flow_pool}
  def rov_pool_recall_with_region_by_h(self, province_code, size=4, key_flag=''):
      """
      Hourly recall of region-grouped videos.

      :param province_code: province code
      :param size: number of videos to return
      :param key_flag: 'region_24h', 'day_24h', or anything else for the hourly region list
      :return: at most ``size`` video dicts (videoId, rovScore, pushFrom, abCode)
      """
      if key_flag == 'region_24h':
          source_name = 'rov_recall_region_24h'
      elif key_flag == 'day_24h':
          source_name = 'rov_recall_24h'
      else:
          source_name = 'rov_recall_region_h'
      push_from = config_.PUSH_FROM[source_name]

      # Redis key of the hourly list that belongs to this mid.
      pool_key = self.get_mid_h_key(province_code=province_code, key_flag=key_flag)
      if not self.redis_helper.key_exists(pool_key):
          return []

      rejected_ids = []          # ids dropped by the filter, removed from the list afterwards
      recalled = []
      page_size = size * 5       # members fetched per round
      page_no = 0                # number of rounds so far
      while len(recalled) < size:
          page_no += 1
          if page_no > config_.MAX_FREQ_FROM_ROV_POOL:
              break
          page = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
                                                            start=(page_no - 1) * page_size,
                                                            end=page_no * page_size - 1,
                                                            with_scores=True)
          if not page:
              # List exhausted.
              break
          # Members come back as (id, score); keep ids as int and index the scores by id.
          video_ids = [int(member[0]) for member in page]
          video_score = {int(member[0]): member[1] for member in page}

          filter_ = FilterVideos(request_id=self.request_id,
                                 app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
          job = gevent.spawn(filter_.filter_videos_h, self.rule_key, self.data_key,
                             self.ab_code, province_code, key_flag)
          job.join()
          passed = job.get()
          if not passed:
              rejected_ids.extend(video_ids)
              continue
          # Attach the source fields pushFrom / abCode.
          kept = [{'videoId': int(vid), 'rovScore': video_score[int(vid)],
                   'pushFrom': push_from, 'abCode': self.ab_code}
                  for vid in passed if video_score.get(int(vid)) is not None]
          recalled.extend(kept)
          rejected_ids.extend(list(set(video_ids) - set([entry.get('videoId') for entry in kept])))

      # Remove everything the filter rejected from the list.
      for vid in rejected_ids:
          self.redis_helper.remove_value_from_zset(key_name=pool_key, value=vid)
      return recalled[:size]
  def region_dup_recall(self, province_code, region_dup, size=4, expire_time=23*3600):
      """
      Recall videos from a "region dup" update list.

      :param province_code: province code
      :param region_dup: 1 selects the 'rov_recall_region_day' source, 2 the
          'rov_recall_day' source; any other value yields an empty result
      :param size: number of videos to return
      :param expire_time: TTL (seconds) of the last-position record written to Redis
      :return: at most ``size`` video dicts (videoId, rovScore, pushFrom, abCode)
      """
      if region_dup == 1:
          push_from = config_.PUSH_FROM['rov_recall_region_day']
      elif region_dup == 2:
          push_from = config_.PUSH_FROM['rov_recall_day']
      else:
          # Previously push_from stayed unbound here and the method raised
          # UnboundLocalError as soon as a video passed the filter.
          return []

      # Redis keys of the list and of the user's last position, plus that position.
      key_name, last_region_dup_key, idx = self.get_region_dup_video_last_idx_h(
          province_code=province_code, region_dup=region_dup)
      if not key_name:
          # No list available for this region_dup.
          return []

      recall_result = []
      get_size = size * 5   # members fetched per round
      freq = 0              # number of rounds so far
      while len(recall_result) < size:
          freq += 1
          if freq > config_.MAX_FREQ_FROM_ROV_POOL:
              break
          data = self.redis_helper.get_data_zset_with_index(key_name=key_name,
                                                            start=idx, end=idx + get_size - 1,
                                                            with_scores=True)
          if not data:
              # List exhausted.
              break
          # Members come back as (id, score); keep ids as int and index the scores by id.
          video_ids = []
          video_score = {}
          for value in data:
              video_id = int(value[0])
              video_ids.append(video_id)
              video_score[video_id] = value[1]

          filter_ = FilterVideos(request_id=self.request_id,
                                 app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
          ge = gevent.spawn(filter_.filter_videos)
          ge.join()
          filtered_result = ge.get()
          if filtered_result:
              # Attach the source fields pushFrom / abCode.
              recall_result.extend(
                  {'videoId': int(item), 'rovScore': video_score[int(item)],
                   'pushFrom': push_from, 'abCode': self.ab_code}
                  for item in filtered_result if video_score.get(int(item)) is not None
              )
          elif self.mid:
              # Whole page filtered out: remember its last member so the next
              # request can resume after it. Nothing is recorded without a mid.
              self.redis_helper.set_data_to_redis(key_name=last_region_dup_key, value=data[-1][0],
                                                  expire_time=expire_time)
          idx += get_size
      return recall_result[:size]
  def rule_recall_by_h(self, size=4, expire_time=24*3600):
      """
      Hourly recall.

      Runs the hourly list and the '24h' list concurrently, merges them in that
      order without duplicates, and tops the result up from ``rov_pool_recall``
      when fewer than ``size`` videos were found.

      :param size: number of videos to return
      :param expire_time: forwarded to ``rov_pool_recall`` for the top-up
      :return: at most ``size`` video dicts
      """
      start_time = time.time()
      jobs = [gevent.spawn(self.rov_pool_recall_h, size),
              gevent.spawn(self.rov_pool_recall_h, size, '24h')]
      gevent.joinall(jobs)
      batches = [job.get() for job in jobs]

      # Ordered, duplicate-free merge of what has been fetched so far.
      seen_ids = set()
      merged = []
      for batch in batches:
          for video in batch:
              vid = video.get('videoId')
              if vid in seen_ids:
                  continue
              seen_ids.add(vid)
              merged.append(video)
              if len(merged) >= size:
                  break

      if len(merged) < size:
          # Not enough hourly videos: top up from the rov pool.
          for video in self.rov_pool_recall(size=size, expire_time=expire_time):
              vid = video.get('videoId')
              if vid in seen_ids:
                  continue
              seen_ids.add(vid)
              merged.append(video)
              if len(merged) >= size:
                  break

      log_.info({
          'logTimestamp': int(time.time() * 1000),
          'request_id': self.request_id,
          'operation': 'rule_recall_by_h',
          'executeTime': (time.time() - start_time) * 1000
      })
      return merged[:size]
  def rov_pool_recall_h(self, size=4, key_flag=''):
      """
      Hourly video recall.

      :param size: number of videos to return
      :param key_flag: '24h' selects the 'rov_recall_24h' source, anything else 'rov_recall_h'
      :return: at most ``size`` video dicts (videoId, rovScore, pushFrom, abCode)
      """
      source_name = 'rov_recall_24h' if key_flag == '24h' else 'rov_recall_h'
      push_from = config_.PUSH_FROM[source_name]

      # Redis key of the hourly list that belongs to this mid.
      pool_key = self.get_mid_h_key(province_code='', key_flag=key_flag)
      if not self.redis_helper.key_exists(pool_key):
          return []

      rejected_ids = []          # ids dropped by the filter, removed from the list afterwards
      recalled = []
      page_size = size * 5       # members fetched per round
      page_no = 0                # number of rounds so far
      while len(recalled) < size:
          page_no += 1
          if page_no > config_.MAX_FREQ_FROM_ROV_POOL:
              break
          page = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
                                                            start=(page_no - 1) * page_size,
                                                            end=page_no * page_size - 1,
                                                            with_scores=True)
          if not page:
              # List exhausted.
              break
          # Members come back as (id, score); keep ids as int and index the scores by id.
          video_ids = [int(member[0]) for member in page]
          video_score = {int(member[0]): member[1] for member in page}

          filter_ = FilterVideos(request_id=self.request_id,
                                 app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
          # NOTE(review): rov_pool_recall_with_region_by_h calls filter_videos_h with
          # (rule_key, data_key, ab_code, province_code, key_flag); this call passes only
          # four positional arguments and no data_key - confirm against the signature.
          job = gevent.spawn(filter_.filter_videos_h, self.rule_key, self.ab_code, '', key_flag)
          job.join()
          passed = job.get()
          if not passed:
              rejected_ids.extend(video_ids)
              continue
          # Attach the source fields pushFrom / abCode.
          kept = [{'videoId': int(vid), 'rovScore': video_score[int(vid)],
                   'pushFrom': push_from, 'abCode': self.ab_code}
                  for vid in passed if video_score.get(int(vid)) is not None]
          recalled.extend(kept)
          rejected_ids.extend(list(set(video_ids) - set([entry.get('videoId') for entry in kept])))

      # Remove everything the filter rejected from the list.
      for vid in rejected_ids:
          self.redis_helper.remove_value_from_zset(key_name=pool_key, value=vid)
      return recalled[:size]
  def get_relevant_videos_19(self, video_id, size=4):
      """
      Fetch the videos related to a head video from its relevant-list zset.

      :param video_id: id of the head video
      :param size: maximum number of videos to return
      :return: relevant_result, at most ``size`` video dicts
          (videoId, rovScore, pushFrom, abCode); empty when the list is missing or empty
      """
      push_from = config_.PUSH_FROM['top_video_relevant_appType_19']
      list_key = f"{config_.MOVIE_RELEVANT_LIST_KEY_NAME_PREFIX}{video_id}"
      if not self.redis_helper.key_exists(key_name=list_key):
          return []

      # Read the whole list with scores.
      members = self.redis_helper.get_data_zset_with_index(key_name=list_key, start=0, end=-1,
                                                           with_scores=True)
      if not members:
          return []

      # Members come back as (id, score); keep ids as int and index the scores by id.
      candidate_ids = [int(member[0]) for member in members]
      score_by_id = {int(member[0]): member[1] for member in members}

      filter_ = FilterVideos(request_id=self.request_id, app_type=self.app_type, mid=self.mid, uid=self.uid,
                             video_ids=candidate_ids)
      job = gevent.spawn(filter_.filter_videos)
      job.join()
      passed = job.get()

      relevant_result = []
      if passed:
          # Attach the source fields pushFrom / abCode.
          relevant_result = [{'videoId': int(vid), 'rovScore': score_by_id[int(vid)],
                              'pushFrom': push_from, 'abCode': self.ab_code}
                             for vid in passed if score_by_id.get(int(vid)) is not None]
      return relevant_result[:size]
  def relevant_recall_19(self, video_id, size=4, expire_time=24*3600):
      """
      Recall related videos for a head video, backed by the rov pool.

      :param video_id: id of the head video
      :param size: number of videos to return
      :param expire_time: forwarded to ``rov_pool_recall``
      :return: at most ``size`` video dicts; related videos come before rov-pool videos
      """
      jobs = [gevent.spawn(self.get_relevant_videos_19, video_id, size),
              gevent.spawn(self.rov_pool_recall, size, expire_time)]
      gevent.joinall(jobs)
      batches = [job.get() for job in jobs]

      # Ordered, duplicate-free merge of both sources.
      seen_ids = set()
      merged = []
      for batch in batches:
          for video in batch:
              vid = video.get('videoId')
              if vid not in seen_ids:
                  seen_ids.add(vid)
                  merged.append(video)
      return merged[:size]
  def rov_pool_recall_19(self, size=4, expire_time=24*3600):
      """
      Recall for the 'whole_movies' / 'talk_videos' sources, topped up from the rov pool.

      :param size: number of videos to return
      :param expire_time: forwarded to the top-up ``rov_pool_recall`` call only;
          the 'whole_movies' recall always uses 3600
      :return: at most ``size`` video dicts
      """
      jobs = [
          gevent.spawn(self.rov_pool_recall, size, expire_time=3600, video_type='whole_movies',
                       push_from=config_.PUSH_FROM['whole_movies']),
          gevent.spawn(self.flow_pool_recall_18_19, size, push_from=config_.PUSH_FROM['talk_videos']),
      ]
      gevent.joinall(jobs)
      batches = [job.get() for job in jobs]

      # Ordered, duplicate-free merge of what has been fetched so far.
      seen_ids = set()
      merged = []
      for batch in batches:
          for video in batch:
              vid = video.get('videoId')
              if vid in seen_ids:
                  continue
              seen_ids.add(vid)
              merged.append(video)
              if len(merged) >= size:
                  break

      if len(merged) < size:
          # Not enough videos yet: top up from the rov pool.
          for video in self.rov_pool_recall(size=size, expire_time=expire_time):
              vid = video.get('videoId')
              if vid in seen_ids:
                  continue
              seen_ids.add(vid)
              merged.append(video)
              if len(merged) >= size:
                  break
      return merged[:size]
  def update_last_video_record(self, record_key, pool_key_prefix, province_code):
      """
      Choose the hourly pool list to read and record its (date, hour) under ``record_key``.

      The list for the current hour is used when its key exists; otherwise the
      list of the previous hour is chosen (its existence is not checked).

      :param record_key: Redis key that stores ``str({'date': 'YYYYMMDD', 'h': hour})``
      :param pool_key_prefix: prefix of the hourly pool list keys
      :param province_code: province code embedded in the pool key
      :return: the chosen pool list key
      """
      # Read the clock once: separate today()/now() calls could straddle an
      # hour or day boundary and yield a date/hour pair that never existed.
      now = datetime.now()
      key_stem = f"{pool_key_prefix}{province_code}:{self.data_key}:{self.rule_key}"
      record_dt = datetime.strftime(now, '%Y%m%d')
      record_h = now.hour
      pool_recall_key = f"{key_stem}:{record_dt}:{record_h}"
      if not self.redis_helper.key_exists(key_name=pool_recall_key):
          # Current hour not published yet: fall back one hour (rolls over to
          # 23h of the previous day at midnight).
          previous = now - timedelta(hours=1)
          record_dt = datetime.strftime(previous, '%Y%m%d')
          record_h = previous.hour
          pool_recall_key = f"{key_stem}:{record_dt}:{record_h}"
      value = {'date': record_dt, 'h': record_h}
      self.redis_helper.set_data_to_redis(key_name=record_key, value=str(value), expire_time=2 * 3600)
      return pool_recall_key
  def get_video_idx(self, pool_recall_key, last_video_key):
      """
      Work out where to resume reading a pool list.

      :param pool_recall_key: key of the list the video lives in
      :param last_video_key: key holding the last recorded video
      :return: idx, the position right after the recorded video, or 0 when there
          is no recorded video or it is no longer in the list
      """
      value = self.redis_helper.get_data_from_redis(last_video_key)
      if not value:
          return 0
      idx = self.redis_helper.get_index_with_data(key_name=pool_recall_key, value=value)
      # Compare with None rather than truthiness: position 0 is a valid hit and
      # must advance to 1, otherwise the first video is served over and over.
      if idx is None:
          return 0
      return idx + 1
  def get_last_recommend_video_idx(self, province_code, record_key_prefix, pool_key_prefix, last_video_key_prefix):
      """
      Resolve which hourly pool list to read for this user and where to resume in it.

      :param province_code: province code embedded in the pool key
      :param record_key_prefix: prefix of the key recording the (date, hour) of the list last used
      :param pool_key_prefix: prefix of the hourly pool list keys
      :param last_video_key_prefix: prefix of the key recording the last video position
      :return: ``(pool_recall_key, idx)``
      """
      record_key = f"{record_key_prefix}{self.app_type}:{self.mid}"
      last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'

      record = None
      if self.redis_helper.key_exists(key_name=record_key):
          record = self.redis_helper.get_data_from_redis(key_name=record_key)
      if not record:
          # No record, or it expired between the two Redis calls: start over.
          self.redis_helper.del_keys(key_name=last_video_key)
          pool_recall_key = self.update_last_video_record(record_key=record_key,
                                                          pool_key_prefix=pool_key_prefix,
                                                          province_code=province_code)
          return pool_recall_key, 0

      # Read the clock once so date and hour always belong together.
      now = datetime.now()
      h = now.hour
      now_dt = datetime.strftime(now, '%Y%m%d')
      # NOTE(review): eval() executes whatever string is stored under record_key.
      # The writers visible in this file store str(dict); switch to a literal-only
      # parser once the stored type (str vs bytes) is confirmed.
      parsed = eval(record)
      record_dt = parsed.get('date')
      record_h = parsed.get('h')
      key_stem = f"{pool_key_prefix}{province_code}:{self.data_key}:{self.rule_key}"

      if record_dt == now_dt and int(record_h) == h:
          # The current hour's list is already in use: resume in it.
          pool_recall_key = f"{key_stem}:{now_dt}:{h}"
          idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
      elif (record_dt == now_dt and h - int(record_h) == 1) or (h == 0 and int(record_h) == 23):
          # The record is one hour behind: switch only if the current hour's list exists.
          now_pool_recall_key = f"{key_stem}:{now_dt}:{h}"
          if self.redis_helper.key_exists(key_name=now_pool_recall_key):
              new_record = {'date': now_dt, 'h': h}
              self.redis_helper.set_data_to_redis(key_name=record_key, value=str(new_record),
                                                  expire_time=2 * 3600)
              idx = 0
              self.redis_helper.del_keys(key_name=last_video_key)
              pool_recall_key = now_pool_recall_key
          else:
              pool_recall_key = f"{key_stem}:{record_dt}:{record_h}"
              idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
      else:
          # Record is stale: start over.
          idx = 0
          self.redis_helper.del_keys(key_name=last_video_key)
          pool_recall_key = self.update_last_video_record(record_key=record_key,
                                                          pool_key_prefix=pool_key_prefix,
                                                          province_code=province_code)
      return pool_recall_key, idx
  def recall_region_dup_24h(self, province_code, size=4, key_flag='', expire_time=2*3600):
      """
      Recall videos from one of the hourly-refreshed update lists selected by ``key_flag``.

      :param province_code: province code embedded in the pool key
      :param size: number of videos to return
      :param key_flag: list selector: 'region_h', 'region_24h', '24h_dup2',
          '24h_dup3', '48h_dup2' or '48h_dup3'; any other value returns []
      :param expire_time: TTL (seconds) of the last-video record written to Redis
      :return: at most ``size`` video dicts (videoId, rovScore, pushFrom, abCode),
          ordered by rovScore descending
      """
      # For each list: prefix of the (date, hour) record key, prefix of the
      # pool list key, prefix of the last-video key, and the pushFrom source.
      if key_flag == 'region_h':
          record_key_prefix, pool_key_prefix, last_video_key_prefix = (
              config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_H,
              config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H,
              config_.LAST_VIDEO_FROM_REGION_H_PREFIX,
          )
          push_from = config_.PUSH_FROM['rov_recall_region_h']
      elif key_flag == 'region_24h':
          # Per-region selection relative to 24h.
          record_key_prefix, pool_key_prefix, last_video_key_prefix = (
              config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP1_24H,
              config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H,
              config_.LAST_VIDEO_FROM_REGION_DUP1_24H_PREFIX,
          )
          push_from = config_.PUSH_FROM['rov_recall_region_24h']
      elif key_flag == '24h_dup2':
          # Region-independent selection relative to 24h.
          record_key_prefix, pool_key_prefix, last_video_key_prefix = (
              config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP2_24H,
              config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H,
              config_.LAST_VIDEO_FROM_REGION_DUP2_24H_PREFIX,
          )
          push_from = config_.PUSH_FROM['rov_recall_24h']
      elif key_flag == '24h_dup3':
          # Region-independent remainder after the 24h selection.
          record_key_prefix, pool_key_prefix, last_video_key_prefix = (
              config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP3_24H,
              config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H,
              config_.LAST_VIDEO_FROM_REGION_DUP3_24H_PREFIX,
          )
          push_from = config_.PUSH_FROM['rov_recall_24h_dup']
      elif key_flag == '48h_dup2':
          # Region-independent selection relative to 48h.
          record_key_prefix, pool_key_prefix, last_video_key_prefix = (
              config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP2_48H,
              config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H,
              config_.LAST_VIDEO_FROM_REGION_DUP2_48H_PREFIX,
          )
          push_from = config_.PUSH_FROM['rov_recall_48h']
      elif key_flag == '48h_dup3':
          # Region-independent remainder after the 48h selection.
          record_key_prefix, pool_key_prefix, last_video_key_prefix = (
              config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP3_48H,
              config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H,
              config_.LAST_VIDEO_FROM_REGION_DUP3_48H_PREFIX,
          )
          push_from = config_.PUSH_FROM['rov_recall_48h_dup']
      else:
          return []

      # Resolve the list key and the user's resume position in it.
      pool_key, idx = self.get_last_recommend_video_idx(province_code=province_code,
                                                        record_key_prefix=record_key_prefix,
                                                        pool_key_prefix=pool_key_prefix,
                                                        last_video_key_prefix=last_video_key_prefix)
      if not pool_key:
          return []

      recall_data = []          # every raw member fetched, filtered or not
      pool_recall_result = []
      get_size = size * 5       # members fetched per round
      freq = 0                  # number of rounds so far
      while len(pool_recall_result) < size:
          freq += 1
          if freq > config_.MAX_FREQ_FROM_ROV_POOL:
              break
          data = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
                                                            start=idx, end=idx + get_size - 1,
                                                            with_scores=True)
          if not data:
              break
          recall_data.extend(data)
          # Members come back as (id, score); keep ids as int and index the scores by id.
          video_ids = []
          video_score = {}
          for value in data:
              video_id = int(value[0])
              video_ids.append(video_id)
              video_score[video_id] = value[1]

          filter_ = FilterVideos(request_id=self.request_id,
                                 app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
          ge = gevent.spawn(filter_.filter_videos)
          ge.join()
          filtered_result = ge.get()
          if filtered_result:
              # Attach the source fields pushFrom / abCode.
              pool_recall_result.extend(
                  {'videoId': int(item), 'rovScore': video_score[int(item)],
                   'pushFrom': push_from, 'abCode': self.ab_code}
                  for item in filtered_result if video_score.get(int(item)) is not None
              )
          idx += get_size

      pool_recall_result.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
      if len(recall_data) > 0 and len(pool_recall_result) == 0 and self.mid:
          # Something was fetched, nothing survived the filter, and mid is set:
          # remember the last fetched member so the next request resumes after it.
          last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'
          self.redis_helper.set_data_to_redis(key_name=last_video_key, value=recall_data[-1][0],
                                              expire_time=expire_time)
      return pool_recall_result[:size]
  def update_last_video_record_by_day(self, record_key, pool_key_prefix, expire_time):
      """
      Choose the daily pool list to read and record its date under ``record_key``.

      Today's list is used when its key exists; otherwise yesterday's list is
      chosen (its existence is not checked).

      :param record_key: Redis key that stores ``str({'date': 'YYYYMMDD'})``
      :param pool_key_prefix: prefix of the daily pool list keys
      :param expire_time: TTL (seconds) of the record
      :return: the chosen pool list key
      """
      today = datetime.today()
      key_stem = f"{pool_key_prefix}{self.data_key}:{self.rule_key_30day}"
      record_dt = datetime.strftime(today, '%Y%m%d')
      pool_recall_key = f"{key_stem}:{record_dt}"
      if not self.redis_helper.key_exists(key_name=pool_recall_key):
          # Today's list is not published yet: fall back to yesterday's.
          record_dt = datetime.strftime(today - timedelta(days=1), '%Y%m%d')
          pool_recall_key = f"{key_stem}:{record_dt}"
      self.redis_helper.set_data_to_redis(key_name=record_key, value=str({'date': record_dt}),
                                          expire_time=expire_time)
      return pool_recall_key
  def get_last_recommend_video_idx_by_day(self, record_key_prefix, pool_key_prefix, last_video_key_prefix, expire_time):
      """
      Resolve which daily pool list to read for this user and where to resume in it.

      :param record_key_prefix: prefix of the key recording the date of the list last used
      :param pool_key_prefix: prefix of the daily pool list keys
      :param last_video_key_prefix: prefix of the key recording the last video position
      :param expire_time: TTL (seconds) used when the date record is (re)written
      :return: ``(pool_recall_key, idx)``
      """
      record_key = f"{record_key_prefix}{self.app_type}:{self.mid}"
      last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'

      record = None
      if self.redis_helper.key_exists(key_name=record_key):
          record = self.redis_helper.get_data_from_redis(key_name=record_key)
      if not record:
          # No record, or it expired between the two Redis calls: start over.
          self.redis_helper.del_keys(key_name=last_video_key)
          pool_recall_key = self.update_last_video_record_by_day(record_key=record_key,
                                                                 pool_key_prefix=pool_key_prefix,
                                                                 expire_time=expire_time)
          return pool_recall_key, 0

      now_date = datetime.today()
      now_dt = datetime.strftime(now_date, '%Y%m%d')
      # NOTE(review): eval() executes whatever string is stored under record_key.
      # The writers visible in this file store str(dict); switch to a literal-only
      # parser once the stored type (str vs bytes) is confirmed.
      record_dt = eval(record).get('date')
      key_stem = f"{pool_key_prefix}{self.data_key}:{self.rule_key_30day}"

      if record_dt == now_dt:
          # Today's list is already in use: resume in it.
          pool_recall_key = f"{key_stem}:{now_dt}"
          idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
      elif record_dt == datetime.strftime((now_date - timedelta(days=1)), '%Y%m%d'):
          # The record is one day behind: switch only if today's list exists.
          now_pool_recall_key = f"{key_stem}:{now_dt}"
          if self.redis_helper.key_exists(key_name=now_pool_recall_key):
              new_record = {'date': now_dt}
              self.redis_helper.set_data_to_redis(key_name=record_key,
                                                  value=str(new_record),
                                                  expire_time=expire_time)
              idx = 0
              self.redis_helper.del_keys(key_name=last_video_key)
              pool_recall_key = now_pool_recall_key
          else:
              pool_recall_key = f"{key_stem}:{record_dt}"
              idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
      else:
          # Record is stale: start over.
          idx = 0
          self.redis_helper.del_keys(key_name=last_video_key)
          pool_recall_key = self.update_last_video_record_by_day(record_key=record_key,
                                                                 pool_key_prefix=pool_key_prefix,
                                                                 expire_time=expire_time)
      return pool_recall_key, idx
  def recall_update_by_day(self, size=4, key_flag='', expire_time=24*3600):
      """
      Recall videos from a daily-refreshed update list.

      :param size: number of videos to return
      :param key_flag: list selector; only '30day' is supported, anything else returns []
      :param expire_time: TTL (seconds) of the date record and of the last-video record
      :return: at most ``size`` video dicts (videoId, rovScore, pushFrom, abCode),
          ordered by rovScore descending
      """
      if key_flag != '30day':
          return []
      # Selection computed relative to 30 days: pool list prefix, date-record
      # prefix, last-video prefix, and the pushFrom source.
      pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY
      record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_30DAY
      last_video_key_prefix = config_.LAST_VIDEO_FROM_30DAY_PREFIX
      push_from = config_.PUSH_FROM['rov_recall_30day']

      # Resolve the list key and the user's resume position in it.
      pool_key, cursor = self.get_last_recommend_video_idx_by_day(record_key_prefix=record_key_prefix,
                                                                  pool_key_prefix=pool_key_prefix,
                                                                  last_video_key_prefix=last_video_key_prefix,
                                                                  expire_time=expire_time)
      if not pool_key:
          return []

      fetched = []              # every raw member fetched, filtered or not
      recalled = []
      page_size = size * 5      # members fetched per round
      rounds = 0
      while len(recalled) < size:
          rounds += 1
          if rounds > config_.MAX_FREQ_FROM_ROV_POOL:
              break
          page = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
                                                            start=cursor, end=cursor + page_size - 1,
                                                            with_scores=True)
          if not page:
              break
          fetched.extend(page)
          # Members come back as (id, score); keep ids as int and index the scores by id.
          video_ids = [int(member[0]) for member in page]
          video_score = {int(member[0]): member[1] for member in page}

          filter_ = FilterVideos(request_id=self.request_id,
                                 app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
          job = gevent.spawn(filter_.filter_videos)
          job.join()
          passed = job.get()
          if passed:
              # Attach the source fields pushFrom / abCode.
              kept = [{'videoId': int(vid), 'rovScore': video_score[int(vid)],
                       'pushFrom': push_from, 'abCode': self.ab_code}
                      for vid in passed if video_score.get(int(vid)) is not None]
              recalled.extend(kept)
          cursor += page_size

      recalled.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
      if len(fetched) > 0 and len(recalled) == 0 and self.mid:
          # Something was fetched, nothing survived the filter, and mid is set:
          # remember the last fetched member so the next request resumes after it.
          last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'
          self.redis_helper.set_data_to_redis(key_name=last_video_key, value=fetched[-1][0],
                                              expire_time=expire_time)
      return recalled[:size]
  2693. #linfan
  def get_sim_hot_item_reall(self):
      """
      Recall the "similar hot" videos stored for the current head video.

      Reads the JSON value stored under ``sim_hot_<video_id>``; each entry
      supplies the video id at index 0 and its score at index 1.

      :return: at most 200 video dicts (videoId, flowPool, rovScore, pushFrom, abCode);
          [] when there is no head video or nothing is stored for it
      """
      if self.video_id is None:
          # Same guard as get_sim_hot_item_reall_filter: without a head video the
          # lookup would go to the meaningless key "sim_hot_None".
          return []
      recall_key = "sim_hot_" + str(self.video_id)
      data = self.redis_helper.get_data_from_redis(key_name=recall_key)
      if data is None:
          return []
      recall_result = [
          {'videoId': per_item[0], 'flowPool': '',
           'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['sim_hot_vid_recall'],
           'abCode': self.ab_code}
          for per_item in json.loads(data)
      ]
      return recall_result[:200]
  def get_sim_hot_item_reall_filter(self):
      """
      Recall the "similar hot" videos for the current head video, status-filtered.

      Only the first 20 stored ids are sent through
      ``FilterVideos.filter_videos_status(pool_type='normal')``.

      :return: video dicts (videoId, flowPool, rovScore, pushFrom, abCode) for the
          ids the filter returned; [] when there is no head video, nothing is
          stored, or the filter returns None
      """
      if self.video_id is None:
          return []
      payload = self.redis_helper.get_data_from_redis(key_name="sim_hot_" + str(self.video_id))

      by_id = {}
      ordered_ids = []
      if payload is not None:
          for entry in json.loads(payload):
              # Best effort: an entry that cannot be converted is skipped.
              try:
                  vid = int(entry[0])
                  ordered_ids.append(vid)
                  by_id[vid] = {'videoId': vid, 'flowPool': '',
                                'rovScore': entry[1], 'pushFrom': config_.PUSH_FROM['sim_hot_vid_recall'],
                                'abCode': self.ab_code}
              except Exception:
                  continue
      if len(ordered_ids) <= 0:
          return []

      # Cap the number of candidates handed to the filter.
      ordered_ids = ordered_ids[:20]
      filter_ = FilterVideos(request_id=self.request_id,
                             app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=ordered_ids)
      surviving = filter_.filter_videos_status(pool_type='normal')
      if surviving is None:
          return []
      return [by_id[vid] for vid in surviving if vid in by_id]
  2748. # get region_hour_recall
  2749. def get_region_hour_recall(self, size=4, region_code='-1'):
  2750. pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H
  2751. recall_key = f"{pool_key_prefix}{region_code}:{self.data_key}:{self.rule_key}"
  2752. #print("recall_key:", recall_key)
  2753. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  2754. #print(data)
  2755. recall_result = []
  2756. if data is not None:
  2757. json_result = json.loads(data)
  2758. #print("json_result:", json_result)
  2759. for per_item in json_result:
  2760. recall_result.append(
  2761. {'videoId': per_item[0], 'flowPool': '',
  2762. 'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['rov_recall_region_h'],
  2763. 'abCode': self.ab_code}
  2764. )
  2765. return recall_result[:30]
  2766. # get region_day_recall
  2767. def get_region_day_recall(self, size=4,region_code='-1'):
  2768. """召回池召回视频"""
  2769. pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H
  2770. recall_key = f"{pool_key_prefix}{region_code}:{self.data_key}:{self.rule_key}"
  2771. #print("recall_key:", recall_key)
  2772. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  2773. #print(data)
  2774. recall_result = []
  2775. if data is not None:
  2776. json_result = json.loads(data)
  2777. #print("json_result:", json_result)
  2778. for per_item in json_result:
  2779. recall_result.append(
  2780. {'videoId': per_item[0], 'flowPool': '',
  2781. 'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['rov_recall_region_24h'],
  2782. 'abCode': self.ab_code}
  2783. )
  2784. return recall_result[:200]
  2785. def get_selected_recall(self, size=4, region_code='-1'):
  2786. """召回池召回视频"""
  2787. pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H
  2788. recall_key = f"{pool_key_prefix}{region_code}:{self.data_key}:{self.rule_key}"
  2789. #print("recall_key:", recall_key)
  2790. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  2791. #print(data)
  2792. recall_result = []
  2793. if data is not None:
  2794. json_result = json.loads(data)
  2795. #print("json_result:", json_result)
  2796. for per_item in json_result:
  2797. recall_result.append(
  2798. {'videoId': per_item[0], 'flowPool': '',
  2799. 'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['rov_recall_24h'],
  2800. 'abCode': self.ab_code}
  2801. )
  2802. #print("recall_result:", recall_result)
  2803. return recall_result[:200]
  2804. def get_no_selected_recall(self, size=4, region_code='-1'):
  2805. """未选择召回池召回视频"""
  2806. pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H
  2807. recall_key = f"{pool_key_prefix}{region_code}:{self.data_key}:{self.rule_key}"
  2808. #print("recall_key:", recall_key)
  2809. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  2810. #print(data)
  2811. recall_result = []
  2812. if data is not None:
  2813. json_result = json.loads(data)
  2814. #print("json_result:", json_result)
  2815. for per_item in json_result:
  2816. recall_result.append(
  2817. {'videoId': per_item[0], 'flowPool': '',
  2818. 'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['rov_recall_24h_dup'],
  2819. 'abCode': self.ab_code}
  2820. )
  2821. return recall_result[:200]
  2822. def get_fast_flow_pool_recall(self, size=4):
  2823. """快速流量池召回视频"""
  2824. recall_key = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
  2825. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  2826. recall_result = []
  2827. if data is not None:
  2828. json_result = json.loads(data)
  2829. #print("json_result:", json_result)
  2830. for per_item in json_result:
  2831. recall_result.append(
  2832. {'videoId': per_item[0], 'flowPool': '',
  2833. 'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['fast_flow_recall'],
  2834. 'abCode': self.ab_code}
  2835. )
  2836. return recall_result
  2837. def get_flow_pool_recall(self, size=4):
  2838. """流量池召回视频"""
  2839. recall_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}"
  2840. #print("recall_key:", recall_key)
  2841. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  2842. #print(data)
  2843. recall_result = []
  2844. if data is not None:
  2845. json_result = json.loads(data)
  2846. #print("json_result:", json_result)
  2847. for per_item in json_result:
  2848. recall_result.append(
  2849. {'videoId': per_item[0], 'flowPool': '',
  2850. 'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['normal_flow_recall'],
  2851. 'abCode': self.ab_code}
  2852. )
  2853. return recall_result
  2854. def new_flow_pool_recall(self, size=10, flow_pool_id=None):
  2855. """从流量池中获取视频"""
  2856. start_time = time.time()
  2857. flow_pool_key = self.get_pool_redis_key('flow', flow_pool_id=flow_pool_id)
  2858. flow_pool_recall_result = []
  2859. flow_pool_recall_videos = []
  2860. # 每次获取的视频数
  2861. get_size = size * 5
  2862. # 记录获取频次
  2863. freq = 0
  2864. idx = 0
  2865. while len(flow_pool_recall_result) < size:
  2866. freq += 1
  2867. if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
  2868. break
  2869. # 获取数据
  2870. # st_get = time.time()
  2871. data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
  2872. start=idx, end=idx + get_size - 1,
  2873. with_scores=True)
  2874. et_get = time.time()
  2875. # log_.info('get data from flow pool redis: freq = {}, data = {}, execute time = {}ms'.format(
  2876. # freq, data, (et_get - st_get) * 1000))
  2877. if not data:
  2878. # log_.info('流量池中的视频已取完')
  2879. break
  2880. # 将video_id 与 flow_pool, score做mapping整理
  2881. video_ids = []
  2882. video_mapping = {}
  2883. video_score = {}
  2884. for value in data:
  2885. try:
  2886. video_id, flow_pool = value[0].split('-')
  2887. except Exception as e:
  2888. log_.error({
  2889. 'request_id': self.request_id,
  2890. 'app_type': self.app_type,
  2891. 'flow_pool_value': value
  2892. })
  2893. continue
  2894. video_id = int(video_id)
  2895. if video_id not in video_ids:
  2896. video_ids.append(video_id)
  2897. video_score[video_id] = value[1]
  2898. if video_id not in video_mapping:
  2899. video_mapping[video_id] = [flow_pool]
  2900. else:
  2901. video_mapping[video_id].append(flow_pool)
  2902. # 过滤
  2903. # filter_ = FilterVideos(request_id=self.request_id,
  2904. # app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
  2905. # filtered_result = filter_.filter_videos(pool_type='flow', region_code=region_code, shield_config=self.shield_config)
  2906. # print("flow filter time:", (time.time()-et_get)*1000)
  2907. # 检查可分发数
  2908. if video_ids and len(video_ids)>0:
  2909. check_result = self.check_video_counts(video_ids=video_ids, flow_pool_mapping=video_mapping)
  2910. for item in check_result:
  2911. video_id = int(item[0])
  2912. flow_pool = item[1]
  2913. if video_id not in flow_pool_recall_videos:
  2914. # 取其中一个 flow_pool 作为召回结果
  2915. # 添加视频源参数 pushFrom, abCode
  2916. flow_pool_recall_result.append(
  2917. {'videoId': video_id, 'flowPool': flow_pool,
  2918. 'rovScore': video_score[video_id], 'pushFrom': config_.PUSH_FROM['flow_recall'],
  2919. 'abCode': self.ab_code}
  2920. )
  2921. flow_pool_recall_videos.append(video_id)
  2922. # et_check = time.time()
  2923. # log_.info('check result: result = {}, execute time = {}ms'.format(
  2924. # check_result, (et_check - st_check) * 1000))
  2925. # # 判断错误标记, True为错误
  2926. # if error_flag:
  2927. # # 结束流量池召回
  2928. # break
  2929. idx += get_size
  2930. # log_.info({
  2931. # 'logTimestamp': int(time.time() * 1000),
  2932. # 'request_id': self.request_id,
  2933. # 'operation': 'flow_pool_recall',
  2934. # 'executeTime': (time.time() - start_time) * 1000
  2935. # })
  2936. return flow_pool_recall_result[:size]
  2937. def get_3days_hot_item_reall(self, exp_config=None):
  2938. recall_key = "hot_3day:"
  2939. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  2940. recall_result = []
  2941. recall_dict = {}
  2942. video_ids = []
  2943. if data is not None and data!="":
  2944. try:
  2945. json_result = json.loads(data)
  2946. for per_item in json_result:
  2947. vid = int(per_item[0])
  2948. video_ids.append(vid)
  2949. recall_dict[vid] = {'videoId': vid, 'flowPool': '',
  2950. 'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['hot_3_day_recall'],
  2951. 'abCode': self.ab_code}
  2952. except Exception as e:
  2953. return recall_result
  2954. #print("vid len:", len(video_ids))
  2955. if len(video_ids)<=0:
  2956. return recall_result
  2957. recall_num = 20
  2958. try:
  2959. if exp_config and exp_config['recall_get_num']:
  2960. recall_num = int(exp_config['recall_get_num'])
  2961. except:
  2962. recall_num = 20
  2963. video_ids = video_ids[:recall_num]
  2964. #print(video_ids)
  2965. filter_ = FilterVideos(request_id=self.request_id,
  2966. app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
  2967. filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
  2968. if filtered_viewed_videos is None:
  2969. return recall_result
  2970. #print("filtered_viewed_videos:", filtered_viewed_videos)
  2971. for vid in filtered_viewed_videos:
  2972. if vid in recall_dict:
  2973. recall_result.append(recall_dict[vid])
  2974. #print("hot recall_result:", recall_result)
  2975. #print("recall_dict:", recall_dict)
  2976. return recall_result
  2977. def get_hot_item_reall(self,exp_config=None):
  2978. #recall_key = "hot_video:"
  2979. recall_key = "hot_video:"
  2980. #print("recall_key:", recall_key)
  2981. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  2982. recall_result = []
  2983. recall_dict = {}
  2984. video_ids = []
  2985. if data is not None and data!="":
  2986. try:
  2987. json_result = json.loads(data)
  2988. for per_item in json_result:
  2989. vid = int(per_item[0])
  2990. video_ids.append(vid)
  2991. recall_dict[vid] = {'videoId': vid, 'flowPool': '',
  2992. 'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['hot_recall'],
  2993. 'abCode': self.ab_code}
  2994. except Exception as e:
  2995. recall_result
  2996. if len(video_ids)<=0:
  2997. return recall_result
  2998. recall_num = 20
  2999. try:
  3000. if exp_config and exp_config['recall_get_num']:
  3001. recall_num = int(exp_config['recall_get_num'])
  3002. except:
  3003. recall_num = 20
  3004. #print("recall_num:", recall_num)
  3005. video_ids = video_ids[:recall_num]
  3006. #print(video_ids)
  3007. filter_ = FilterVideos(request_id=self.request_id,
  3008. app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
  3009. filtered_viewed_videos = filter_.filter_videos(pool_type='rov')
  3010. if filtered_viewed_videos is None:
  3011. return recall_result
  3012. #print("filtered_viewed_videos:", filtered_viewed_videos)
  3013. for vid in filtered_viewed_videos:
  3014. if vid in recall_dict:
  3015. recall_result.append(recall_dict[vid])
  3016. #print("hot recall_result:", recall_result)
  3017. return recall_result
  3018. def get_title_recall(self):
  3019. if self.video_id is None:
  3020. return []
  3021. recall_key = "title_I2I:" + str(self.video_id)
  3022. # print("recall_key:", recall_key)
  3023. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3024. # print(data)
  3025. recall_result = []
  3026. recall_dict = {}
  3027. video_ids = []
  3028. if data is not None and data!="":
  3029. try:
  3030. json_result = json.loads(data)
  3031. for per_item in json_result:
  3032. vid = int(per_item[0])
  3033. video_ids.append(vid)
  3034. recall_dict[vid] = {'videoId': vid, 'flowPool': '',
  3035. 'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['title_i2i_recall'],
  3036. 'abCode': self.ab_code}
  3037. except Exception as e:
  3038. return recall_result
  3039. if len(video_ids) <= 0:
  3040. return recall_result
  3041. video_ids = video_ids[:50]
  3042. # print(video_ids)
  3043. filter_ = FilterVideos(request_id=self.request_id,
  3044. app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
  3045. filtered_viewed_videos = filter_.filter_videos(pool_type='normal')
  3046. if filtered_viewed_videos is None:
  3047. return recall_result
  3048. # print("filtered_viewed_videos:", filtered_viewed_videos)
  3049. for vid in filtered_viewed_videos:
  3050. if vid in recall_dict:
  3051. recall_result.append(recall_dict[vid])
  3052. return recall_result
  3053. def get_word2vec_item_reall(self, exp_config=None):
  3054. if self.video_id is None:
  3055. return []
  3056. recall_key = "w2v:" + str(self.video_id)
  3057. #print("recall_key:", recall_key)
  3058. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3059. #print(data)
  3060. recall_result = []
  3061. recall_dict = {}
  3062. video_ids = []
  3063. if data is not None and data.strip()!="":
  3064. try:
  3065. json_result = data.strip().split(",")
  3066. for per_item in json_result:
  3067. vid = int(per_item)
  3068. video_ids.append(vid)
  3069. recall_dict[vid] = {'videoId': vid, 'flowPool': '',
  3070. 'rovScore': 0.0, 'pushFrom': config_.PUSH_FROM['w2v_recall'],
  3071. 'abCode': self.ab_code}
  3072. except Exception as e:
  3073. return recall_result
  3074. if len(video_ids)<=0:
  3075. return recall_result
  3076. recall_num = 20
  3077. try:
  3078. if exp_config and exp_config['recall_get_num']:
  3079. recall_num = int(exp_config['recall_get_num'])
  3080. except:
  3081. recall_num = 20
  3082. #print("recall_num:", recall_num)
  3083. video_ids = video_ids[:recall_num]
  3084. #print(video_ids)
  3085. filter_ = FilterVideos(request_id=self.request_id,
  3086. app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
  3087. filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
  3088. if filtered_viewed_videos is None:
  3089. return recall_result
  3090. #print("filtered_viewed_videos:", filtered_viewed_videos)
  3091. for vid in filtered_viewed_videos:
  3092. if vid in recall_dict:
  3093. recall_result.append(recall_dict[vid])
  3094. return recall_result[:30]
  3095. def get_test_config(self):
  3096. recall_key = "test_exp_config_pos"
  3097. # print("recall_key:", recall_key)
  3098. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3099. if data is not None and data!="":
  3100. try:
  3101. #print(data)
  3102. json_result = json.loads(data)
  3103. #print(json_result)
  3104. return json_result
  3105. except Exception as e:
  3106. return None
  3107. else:
  3108. return None
  3109. def get_w2v_config(self):
  3110. recall_key = "w2v_exp_config_pos_range"
  3111. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3112. if data is not None and data!="":
  3113. try:
  3114. #print(data)
  3115. json_result = json.loads(data)
  3116. #print(json_result)
  3117. return json_result
  3118. except Exception as e:
  3119. return None
  3120. else:
  3121. return None
  3122. def get_simrecall_config(self):
  3123. recall_key = "simrecall_exp_config_pos"
  3124. # print("recall_key:", recall_key)
  3125. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3126. if data is not None:
  3127. try:
  3128. # print(data)
  3129. json_result = json.loads(data)
  3130. # print(json_result)
  3131. return json_result
  3132. except Exception as e:
  3133. return None
  3134. else:
  3135. return None
  3136. def get_simrecall_config_new(self):
  3137. recall_key = "simrecall_exp_config_range"
  3138. #print("recall_key:", recall_key)
  3139. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3140. if data is not None and data!="":
  3141. try:
  3142. # print(data)
  3143. json_result = json.loads(data)
  3144. # print(json_result)
  3145. return json_result
  3146. except Exception as e:
  3147. return None
  3148. else:
  3149. return None
  3150. def get_hotrecall_config(self):
  3151. recall_key = "ht_exp_config"
  3152. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3153. if data is not None:
  3154. try:
  3155. # print(data)
  3156. json_result = json.loads(data)
  3157. # print(json_result)
  3158. return json_result
  3159. except Exception as e:
  3160. return None
  3161. else:
  3162. return None
  3163. def get_U2I_config(self):
  3164. recall_key = "u2i_exp_config_pos_range_new"
  3165. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3166. if data is not None and data!="":
  3167. try:
  3168. # print(data)
  3169. json_result = json.loads(data)
  3170. # print(json_result)
  3171. return json_result
  3172. except Exception as e:
  3173. return None
  3174. else:
  3175. return None
  3176. def get_u2u2i_config(self):
  3177. recall_key = "u2u2i_exp_config_range"
  3178. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3179. if data is not None:
  3180. try:
  3181. # print(data)
  3182. json_result = json.loads(data)
  3183. # print(json_result)
  3184. return json_result
  3185. except Exception as e:
  3186. return None
  3187. else:
  3188. return None
  3189. def get_hotrecall_flow_config(self):
  3190. recall_key = "ht_flow_config:"
  3191. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3192. return data
  3193. def get_w2v_flow_config(self):
  3194. recall_key = "w2v_flow_config:"
  3195. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3196. return data
  3197. def get_flow_config(self):
  3198. recall_key = "flow_config:"
  3199. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3200. return data
  3201. def get_simrecall_flow_config(self):
  3202. recall_key = "simrecall_flow_config:"
  3203. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3204. return data
  3205. def get_flow_exp_7_config(self):
  3206. recall_key = "exp7_exp_config"
  3207. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3208. if data is not None:
  3209. try:
  3210. # print(data)
  3211. json_result = json.loads(data)
  3212. # print(json_result)
  3213. return json_result
  3214. except Exception as e:
  3215. return None
  3216. else:
  3217. return None
  3218. def get_flow_exp_8_config(self):
  3219. recall_key = "exp8_exp_config"
  3220. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3221. if data is not None:
  3222. try:
  3223. # print(data)
  3224. json_result = json.loads(data)
  3225. # print(json_result)
  3226. return json_result
  3227. except Exception as e:
  3228. return None
  3229. else:
  3230. return None
  3231. def get_sort_ab_codel_config(self):
  3232. ab_key = "sort_ab_config2"
  3233. data = self.redis_helper.get_data_from_redis(key_name=ab_key)
  3234. if data is not None:
  3235. try:
  3236. # print(data)
  3237. json_result = json.loads(data)
  3238. # print(json_result)
  3239. return json_result
  3240. except Exception as e:
  3241. return None
  3242. else:
  3243. return None
  3244. def get_U2I_reall(self, mid):
  3245. #recall_key = "hot_video:"
  3246. if not mid:
  3247. return []
  3248. recall_key = "u2i:"+mid
  3249. #print("recall_key:", recall_key)
  3250. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3251. #print(data)
  3252. recall_result = []
  3253. recall_dict = {}
  3254. video_ids = []
  3255. if data is not None and data!="":
  3256. try:
  3257. json_result = json.loads(data)
  3258. for per_item in json_result:
  3259. vid = int(per_item[0])
  3260. video_ids.append(vid)
  3261. recall_dict[vid] = {'videoId': vid, 'flowPool': '',
  3262. 'rovScore': float(per_item[1]), 'pushFrom': config_.PUSH_FROM['u2i_tag_recall'],
  3263. 'abCode': self.ab_code}
  3264. except Exception as e:
  3265. return recall_result
  3266. if len(video_ids)<=0:
  3267. return recall_result
  3268. recall_num = 20
  3269. #print("recall_num:", recall_num)
  3270. video_ids = video_ids[:recall_num]
  3271. #print(video_ids)
  3272. filter_ = FilterVideos(request_id=self.request_id,
  3273. app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
  3274. filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
  3275. if filtered_viewed_videos is None:
  3276. return recall_result
  3277. #print("filtered_viewed_videos:", filtered_viewed_videos)
  3278. for vid in filtered_viewed_videos:
  3279. if vid in recall_dict:
  3280. recall_result.append(recall_dict[vid])
  3281. #print("u2i recall_result:", recall_result)
  3282. return recall_result
  3283. def get_U2U2I_reall(self, mid, exp_config=None):
  3284. #recall_key = "hot_video:"
  3285. if not mid:
  3286. return []
  3287. recall_key = "u2u2i:"+mid
  3288. #print("recall_key:", recall_key)
  3289. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3290. #print(data)
  3291. recall_result = []
  3292. recall_dict = {}
  3293. video_ids = []
  3294. if data is not None:
  3295. json_result = json.loads(data)
  3296. #print("json_result:", json_result)
  3297. for per_item in json_result:
  3298. try:
  3299. vid = int(per_item[0])
  3300. video_ids.append(vid)
  3301. recall_dict[vid] = {'videoId': vid, 'flowPool': '',
  3302. 'rovScore': float(per_item[1]), 'pushFrom': config_.PUSH_FROM['u2u2i_recall'],
  3303. 'abCode': self.ab_code}
  3304. except Exception as e:
  3305. continue
  3306. if len(video_ids)<=0:
  3307. return recall_result
  3308. recall_num = 20
  3309. try:
  3310. if exp_config and exp_config['recall_get_num']:
  3311. recall_num = int(exp_config['recall_get_num'])
  3312. except:
  3313. recall_num = 20
  3314. #print("recall_num:", recall_num)
  3315. video_ids = video_ids[:recall_num]
  3316. #print(video_ids)
  3317. filter_ = FilterVideos(request_id=self.request_id,
  3318. app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
  3319. filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
  3320. if filtered_viewed_videos is None:
  3321. return recall_result
  3322. #print("filtered_viewed_videos:", filtered_viewed_videos)
  3323. for vid in filtered_viewed_videos:
  3324. if vid in recall_dict:
  3325. recall_result.append(recall_dict[vid])
  3326. #print("u2i recall_result:", recall_result)
  3327. return recall_result
  3328. def get_video_recall_config(self):
  3329. recall_key = "vr_exp_pos_config_range"
  3330. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3331. if data is not None and data!="":
  3332. try:
  3333. # print(data)
  3334. json_result = json.loads(data)
  3335. # print(json_result)
  3336. return json_result
  3337. except Exception as e:
  3338. return None
  3339. else:
  3340. return None
  3341. def get_return_video_reall(self, pre_key=None):
  3342. if self.video_id is None:
  3343. return []
  3344. recall_key = "rv:"+ str(self.video_id)
  3345. if pre_key :
  3346. recall_key = pre_key + str(self.video_id)
  3347. #print("recall_key:", recall_key)
  3348. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3349. #print(data)
  3350. recall_result = []
  3351. recall_dict = {}
  3352. video_ids = []
  3353. if data is not None and data!="" :
  3354. try:
  3355. json_result = json.loads(data)
  3356. for per_item in json_result:
  3357. vid = int(per_item[0])
  3358. video_ids.append(vid)
  3359. recall_dict[vid] = {'videoId': vid, 'flowPool': '',
  3360. 'rovScore': float(per_item[1]), 'pushFrom': config_.PUSH_FROM['return_video_recall'],
  3361. 'abCode': self.ab_code}
  3362. except Exception as e:
  3363. return recall_result
  3364. if len(video_ids)<=0:
  3365. return recall_result
  3366. recall_num = 20
  3367. #print("recall_num:", recall_num)
  3368. video_ids = video_ids[:recall_num]
  3369. #print(video_ids)
  3370. filter_ = FilterVideos(request_id=self.request_id,
  3371. app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
  3372. filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
  3373. if filtered_viewed_videos is None:
  3374. return recall_result
  3375. #print("filtered_viewed_videos:", filtered_viewed_videos)
  3376. for vid in filtered_viewed_videos:
  3377. if vid in recall_dict:
  3378. recall_result.append(recall_dict[vid])
  3379. #print("u2i recall_result:", recall_result)
  3380. return recall_result
  3381. def get_play_reall(self, mid, exp_config=None):
  3382. #recall_key = "hot_video:"
  3383. if not mid:
  3384. return []
  3385. recall_key = "u2i_play:"+mid
  3386. #print("recall_key:", recall_key)
  3387. data = self.redis_helper.get_data_from_redis(key_name=recall_key)
  3388. #print(data)
  3389. recall_result = []
  3390. recall_dict = {}
  3391. video_ids = []
  3392. if data is not None and data!="":
  3393. try:
  3394. json_result = json.loads(data)
  3395. for per_item in json_result:
  3396. vid = int(per_item[0])
  3397. video_ids.append(vid)
  3398. recall_dict[vid] = {'videoId': vid, 'flowPool': '',
  3399. 'rovScore': float(per_item[1]), 'pushFrom': config_.PUSH_FROM['u2i_tag_play_recall'],
  3400. 'abCode': self.ab_code}
  3401. except Exception as e:
  3402. return recall_result
  3403. if len(video_ids)<=0:
  3404. return recall_result
  3405. recall_num = 20
  3406. try:
  3407. if exp_config and exp_config['recall_get_num']:
  3408. recall_num = int(exp_config['recall_get_num'])
  3409. except:
  3410. recall_num = 20
  3411. #print("recall_num:", recall_num)
  3412. video_ids = video_ids[:recall_num]
  3413. #print(video_ids)
  3414. filter_ = FilterVideos(request_id=self.request_id,
  3415. app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
  3416. filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
  3417. if filtered_viewed_videos is None:
  3418. return recall_result
  3419. #print("filtered_viewed_videos:", filtered_viewed_videos)
  3420. for vid in filtered_viewed_videos:
  3421. if vid in recall_dict:
  3422. recall_result.append(recall_dict[vid])
  3423. #print("u2i recall_result:", recall_result)
  3424. return recall_result