search_schedule.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. import json
  6. import asyncio
  7. import time
  8. from applications.search import *
  9. from applications.static.config import gh_id_dict
  10. from applications.functions.log import logging
  11. from applications.functions.video_item import VideoProducer
  12. from applications.functions.async_etl import AsyncETL
  13. from applications.functions.mysql import select_sensitive_words
  14. from applications.functions.kimi import KimiServer
  15. class SearchABTest(object):
  16. """
  17. 搜索策略实验方案
  18. """
  19. ori_title = None
  20. article_summary = None
  21. article_keys = None
  22. gh_id = None
  23. trace_id = None
  24. def __init__(self, info, gh_id):
  25. SearchABTest.set_class_properties(info, gh_id)
  26. @classmethod
  27. def set_class_properties(cls, info, gh_id):
  28. """
  29. 初始化搜索策略实验类
  30. :param info: kimi 挖掘的基本信息
  31. :param gh_id: 公众号账号 id
  32. :return:
  33. """
  34. cls.ori_title = info["ori_title"]
  35. cls.article_summary = info["content_title"]
  36. cls.article_keys = info["content_keys"]
  37. cls.trace_id = info["trace_id"]
  38. cls.gh_id = gh_id
  39. @classmethod
  40. async def base_line(cls):
  41. """
  42. 兜底策略
  43. """
  44. result = await SearchMethod().search_v1(
  45. text=cls.article_keys[0],
  46. trace_id=cls.trace_id
  47. )
  48. if result:
  49. return result
  50. else:
  51. sub_result = await SearchMethod().search_v1(
  52. text=cls.article_keys[1],
  53. trace_id=cls.trace_id)
  54. if sub_result:
  55. return sub_result
  56. else:
  57. return await SearchMethod().search_v1(
  58. text=cls.article_keys[2],
  59. trace_id=cls.trace_id
  60. )
  61. @classmethod
  62. async def ab_0(cls):
  63. """
  64. 默认原标题搜索
  65. :return:
  66. """
  67. search_result = await SearchMethod().search_v1(
  68. text=cls.ori_title,
  69. trace_id=cls.trace_id
  70. )
  71. if search_result:
  72. return search_result
  73. else:
  74. return await cls.base_line()
  75. @classmethod
  76. async def ab_1(cls):
  77. """
  78. 使用 content_summary搜索
  79. :return:
  80. """
  81. search_result = await SearchMethod().search_v1(
  82. text=cls.article_summary,
  83. trace_id=cls.trace_id
  84. )
  85. if search_result:
  86. return search_result
  87. else:
  88. return await cls.ab_0()
  89. @classmethod
  90. async def ab_2(cls):
  91. """
  92. 使用文本关键词搜索
  93. :return:
  94. """
  95. search_result = await SearchMethod().search_v1(
  96. text=cls.article_keys[0],
  97. trace_id=cls.trace_id
  98. )
  99. if search_result:
  100. return search_result
  101. else:
  102. return await cls.base_line()
  103. @classmethod
  104. async def ab_3(cls):
  105. """
  106. 使用文本关键词搜索
  107. :return:
  108. """
  109. search_result = await SearchMethod().search_v1(
  110. text=cls.article_keys[1],
  111. trace_id=cls.trace_id
  112. )
  113. if search_result:
  114. return search_result
  115. else:
  116. return await cls.base_line()
  117. @classmethod
  118. async def ab_4(cls):
  119. """
  120. 使用文本关键词搜索
  121. :return:
  122. """
  123. search_result = await SearchMethod().search_v1(
  124. text=cls.article_keys[2],
  125. trace_id=cls.trace_id
  126. )
  127. if search_result:
  128. return search_result
  129. else:
  130. return await cls.base_line()
  131. class SearchMethod(object):
  132. """
  133. 搜索召回模式
  134. """
  135. s_words = select_sensitive_words()
  136. @classmethod
  137. async def search_v0(cls, text, trace_id):
  138. """
  139. 搜索顺序-wx --> baidu --> xigua
  140. 一共需要返回三条视频
  141. :return:
  142. """
  143. wx_result = []
  144. if wx_result:
  145. return {"platform": "wx_search", "result": wx_result[0]}
  146. else:
  147. logging(
  148. code="7001",
  149. info="通过微信搜索失败---{}".format(text),
  150. trace_id=trace_id,
  151. )
  152. # 微信搜不到的话,采用好看视频搜索
  153. time.sleep(1)
  154. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
  155. if baidu_result:
  156. return {"platform": "baidu_search", "result": baidu_result[0]}
  157. else:
  158. # 若好看视频未搜到,则采用西瓜搜索
  159. logging(
  160. code="7001",
  161. info="通过baidu搜索失败---{}".format(text),
  162. trace_id=trace_id,
  163. )
  164. # return None
  165. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  166. if xigua_result:
  167. return {"platform": "xg_search", "result": xigua_result[0]}
  168. else:
  169. logging(
  170. code="7001",
  171. info="通过西瓜搜索失败---{}, 启用兜底方式".format(text),
  172. trace_id=trace_id,
  173. )
  174. return None
  175. @classmethod
  176. async def search_v1(cls, text, trace_id):
  177. """
  178. dy ---> baidu ---> xigua
  179. :param text:
  180. :param trace_id:
  181. :return:
  182. """
  183. douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words)
  184. if douyin_result:
  185. return {"platform": "dy_search", "result": douyin_result[0]}
  186. else:
  187. logging(
  188. code="7001",
  189. info="抖音搜索失败--{}".format(text),
  190. trace_id=trace_id
  191. )
  192. time.sleep(1)
  193. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
  194. if baidu_result:
  195. return {"platform": "baidu_search", "result": baidu_result[0]}
  196. else:
  197. # 若好看视频未搜到,则采用西瓜搜索
  198. logging(
  199. code="7001",
  200. info="通过baidu搜索失败---{}".format(text),
  201. trace_id=trace_id,
  202. )
  203. # return None
  204. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  205. if xigua_result:
  206. return {"platform": "xg_search", "result": xigua_result[0]}
  207. else:
  208. logging(
  209. code="7001",
  210. info="通过西瓜搜索失败---{}, 启用兜底方式".format(text),
  211. trace_id=trace_id,
  212. )
  213. return None
  214. async def video_sender(video_obj, user, trace_id, platform):
  215. """
  216. 异步处理微信 video_obj
  217. 公众号和站内账号一一对应
  218. :param platform:
  219. :param user:
  220. :param trace_id:
  221. :param video_obj:
  222. :return:
  223. """
  224. # ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
  225. Video = VideoProducer()
  226. if platform == "xg_search":
  227. mq_obj = Video.xg_video_producer(
  228. video_obj=video_obj,
  229. user=user,
  230. trace_id=trace_id,
  231. )
  232. elif platform == "baidu_search":
  233. mq_obj = Video.baidu_video_producer(
  234. video_obj=video_obj,
  235. user=user,
  236. trace_id=trace_id,
  237. )
  238. elif platform == "wx_search":
  239. mq_obj = Video.wx_video_producer(
  240. video_obj=video_obj,
  241. user=user,
  242. trace_id=trace_id,
  243. )
  244. elif platform == "dy_search":
  245. mq_obj = Video.dy_video_producer(
  246. video_obj=video_obj,
  247. user=user,
  248. trace_id=trace_id,
  249. )
  250. else:
  251. mq_obj = {}
  252. AE = AsyncETL(video_obj=mq_obj)
  253. video_id = await AE.etl_deal()
  254. logging(
  255. code="6002",
  256. info="视频下载完成",
  257. data=mq_obj,
  258. trace_id=trace_id
  259. )
  260. return video_id
  261. async def search_videos(params, trace_id, gh_id, mysql_client):
  262. """
  263. search and send msg to ETL
  264. :param mysql_client:
  265. :param params:
  266. :param gh_id: 通过账号 id 来控制实验策略
  267. :param trace_id:
  268. :return:
  269. """
  270. K = KimiServer()
  271. kimi_info = await K.search_kimi_schedule(params=params)
  272. print("{}---kimi 挖掘正常".format(trace_id))
  273. kimi_title = kimi_info['k_title']
  274. content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
  275. content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
  276. update_kimi_sql = f"""
  277. UPDATE long_articles_video SET
  278. kimi_title = '{kimi_title}',
  279. kimi_summary = '{content_title}',
  280. kimi_keys = '{content_keys}'
  281. WHERE trace_id = '{trace_id}';
  282. """
  283. await mysql_client.async_insert(update_kimi_sql)
  284. kimi_info["trace_id"] = trace_id
  285. SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
  286. recall_obj_1 = await SearchAB.ab_1()
  287. # recall_obj_1 = await SearchAB.ab_0()
  288. await asyncio.sleep(3)
  289. recall_obj_2 = await SearchAB.ab_2()
  290. await asyncio.sleep(3)
  291. recall_obj_3 = await SearchAB.ab_3()
  292. print("{}---视频搜索正常".format(trace_id))
  293. recall_list = [recall_obj_1, recall_obj_2, recall_obj_3]
  294. un_empty_list = [i for i in recall_list if i]
  295. if len(un_empty_list) < 3:
  296. await asyncio.sleep(3)
  297. recall_obj_4 = await SearchAB.ab_4()
  298. if recall_obj_4:
  299. un_empty_list.append(recall_obj_4)
  300. # 逐条下载,逐条写表
  301. if un_empty_list:
  302. for index, recall_obj in enumerate(un_empty_list, 1):
  303. platform = recall_obj["platform"]
  304. recall_video = recall_obj["result"]
  305. if recall_video:
  306. logging(
  307. code="7002",
  308. info="视频搜索成功, 搜索平台为--{}".format(platform),
  309. trace_id=trace_id,
  310. data=recall_video,
  311. )
  312. video_id = await video_sender(
  313. video_obj=recall_video,
  314. user=gh_id_dict.get(gh_id),
  315. trace_id=trace_id,
  316. platform=platform,
  317. )
  318. update_id_sql = f"""
  319. UPDATE long_articles_video
  320. SET
  321. recall_video_id{index} = {video_id}
  322. WHERE
  323. trace_id = '{trace_id}'
  324. """
  325. await mysql_client.async_insert(update_id_sql)
  326. else:
  327. logging(
  328. code="7003",
  329. info="视频搜索失败, 被敏感词过滤",
  330. trace_id=trace_id
  331. )
  332. async def re_search_videos(params, trace_id, gh_id, mysql_client):
  333. """
  334. 重新搜索接口
  335. :param params:
  336. :param trace_id:
  337. :param gh_id:
  338. :param mysql_client:
  339. :return:
  340. cls.ori_title = info["ori_title"]
  341. cls.article_summary = info["content_title"]
  342. cls.article_keys = info["content_keys"]
  343. cls.trace_id = info["trace_id"]
  344. """
  345. obj = {
  346. "ori_title": params['ori_title'],
  347. "content_title": params['kimi_summary'],
  348. "content_keys": params['kimi_keys'],
  349. "trace_id": params['trace_id']
  350. }
  351. SearchAB = SearchABTest(info=obj, gh_id=gh_id)
  352. recall_obj_1 = await SearchAB.ab_1()
  353. # recall_obj_1 = await SearchAB.ab_0()
  354. await asyncio.sleep(3)
  355. recall_obj_2 = await SearchAB.ab_2()
  356. await asyncio.sleep(3)
  357. recall_obj_3 = await SearchAB.ab_3()
  358. print("{}---视频搜索正常".format(trace_id))
  359. recall_list = [recall_obj_1, recall_obj_2, recall_obj_3]
  360. un_empty_list = [i for i in recall_list if i]
  361. if len(un_empty_list) < 3:
  362. await asyncio.sleep(3)
  363. recall_obj_4 = await SearchAB.ab_4()
  364. if recall_obj_4:
  365. un_empty_list.append(recall_obj_4)
  366. # 逐条下载,逐条写表
  367. if un_empty_list:
  368. for index, recall_obj in enumerate(un_empty_list, 1):
  369. platform = recall_obj["platform"]
  370. recall_video = recall_obj["result"]
  371. if recall_video:
  372. logging(
  373. code="7002",
  374. info="视频搜索成功, 搜索平台为--{}".format(platform),
  375. trace_id=trace_id,
  376. data=recall_video,
  377. )
  378. video_id = await video_sender(
  379. video_obj=recall_video,
  380. user=gh_id_dict.get(gh_id),
  381. trace_id=trace_id,
  382. platform=platform,
  383. )
  384. update_id_sql = f"""
  385. UPDATE long_articles_video
  386. SET
  387. recall_video_id{index} = {video_id}
  388. WHERE
  389. trace_id = '{trace_id}'
  390. """
  391. await mysql_client.async_insert(update_id_sql)
  392. else:
  393. logging(
  394. code="7003",
  395. info="视频搜索失败, 被敏感词过滤",
  396. trace_id=trace_id
  397. )