video_item.py 8.3 KB


  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. from applications.functions.mq import MQ
  6. from applications.functions.log import logging
  7. from applications.functions.common import Functions
  8. class VideoItem(object):
  9. """
  10. function: 当扫描进一条视频的时候,对该视频的基本信息进行处理,保证发送给 pipeline和 etl 的 video_dict 是正确的
  11. __init__: 初始化空json 对象,用来存储视频信息
  12. add_video_info: 把视频信息存储到 item 对象中
  13. check_item: 检查 item 对象中的各个元素以及处理
  14. """
  15. def __init__(self):
  16. self.item = {}
  17. def add_video_info(self, key, value):
  18. """
  19. insert or update video info
  20. :param key:
  21. :param value:
  22. """
  23. self.item[key] = value
  24. def check_item(self):
  25. """
  26. 判断item 里面的字段,是否符合要求
  27. 字段分为 3 类:
  28. 1. 必须存在数据的字段: ["video_id", "user_id", "user_name", "out_user_id", "out_video_id", "session", "video_url", "cover_url", "platform", "strategy"]
  29. 2. 不存在默认为 0 的字段 :["duration", "play_cnt", "like_cnt", "comment_cnt", "share_cnt", "width", "height"]
  30. 3. 需要后出理的字段: video_title, publish_time
  31. """
  32. if self.item.get("video_title"):
  33. self.item["video_title"] = Functions().clean_title(self.item["video_title"])
  34. else:
  35. return False
  36. if self.item.get("publish_time_stamp"):
  37. publish_time_str = time.strftime(
  38. "%Y-%m-%d %H:%M:%S", time.localtime(self.item["publish_time_stamp"])
  39. )
  40. self.add_video_info("publish_time_str", publish_time_str)
  41. else:
  42. publish_time_stamp = int(time.time())
  43. publish_time_str = time.strftime(
  44. "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
  45. )
  46. self.add_video_info("publish_time_stamp", publish_time_stamp)
  47. self.add_video_info("publish_time_str", publish_time_str)
  48. self.add_video_info("publish_time", publish_time_str)
  49. if not self.item.get("update_time_stamp"):
  50. self.add_video_info("update_time_stamp", int(time.time()))
  51. # 如果不存在,默认值为 0
  52. config_keys = [
  53. "duration",
  54. "play_cnt",
  55. "like_cnt",
  56. "comment_cnt",
  57. "share_cnt",
  58. "width",
  59. "height",
  60. ]
  61. for config_key in config_keys:
  62. if self.item.get(config_key):
  63. continue
  64. else:
  65. self.add_video_info(config_key, 0)
  66. # 必须存在的元素,若不存在则会报错
  67. must_keys = [
  68. "video_id",
  69. "user_id",
  70. "user_name",
  71. "out_video_id",
  72. "session",
  73. "video_url",
  74. "cover_url",
  75. "platform",
  76. "strategy",
  77. ]
  78. """
  79. video_id, out_video_id 均为站外视频 id
  80. usr_id: 站内用户 id
  81. out_user_id: 站外用户 id
  82. user_name: 站外用户名称
  83. """
  84. for m_key in must_keys:
  85. if self.item.get(m_key):
  86. continue
  87. else:
  88. # print(m_key)
  89. return False
  90. return True
  91. def produce_item(self):
  92. """
  93. item producer
  94. :return:
  95. """
  96. flag = self.check_item()
  97. if flag:
  98. return self.item
  99. else:
  100. return False
  101. class VideoProducer(object):
  102. """
  103. 处理视频
  104. """
  105. @classmethod
  106. def wx_video_producer(cls, video_obj, user, trace_id):
  107. """
  108. 异步处理微信 video_obj
  109. 公众号和站内账号一一对应
  110. :param trace_id:
  111. :param user:
  112. :param video_obj:
  113. :return:
  114. """
  115. platform = "weixin_search"
  116. publish_time_stamp = int(video_obj['pubTime'])
  117. item = VideoItem()
  118. item.add_video_info("user_id", user["uid"])
  119. item.add_video_info("user_name", user["nick_name"])
  120. item.add_video_info("video_id", video_obj['hashDocID'])
  121. item.add_video_info("video_title", trace_id)
  122. item.add_video_info("publish_time_stamp", int(publish_time_stamp))
  123. item.add_video_info("video_url", video_obj["videoUrl"])
  124. item.add_video_info("cover_url", video_obj["image"])
  125. item.add_video_info("out_video_id", video_obj['hashDocID'])
  126. item.add_video_info("out_user_id", trace_id)
  127. item.add_video_info("platform", platform)
  128. item.add_video_info("strategy", "search")
  129. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  130. mq_obj = item.produce_item()
  131. return mq_obj
  132. @classmethod
  133. def baidu_video_producer(cls, video_obj, user, trace_id):
  134. """
  135. 处理好看视频的 video_info
  136. :param video_obj:
  137. :param user:
  138. :param trace_id:
  139. :return:
  140. """
  141. platform = "baidu_search"
  142. publish_time_stamp = int(video_obj['publish_time'])
  143. item = VideoItem()
  144. item.add_video_info("user_id", user["uid"])
  145. item.add_video_info("user_name", user["nick_name"])
  146. item.add_video_info("video_id", video_obj['id'])
  147. item.add_video_info("video_title", trace_id)
  148. item.add_video_info("publish_time_stamp", publish_time_stamp)
  149. item.add_video_info("video_url", video_obj["playurl"])
  150. item.add_video_info("cover_url", video_obj["poster"])
  151. item.add_video_info("out_video_id", video_obj['id'])
  152. item.add_video_info("out_user_id", trace_id)
  153. item.add_video_info("like_cnt", video_obj['like'] if video_obj.get('like') else 0)
  154. item.add_video_info("play_cnt", video_obj['playcnt'])
  155. item.add_video_info("duration", video_obj['duration'])
  156. item.add_video_info("platform", platform)
  157. item.add_video_info("strategy", "search")
  158. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  159. mq_obj = item.produce_item()
  160. return mq_obj
  161. @classmethod
  162. def xg_video_producer(cls, video_obj, user, trace_id):
  163. """
  164. 西瓜搜索
  165. :param video_obj:
  166. :param user:
  167. :param trace_id:
  168. :return:
  169. """
  170. platform = "xg_search"
  171. publish_time_stamp = int(video_obj['publish_time'])
  172. item = VideoItem()
  173. item.add_video_info("user_id", user["uid"])
  174. item.add_video_info("user_name", user["nick_name"])
  175. item.add_video_info("video_id", video_obj['video_id'])
  176. item.add_video_info("video_title", trace_id)
  177. item.add_video_info("publish_time_stamp", int(publish_time_stamp))
  178. item.add_video_info("video_url", video_obj["video_url"])
  179. item.add_video_info("cover_url", video_obj["cover_url"])
  180. item.add_video_info("out_video_id", video_obj['video_id'])
  181. item.add_video_info("play_cnt", video_obj['play_cnt'])
  182. item.add_video_info("duration", video_obj['duration'])
  183. item.add_video_info("like_cnt", video_obj['like_cnt'])
  184. item.add_video_info("out_user_id", trace_id)
  185. item.add_video_info("platform", platform)
  186. item.add_video_info("strategy", "search")
  187. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  188. mq_obj = item.produce_item()
  189. return mq_obj
  190. def video_mq_sender(video_obj, user, trace_id, platform):
  191. """
  192. 异步处理微信 video_obj
  193. 公众号和站内账号一一对应
  194. :param platform:
  195. :param user:
  196. :param trace_id:
  197. :param video_obj:
  198. :return:
  199. """
  200. ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
  201. Video = VideoProducer()
  202. if platform == "xg_search":
  203. mq_obj = Video.xg_video_producer(
  204. video_obj=video_obj,
  205. user=user,
  206. trace_id=trace_id,
  207. )
  208. elif platform == "baidu_search":
  209. mq_obj = Video.baidu_video_producer(
  210. video_obj=video_obj,
  211. user=user,
  212. trace_id=trace_id,
  213. )
  214. elif platform == "wx_search":
  215. mq_obj = Video.wx_video_producer(
  216. video_obj=video_obj,
  217. user=user,
  218. trace_id=trace_id,
  219. )
  220. else:
  221. mq_obj = {}
  222. ETL_MQ.send_msg(params=mq_obj)
  223. logging(
  224. code="6002",
  225. info="发送消息至 ETL",
  226. data=mq_obj,
  227. trace_id=trace_id
  228. )