search_schedule.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. import json
  6. import time
  7. from applications.match_algorithm.rank import title_similarity_rank
  8. from applications.search import *
  9. from applications.static.config import gh_id_dict, db_article
  10. from applications.functions.log import logging
  11. from applications.functions.video_item import VideoProducer
  12. from applications.functions.kimi import KimiServer
  13. from applications.functions.common import request_etl
  14. class SearchABTest(object):
  15. """
  16. 搜索策略实验方案
  17. """
  18. ori_title = None
  19. article_summary = None
  20. article_keys = None
  21. gh_id = None
  22. trace_id = None
  23. def __init__(self, info, gh_id):
  24. SearchABTest.set_class_properties(info, gh_id)
  25. @classmethod
  26. def set_class_properties(cls, info, gh_id):
  27. """
  28. 初始化搜索策略实验类
  29. :param info: kimi 挖掘的基本信息
  30. :param gh_id: 公众号账号 id
  31. :return:
  32. """
  33. cls.ori_title = info["ori_title"]
  34. cls.article_summary = info["content_title"]
  35. cls.article_keys = info["content_keys"]
  36. cls.trace_id = info["trace_id"]
  37. cls.gh_id = gh_id
  38. @classmethod
  39. async def base_line(cls):
  40. """
  41. 兜底策略
  42. """
  43. result = await SearchMethod().search_v1(
  44. text=cls.article_keys[0],
  45. trace_id=cls.trace_id
  46. )
  47. if result:
  48. return result
  49. else:
  50. sub_result = await SearchMethod().search_v1(
  51. text=cls.article_keys[1],
  52. trace_id=cls.trace_id)
  53. if sub_result:
  54. return sub_result
  55. else:
  56. return await SearchMethod().search_v1(
  57. text=cls.article_keys[2],
  58. trace_id=cls.trace_id
  59. )
  60. @classmethod
  61. async def ab_0(cls):
  62. """
  63. 默认原标题搜索
  64. :return:
  65. """
  66. search_result = await SearchMethod().search_v1(
  67. text=cls.ori_title,
  68. trace_id=cls.trace_id
  69. )
  70. if search_result:
  71. return search_result
  72. else:
  73. return await cls.base_line()
  74. @classmethod
  75. async def ab_1(cls):
  76. """
  77. 使用 content_summary搜索
  78. :return:
  79. """
  80. search_result = await SearchMethod().search_v1(
  81. text=cls.article_summary,
  82. trace_id=cls.trace_id
  83. )
  84. if search_result:
  85. return search_result
  86. else:
  87. return await cls.ab_0()
  88. @classmethod
  89. async def ab_2(cls):
  90. """
  91. 使用文本关键词搜索
  92. :return:
  93. """
  94. search_result = await SearchMethod().search_v1(
  95. text=cls.article_keys[0],
  96. trace_id=cls.trace_id
  97. )
  98. if search_result:
  99. return search_result
  100. else:
  101. return await cls.base_line()
  102. @classmethod
  103. async def ab_3(cls):
  104. """
  105. 使用文本关键词搜索
  106. :return:
  107. """
  108. search_result = await SearchMethod().search_v1(
  109. text=cls.article_keys[1],
  110. trace_id=cls.trace_id
  111. )
  112. if search_result:
  113. return search_result
  114. else:
  115. return await cls.base_line()
  116. @classmethod
  117. async def ab_4(cls):
  118. """
  119. 使用文本关键词搜索
  120. :return:
  121. """
  122. search_result = await SearchMethod().search_v1(
  123. text=cls.article_keys[2],
  124. trace_id=cls.trace_id
  125. )
  126. if search_result:
  127. return search_result
  128. else:
  129. return await cls.base_line()
  130. @classmethod
  131. async def ab_5(cls):
  132. """
  133. 增量搜索, 返回result_list
  134. :return:
  135. """
  136. result_list = await SearchMethod().search_v2(
  137. text=cls.article_summary[:15],
  138. trace_id=cls.trace_id
  139. )
  140. if len(result_list) > 3:
  141. return result_list
  142. else:
  143. result_list += await SearchMethod().search_v2(
  144. text=cls.ori_title[:15],
  145. trace_id=cls.trace_id
  146. )
  147. if len(result_list) > 3:
  148. return result_list
  149. else:
  150. result_list += await SearchMethod().search_v2(
  151. text=cls.article_keys[0],
  152. trace_id=cls.trace_id
  153. )
  154. if len(result_list) > 3:
  155. return result_list
  156. else:
  157. result_list += await SearchMethod().search_v2(
  158. text=cls.article_keys[1],
  159. trace_id=cls.trace_id
  160. )
  161. if result_list:
  162. return result_list
  163. else:
  164. result_list += await SearchMethod().search_v2(
  165. text=cls.article_keys[2],
  166. trace_id=cls.trace_id
  167. )
  168. return result_list
  169. class SearchMethod(object):
  170. """
  171. 搜索召回模式
  172. """
  173. s_words = []
  174. @classmethod
  175. async def search_v1(cls, text, trace_id):
  176. """
  177. dy ---> baidu ---> xigua
  178. :param text:
  179. :param trace_id:
  180. :return:
  181. """
  182. douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
  183. if douyin_result:
  184. return {"platform": "dy_search", "result": douyin_result[0]}
  185. else:
  186. time.sleep(1)
  187. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
  188. if baidu_result:
  189. return {"platform": "baidu_search", "result": baidu_result[0]}
  190. else:
  191. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  192. if xigua_result:
  193. return {"platform": "xg_search", "result": xigua_result[0]}
  194. else:
  195. return None
  196. @classmethod
  197. async def search_v2(cls, text, trace_id):
  198. """
  199. dy ---> baidu ---> xigua
  200. :param trace_id:
  201. :param text:
  202. :return:
  203. """
  204. L = []
  205. douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
  206. for vid_obj in douyin_result:
  207. L.append({"platform": "dy_search", "result": vid_obj})
  208. if len(L) >= 3:
  209. return L
  210. else:
  211. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
  212. if baidu_result:
  213. L.append({"platform": "baidu_search", "result": baidu_result[0]})
  214. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  215. if xigua_result:
  216. L.append({"platform": "xg_search", "result": xigua_result[0]})
  217. return L
  218. async def video_sender(video_obj, user, trace_id, platform, index):
  219. """
  220. 异步处理微信 video_obj
  221. 公众号和站内账号一一对应
  222. :param index:
  223. :param platform:
  224. :param user:
  225. :param trace_id:
  226. :param video_obj:
  227. :return:
  228. """
  229. Video = VideoProducer()
  230. if platform == "xg_search":
  231. mq_obj = Video.xg_video_producer(
  232. video_obj=video_obj,
  233. user=user,
  234. trace_id=trace_id,
  235. )
  236. elif platform == "baidu_search":
  237. mq_obj = Video.baidu_video_producer(
  238. video_obj=video_obj,
  239. user=user,
  240. trace_id=trace_id,
  241. )
  242. elif platform == "wx_search":
  243. mq_obj = Video.wx_video_producer(
  244. video_obj=video_obj,
  245. user=user,
  246. trace_id=trace_id,
  247. )
  248. elif platform == "dy_search":
  249. mq_obj = Video.dy_video_producer(
  250. video_obj=video_obj,
  251. user=user,
  252. trace_id=trace_id,
  253. )
  254. else:
  255. mq_obj = {}
  256. mq_obj['index'] = index
  257. mq_obj['trace_id'] = trace_id
  258. header = {
  259. "Content-Type": "application/json",
  260. }
  261. await request_etl(
  262. url="http://192.168.203.137:4612/etl",
  263. headers=header,
  264. json_data=mq_obj
  265. )
  266. # await request_etl(
  267. # url="http://localhost:4612/etl",
  268. # headers=header,
  269. # json_data=mq_obj
  270. # )
  271. async def search_videos(params, trace_id, gh_id, mysql_client):
  272. """
  273. search and send msg to ETL
  274. :param mysql_client:
  275. :param params:
  276. :param gh_id: 通过账号 id 来控制实验策略
  277. :param trace_id:
  278. :return:
  279. """
  280. K = KimiServer()
  281. kimi_info = await K.search_kimi_schedule(params=params)
  282. kimi_title = kimi_info['k_title']
  283. content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
  284. content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
  285. update_kimi_sql = f"""
  286. UPDATE {db_article} SET
  287. kimi_title = %s,
  288. kimi_summary = %s,
  289. kimi_keys = %s
  290. WHERE trace_id = %s;
  291. """
  292. await mysql_client.async_insert(
  293. sql=update_kimi_sql,
  294. params=(kimi_title, content_title, content_keys, trace_id)
  295. )
  296. kimi_info["trace_id"] = trace_id
  297. SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
  298. # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
  299. recall_list = await SearchAB.ab_5()
  300. logging(
  301. code="1006",
  302. info="搜索到{}条视频".format(len(recall_list)),
  303. data=recall_list,
  304. trace_id=trace_id
  305. )
  306. print(json.dumps(recall_list, ensure_ascii=False, indent=4))
  307. # 按照标题相似度排序
  308. ranked_list = title_similarity_rank(content_title=content_title, recall_list=recall_list)
  309. print(json.dumps(ranked_list, ensure_ascii=False, indent=4))
  310. # index = 0
  311. # for recall_obj in ranked_list:
  312. # if recall_obj:
  313. # platform = recall_obj['platform']
  314. # recall_video = recall_obj['result']
  315. # if recall_video:
  316. # index += 1
  317. # await video_sender(
  318. # video_obj=recall_video,
  319. # user=gh_id_dict.get(gh_id),
  320. # trace_id=trace_id,
  321. # platform=platform,
  322. # index=index
  323. # )
  324. # logging(
  325. # code="1007",
  326. # info="成功请求etl",
  327. # data=recall_video,
  328. # trace_id=trace_id
  329. # )
  330. # if index >= 3:
  331. # print("already downloaded 3 videos")
  332. # logging(
  333. # code="1008",
  334. # info="成功下载三条视频",
  335. # trace_id=trace_id
  336. # )
  337. # break
  338. async def re_search_videos(params, trace_id, gh_id):
  339. """
  340. 重新搜索接口
  341. :param params:
  342. :param trace_id:
  343. :param gh_id:
  344. :return:
  345. """
  346. try:
  347. obj = {
  348. "ori_title": params['title'],
  349. "content_title": params['kimi_summary'],
  350. "content_keys": json.loads(params['kimi_keys']),
  351. "trace_id": params['trace_id']
  352. }
  353. except:
  354. obj = {
  355. "ori_title": params['title'],
  356. "content_title": params['kimi_summary'],
  357. "content_keys": params['kimi_keys'],
  358. "trace_id": params['trace_id']
  359. }
  360. SearchAB = SearchABTest(info=obj, gh_id=gh_id)
  361. # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
  362. recall_list = await SearchAB.ab_5()
  363. print("一共搜索到{}条视频".format(len(recall_list)))
  364. index = 0
  365. for recall_obj in recall_list:
  366. if recall_obj:
  367. platform = recall_obj['platform']
  368. recall_video = recall_obj['result']
  369. if recall_video:
  370. index += 1
  371. await video_sender(
  372. video_obj=recall_video,
  373. user=gh_id_dict.get(gh_id),
  374. trace_id=trace_id,
  375. platform=platform,
  376. index=index
  377. )
  378. logging(
  379. code="7004",
  380. info="成功请求etl",
  381. trace_id=trace_id
  382. )
  383. if index >= 3:
  384. print("already downloaded 3 videos")
  385. break
  386. print("一个匹配到{}条".format(index))