search_schedule.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. import json
  6. import asyncio
  7. from applications.search import *
  8. from applications.static.config import gh_id_dict
  9. from applications.functions.log import logging
  10. from applications.functions.video_item import VideoProducer
  11. from applications.functions.async_etl import AsyncETL
  12. from applications.functions.mysql import select_sensitive_words
  13. from applications.functions.kimi import KimiServer
  class _InstanceOrClassMethod(object):
      """
      Descriptor that binds a function to the instance when it is looked up on
      an instance, and to the class when it is looked up on the class.

      It lets the search strategies read per-instance state while still being
      callable as ``SearchABTest.ab_1()`` like the classmethods they replace.
      """

      def __init__(self, func):
          self.func = func

      def __get__(self, obj, owner=None):
          target = owner if obj is None else obj
          # Plain functions are descriptors themselves; this yields a bound method.
          return self.func.__get__(target)


  class SearchABTest(object):
      """
      Search-strategy experiment variants.

      Every strategy runs one primary search through ``SearchMethod.search_v0``
      and falls back to another strategy when that search returns nothing.
      """

      # Class-level defaults. They are still updated by set_class_properties for
      # callers that use the class directly, but instances carry their own copy
      # so that concurrent requests cannot overwrite each other's data.
      ori_title = None
      article_summary = None
      article_keys = None
      gh_id = None
      trace_id = None

      def __init__(self, info, gh_id):
          """
          :param info: dict with the keys ``ori_title``, ``content_title``,
              ``content_keys`` and ``trace_id``
          :param gh_id: official-account id
          :raises KeyError: if one of the keys above is missing from ``info``
          """
          # Kept for backward compatibility with class-level access.
          SearchABTest.set_class_properties(info, gh_id)
          # Per-instance copy: the strategies await between searches, so state
          # shared on the class could be replaced by another request meanwhile.
          self.ori_title = info["ori_title"]
          self.article_summary = info["content_title"]
          self.article_keys = info["content_keys"]
          self.trace_id = info["trace_id"]
          self.gh_id = gh_id

      @classmethod
      def set_class_properties(cls, info, gh_id):
          """
          Store the request data on the class itself.

          :param info: basic information mined by kimi
          :param gh_id: official-account id
          :return: None
          """
          cls.ori_title = info["ori_title"]
          cls.article_summary = info["content_title"]
          cls.article_keys = info["content_keys"]
          cls.trace_id = info["trace_id"]
          cls.gh_id = gh_id

      @_InstanceOrClassMethod
      async def base_line(self):
          """
          Fallback strategy: search with each of the first three keywords in
          turn and return the first non-empty result.

          :return: the first truthy search result, otherwise the last (falsy)
              result; None when there are no keywords at all
          """
          result = None
          # Slicing instead of indexing [0], [1], [2] avoids an IndexError when
          # fewer than three keywords are available.
          for key in (self.article_keys or [])[:3]:
              result = await SearchMethod().search_v0(
                  text=key,
                  trace_id=self.trace_id
              )
              if result:
                  return result
          return result

      @_InstanceOrClassMethod
      async def _keyword_search(self, index):
          """
          Search with the keyword at ``index``; fall back to base_line when the
          keyword does not exist or the search returns nothing.
          """
          keys = self.article_keys or []
          if index < len(keys):
              # NOTE(review): when keys[index] is a plain string, ",".join puts
              # a comma between every character. Kept as in the original code;
              # confirm whether that is the intended query format.
              search_result = await SearchMethod().search_v0(
                  text=",".join(keys[index]),
                  trace_id=self.trace_id
              )
              if search_result:
                  return search_result
          return await self.base_line()

      @_InstanceOrClassMethod
      async def ab_0(self):
          """
          Search with the original title; fall back to base_line.

          :return: search result, or whatever base_line returns
          """
          search_result = await SearchMethod().search_v0(
              text=self.ori_title,
              trace_id=self.trace_id
          )
          if search_result:
              return search_result
          return await self.base_line()

      @_InstanceOrClassMethod
      async def ab_1(self):
          """
          Search with the content summary; fall back to ab_0.

          :return: search result, or whatever ab_0 returns
          """
          search_result = await SearchMethod().search_v0(
              text=self.article_summary,
              trace_id=self.trace_id
          )
          if search_result:
              return search_result
          return await self.ab_0()

      @_InstanceOrClassMethod
      async def ab_2(self):
          """
          Search with the first keyword (index 0); fall back to base_line.

          :return: search result, or whatever base_line returns
          """
          return await self._keyword_search(0)

      @_InstanceOrClassMethod
      async def ab_3(self):
          """
          Search with the second keyword (index 1); fall back to base_line.

          :return: search result, or whatever base_line returns
          """
          return await self._keyword_search(1)

      @_InstanceOrClassMethod
      async def ab_4(self):
          """
          Search with the fourth keyword (index 3); fall back to base_line.

          :return: search result, or whatever base_line returns
          """
          return await self._keyword_search(3)
  class SearchMethod(object):
      """
      Video recall: query the search back ends in a fixed priority order.
      """

      # Sensitive-word list, loaded once when the class body is executed and
      # handed to every search call.
      s_words = select_sensitive_words()

      @classmethod
      async def search_v0(cls, text, trace_id):
          """
          Search for ``text`` with ``wx_search`` first and ``hksp_search`` second.

          Only the first hit of the first back end that returns anything is
          used. The Xigua fallback that used to follow is currently disabled.

          :param text: query text
          :param trace_id: id attached to the failure log entries
          :return: ``{"platform": ..., "result": ...}`` for the first hit, or
              None when both searches come back empty
          """
          wx_hits = wx_search(keys=text, sensitive_words=cls.s_words)
          if wx_hits:
              return {"platform": "wx_search", "result": wx_hits[0]}
          logging(
              code="7001",
              info="通过微信搜索失败---{}".format(text),
              trace_id=trace_id,
          )

          # Nothing found on WeChat: fall back to the Haokan (Baidu) video search.
          baidu_hits = hksp_search(key=text, sensitive_words=cls.s_words)
          if baidu_hits:
              return {"platform": "baidu_search", "result": baidu_hits[0]}
          logging(
              code="7001",
              info="通过baidu搜索失败---{}".format(text),
              trace_id=trace_id,
          )
          return None
  async def video_sender(video_obj, user, trace_id, platform):
      """
      Build the ETL message for one recalled video and run the async ETL on it.

      :param video_obj: raw video record returned by a search back end
      :param user: account the video is produced for (the caller looks it up
          from the official-account id)
      :param trace_id: id attached to the log entry
      :param platform: "xg_search", "baidu_search" or "wx_search"; any other
          value results in an empty message
      :return: whatever ``AsyncETL.etl_deal`` returns (used by the caller as
          the video id)
      """
      producer = VideoProducer()
      # Producer method per platform, resolved lazily by name.
      producer_names = {
          "xg_search": "xg_video_producer",
          "baidu_search": "baidu_video_producer",
          "wx_search": "wx_video_producer",
      }
      method_name = producer_names.get(platform)
      if method_name is None:
          mq_obj = {}
      else:
          build_message = getattr(producer, method_name)
          mq_obj = build_message(
              video_obj=video_obj,
              user=user,
              trace_id=trace_id,
          )

      etl = AsyncETL(video_obj=mq_obj)
      video_id = await etl.etl_deal()
      logging(
          code="6002",
          info="视频下载完成",
          data=mq_obj,
          trace_id=trace_id
      )
      return video_id
  def _sql_text(value):
      """
      Escape ``value`` for use inside a single-quoted MySQL string literal.

      Backslashes are doubled (assumes the server is not running in
      NO_BACKSLASH_ESCAPES mode) and single quotes are doubled.
      """
      return str(value).replace("\\", "\\\\").replace("'", "''")


  async def search_videos(params, trace_id, gh_id, mysql_client):
      """
      Mine the article with kimi, search videos with several strategies, send
      every hit to the ETL and write the resulting video ids to the table.

      :param params: request parameters passed to ``search_kimi_schedule``
      :param trace_id: id of the row in ``long_articles_video_dev``; also
          attached to every log entry
      :param gh_id: official-account id, used to pick the experiment strategy
          and to look up the user in ``gh_id_dict``
      :param mysql_client: client exposing an awaitable ``async_insert(sql)``
      :return: None; every exception raised inside is logged with code 9000
          and swallowed
      """
      kimi_server = KimiServer()
      try:
          kimi_info = await kimi_server.search_kimi_schedule(params=params)
          print(json.dumps(kimi_info, ensure_ascii=False, indent=4))
          kimi_title = kimi_info["k_title"]
          content_title = kimi_info["content_title"]
          content_keys = json.dumps(kimi_info["content_keys"], ensure_ascii=False)
          # NOTE(review): the SQL is built as text because only async_insert(sql)
          # is visible here. The values are generated text, so they are escaped;
          # switch to a parameterized query if the client supports one.
          update_kimi_sql = f"""
              UPDATE long_articles_video_dev SET
                  kimi_title = '{_sql_text(kimi_title)}',
                  kimi_summary = '{_sql_text(content_title)}',
                  kimi_keys = '{_sql_text(content_keys)}'
              WHERE trace_id = '{_sql_text(trace_id)}';
          """
          await mysql_client.async_insert(update_kimi_sql)
          kimi_info["trace_id"] = trace_id
          search_ab = SearchABTest(info=kimi_info, gh_id=gh_id)

          # Run the three main strategies one after another, pausing three
          # seconds between consecutive searches.
          recall_list = []
          strategies = (search_ab.ab_1, search_ab.ab_2, search_ab.ab_3)
          for position, strategy in enumerate(strategies):
              if position:
                  await asyncio.sleep(3)
              recall_list.append(await strategy())

          hits = [recall_obj for recall_obj in recall_list if recall_obj]
          if len(hits) < 3:
              await asyncio.sleep(3)
              extra = await search_ab.ab_4()
              # Only keep a real hit: an empty result used to be appended and
              # then crashed the loop below on recall_obj["platform"].
              if extra:
                  hits.append(extra)

          if not hits:
              logging(code="7003", info="视频搜索失败", trace_id=trace_id)
              return None

          # Download one by one and write each id to the table right away.
          for index, recall_obj in enumerate(hits, 1):
              platform = recall_obj["platform"]
              recall_video = recall_obj["result"]
              if not recall_video:
                  continue
              logging(
                  code="7002",
                  info="视频搜索成功, 搜索平台为--{}".format(platform),
                  trace_id=trace_id,
                  data=recall_video,
              )
              video_id = await video_sender(
                  video_obj=recall_video,
                  user=gh_id_dict.get(gh_id),
                  trace_id=trace_id,
                  platform=platform,
              )
              update_id_sql = f"""
                  UPDATE long_articles_video_dev
                  SET
                      recall_video_id{index} = {video_id}
                  WHERE
                      trace_id = '{_sql_text(trace_id)}'
              """
              print(update_id_sql)
              await mysql_client.async_insert(update_id_sql)
          return None
      except Exception as e:
          # Top-level boundary: this handler also catches search, download and
          # database errors, although the message only mentions kimi.
          logging(
              code="9000",
              info="kimi挖掘失败, 原因是-{}".format(e),
              trace_id=trace_id
          )
          return None