video_item.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. from applications.functions.common import clean_title
  7. class VideoItem(object):
  8. """
  9. function: 当扫描进一条视频的时候,对该视频的基本信息进行处理,保证发送给 pipeline和 etl 的 video_dict 是正确的
  10. __init__: 初始化空json 对象,用来存储视频信息
  11. add_video_info: 把视频信息存储到 item 对象中
  12. check_item: 检查 item 对象中的各个元素以及处理
  13. """
  14. def __init__(self):
  15. self.item = {}
  16. def add_video_info(self, key, value):
  17. """
  18. insert or update video info
  19. :param key:
  20. :param value:
  21. """
  22. self.item[key] = value
  23. def check_item(self):
  24. """
  25. 判断item 里面的字段,是否符合要求
  26. 字段分为 3 类:
  27. 1. 必须存在数据的字段: ["video_id", "user_id", "user_name", "out_user_id", "out_video_id", "session", "video_url", "cover_url", "platform", "strategy"]
  28. 2. 不存在默认为 0 的字段 :["duration", "play_cnt", "like_cnt", "comment_cnt", "share_cnt", "width", "height"]
  29. 3. 需要后出理的字段: video_title, publish_time
  30. """
  31. if self.item.get("video_title"):
  32. self.item["video_title"] = clean_title(self.item["video_title"])
  33. else:
  34. return False
  35. if self.item.get("publish_time_stamp"):
  36. publish_time_str = time.strftime(
  37. "%Y-%m-%d %H:%M:%S", time.localtime(self.item["publish_time_stamp"])
  38. )
  39. self.add_video_info("publish_time_str", publish_time_str)
  40. else:
  41. publish_time_stamp = int(time.time())
  42. publish_time_str = time.strftime(
  43. "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
  44. )
  45. self.add_video_info("publish_time_stamp", publish_time_stamp)
  46. self.add_video_info("publish_time_str", publish_time_str)
  47. self.add_video_info("publish_time", publish_time_str)
  48. if not self.item.get("update_time_stamp"):
  49. self.add_video_info("update_time_stamp", int(time.time()))
  50. # 如果不存在,默认值为 0
  51. config_keys = [
  52. "duration",
  53. "play_cnt",
  54. "like_cnt",
  55. "comment_cnt",
  56. "share_cnt",
  57. "width",
  58. "height",
  59. ]
  60. for config_key in config_keys:
  61. if self.item.get(config_key):
  62. continue
  63. else:
  64. self.add_video_info(config_key, 0)
  65. # 必须存在的元素,若不存在则会报错
  66. must_keys = [
  67. "video_id",
  68. "user_id",
  69. # "user_name",
  70. "out_video_id",
  71. "session",
  72. "video_url",
  73. "cover_url",
  74. "platform",
  75. "strategy",
  76. ]
  77. """
  78. video_id, out_video_id 均为站外视频 id
  79. usr_id: 站内用户 id
  80. out_user_id: 站外用户 id
  81. user_name: 站外用户名称
  82. """
  83. for m_key in must_keys:
  84. if self.item.get(m_key):
  85. continue
  86. else:
  87. # print(m_key)
  88. return False
  89. return True
  90. def produce_item(self):
  91. """
  92. item producer
  93. :return:
  94. """
  95. flag = self.check_item()
  96. if flag:
  97. return self.item
  98. else:
  99. return False
  100. class VideoProducer(object):
  101. """
  102. 处理视频
  103. """
  104. @classmethod
  105. def wx_video_produce(cls, video_obj, user, trace_id):
  106. """
  107. 异步处理微信 video_obj
  108. 公众号和站内账号一一对应
  109. :param trace_id:
  110. :param user:
  111. :param video_obj:
  112. :return:
  113. """
  114. platform = "weixin_search"
  115. publish_timestamp = int(video_obj['pubTime'])
  116. item = VideoItem()
  117. item.add_video_info("user_id", user)
  118. # item.add_video_info("user_name", user["nick_name"])
  119. item.add_video_info("video_id", video_obj['hashDocID'])
  120. item.add_video_info("video_title", trace_id)
  121. item.add_video_info("publish_time_stamp", int(publish_timestamp))
  122. item.add_video_info("video_url", video_obj["videoUrl"])
  123. item.add_video_info("cover_url", video_obj["image"])
  124. item.add_video_info("out_video_id", video_obj['hashDocID'])
  125. item.add_video_info("out_user_id", trace_id)
  126. item.add_video_info("platform", platform)
  127. item.add_video_info("strategy", "search")
  128. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  129. mq_obj = item.produce_item()
  130. return mq_obj
  131. @classmethod
  132. def baidu_video_produce(cls, video_obj, user, trace_id):
  133. """
  134. 处理好看视频的 video_info
  135. :param video_obj:
  136. :param user:
  137. :param trace_id:
  138. :return:
  139. """
  140. platform = "baidu_search"
  141. publish_timestamp = int(video_obj['publish_time'])
  142. item = VideoItem()
  143. item.add_video_info("user_id", user)
  144. # item.add_video_info("user_name", user["nick_name"])
  145. item.add_video_info("video_id", video_obj['id'])
  146. item.add_video_info("video_title", video_obj['title'])
  147. item.add_video_info("publish_time_stamp", publish_timestamp)
  148. item.add_video_info("video_url", video_obj["playurl"])
  149. item.add_video_info("cover_url", video_obj["poster"])
  150. item.add_video_info("out_video_id", video_obj['id'])
  151. item.add_video_info("out_user_id", trace_id)
  152. item.add_video_info("like_cnt", video_obj['like'] if video_obj.get('like') else 0)
  153. item.add_video_info("play_cnt", video_obj['playcnt'])
  154. item.add_video_info("duration", video_obj['duration'])
  155. item.add_video_info("platform", platform)
  156. item.add_video_info("strategy", "search")
  157. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  158. mq_obj = item.produce_item()
  159. return mq_obj
  160. @classmethod
  161. def xg_video_produce(cls, video_obj, user, trace_id):
  162. """
  163. 西瓜搜索
  164. :param video_obj:
  165. :param user:
  166. :param trace_id:
  167. :return:
  168. """
  169. platform = "xg_search"
  170. publish_timestamp = int(video_obj['publish_time'])
  171. item = VideoItem()
  172. item.add_video_info("user_id", user)
  173. # item.add_video_info("user_name", user["nick_name"])
  174. item.add_video_info("video_id", video_obj['video_id'])
  175. item.add_video_info("video_title", video_obj.get('video_title'))
  176. item.add_video_info("publish_time_stamp", int(publish_timestamp))
  177. item.add_video_info("video_url", video_obj["video_url"])
  178. item.add_video_info("cover_url", video_obj["cover_url"])
  179. item.add_video_info("out_video_id", video_obj['video_id'])
  180. item.add_video_info("play_cnt", video_obj['play_cnt'])
  181. item.add_video_info("duration", video_obj['duration'])
  182. item.add_video_info("like_cnt", video_obj['like_cnt'])
  183. item.add_video_info("out_user_id", trace_id)
  184. item.add_video_info("platform", platform)
  185. item.add_video_info("strategy", "search")
  186. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  187. mq_obj = item.produce_item()
  188. return mq_obj
  189. @classmethod
  190. def dy_video_produce(cls, video_obj, user, trace_id):
  191. """
  192. :param video_obj:
  193. :param user:
  194. :param trace_id:
  195. :return:
  196. """
  197. platform = "dy_search"
  198. publish_timestamp = int(video_obj['publish_timestamp'] / 1000)
  199. item = VideoItem()
  200. # print("douyin")
  201. # print(json.dumps(video_obj, ensure_ascii=False, indent=4))
  202. item.add_video_info("user_id", user)
  203. # item.add_video_info("user_name", user["nick_name"])
  204. item.add_video_info("video_id", video_obj['channel_content_id'])
  205. item.add_video_info("video_title", video_obj['title'])
  206. item.add_video_info("publish_time_stamp", int(publish_timestamp))
  207. item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
  208. item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
  209. item.add_video_info("out_video_id", video_obj['channel_content_id'])
  210. item.add_video_info("play_cnt", video_obj['play_count'])
  211. item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
  212. item.add_video_info("like_cnt", video_obj['like_count'])
  213. item.add_video_info("out_user_id", trace_id)
  214. item.add_video_info("platform", platform)
  215. item.add_video_info("strategy", "search")
  216. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  217. mq_obj = item.produce_item()
  218. return mq_obj