search.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. import json
  6. import time
  7. import requests
  8. from applications.mq import MQ
  9. from applications.log import logging
  10. from applications.config import gh_id_dict
  11. from applications.functions.item import VideoItem
  12. from applications.functions.common import sensitive_flag
  13. def wx_search(keys):
  14. """
  15. WeChat search
  16. :param keys:
  17. :return:
  18. """
  19. url = "http://8.217.190.241:8888/crawler/wei_xin/keyword"
  20. payload = json.dumps({
  21. "keyword": keys,
  22. "cursor": "0",
  23. "content_type": "video"
  24. })
  25. headers = {
  26. 'Content-Type': 'application/json'
  27. }
  28. response = requests.request("POST", url, headers=headers, data=payload)
  29. return response.json()
  30. def process_weixin_video_obj(video_obj, user, trace_id, title):
  31. """
  32. 异步处理微信 video_obj
  33. 公众号和站内账号一一对应
  34. :param title:
  35. :param trace_id:
  36. :param user:
  37. :param video_obj:
  38. :return:
  39. """
  40. ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
  41. platform = "weixin_search"
  42. publish_time_stamp = int(video_obj['pubTime'])
  43. item = VideoItem()
  44. item.add_video_info("user_id", user["uid"])
  45. item.add_video_info("user_name", user["nick_name"])
  46. item.add_video_info("video_id", video_obj['hashDocID'])
  47. item.add_video_info("video_title", title)
  48. item.add_video_info("publish_time_stamp", int(publish_time_stamp))
  49. item.add_video_info("video_url", video_obj["videoUrl"])
  50. item.add_video_info("cover_url", video_obj["image"])
  51. item.add_video_info("out_video_id", video_obj['hashDocID'])
  52. item.add_video_info("out_user_id", trace_id)
  53. item.add_video_info("platform", platform)
  54. item.add_video_info("strategy", "search")
  55. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  56. mq_obj = item.produce_item()
  57. ETL_MQ.send_msg(params=mq_obj)
  58. logging(
  59. code="6002",
  60. info="发送消息至 ETL",
  61. data=mq_obj
  62. )
  63. def return_video(video_path, title, trace_id):
  64. """
  65. search and send msg to ETL
  66. :param trace_id:
  67. :param title: 视频标题
  68. :param video_path: 视频路径
  69. :return:
  70. """
  71. with open(video_path, encoding='utf-8') as f:
  72. my_obj = json.loads(f.read())
  73. if my_obj:
  74. # 三者都搜索,优先搜索 title
  75. title_result = wx_search(keys=title)
  76. if title_result['msg'] == '未知错误':
  77. logging(
  78. code="7001",
  79. info="通过标题搜索失败---{}".format(title),
  80. trace_id=trace_id
  81. )
  82. else:
  83. obj_list = title_result['data']['data']
  84. for obj in obj_list:
  85. try:
  86. title = obj['items'][0]['title'].replace('<em class=\"highlight\">', '').replace('</em>',
  87. '').replace("#",
  88. "")
  89. if sensitive_flag(title):
  90. return obj
  91. else:
  92. continue
  93. except Exception as e:
  94. print(e)
  95. continue
  96. # search_keys
  97. search_keys_result = wx_search(keys=my_obj['search_keys'][0])
  98. if search_keys_result['msg'] == '未知错误':
  99. logging(
  100. code="7001",
  101. info="通过搜索词搜索失败---{}".format(title),
  102. trace_id=trace_id
  103. )
  104. else:
  105. obj_list = search_keys_result['data']['data']
  106. for obj in obj_list:
  107. try:
  108. title = obj['items'][0]['title'].replace('<em class=\"highlight\">', '').replace('</em>',
  109. '').replace("#",
  110. "")
  111. if sensitive_flag(title):
  112. return obj
  113. else:
  114. continue
  115. except Exception as e:
  116. print(e)
  117. continue
  118. # theme
  119. theme_result = wx_search(keys=my_obj['theme'])
  120. if theme_result['msg'] == '未知错误':
  121. logging(
  122. code="7001",
  123. info="通过主题搜索失败---{}".format(title),
  124. trace_id=trace_id
  125. )
  126. else:
  127. obj_list = theme_result['data']['data']
  128. for obj in obj_list:
  129. try:
  130. title = obj['items'][0]['title'].replace('<em class=\"highlight\">', '').replace('</em>',
  131. '').replace("#",
  132. "")
  133. if sensitive_flag(title):
  134. return obj
  135. else:
  136. continue
  137. except Exception as e:
  138. print(e)
  139. continue
  140. return None
  141. else:
  142. logging(
  143. code="7000",
  144. info="标题--{}--kimi 挖掘数据失败".format(title),
  145. trace_id=trace_id
  146. )
  147. return None
  148. def search_videos(video_path, title, trace_id, gh_id):
  149. """
  150. search and send msg to ETL
  151. :param gh_id:
  152. :param video_path:
  153. :param title:
  154. :param trace_id:
  155. :return:
  156. """
  157. video_obj = return_video(video_path, title, trace_id)
  158. if video_obj:
  159. logging(
  160. code="7002",
  161. info="视频搜索成功",
  162. trace_id=trace_id,
  163. data=video_obj
  164. )
  165. title = video_obj['items'][0]['title'].replace('<em class=\"highlight\">', '').replace('</em>', '').replace("#",
  166. "")
  167. process_weixin_video_obj(
  168. video_obj=video_obj['items'][0],
  169. user=gh_id_dict.get(gh_id),
  170. trace_id=trace_id,
  171. title=title
  172. )
  173. else:
  174. logging(
  175. code="7003",
  176. info="视频搜索失败",
  177. trace_id=trace_id
  178. )