123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- """
- @author: luojunhui
- """
- import json
- import time
- from applications.functions.common import clean_title
class VideoItem(object):
    """
    Collect the fields of one scanned video and validate them.

    The fields are stored in the plain dict ``self.item``. Per the original
    author's note, the goal is that the video dict handed to the pipeline
    and ETL stages is well formed.

    __init__: start with an empty dict.
    add_video_info: insert or overwrite one field.
    check_item: normalize the fields and verify the required ones.
    produce_item: return the dict if it validates, otherwise ``False``.
    """

    def __init__(self):
        self.item = {}

    def add_video_info(self, key, value):
        """
        Insert or overwrite a single field of the item.

        :param key: field name
        :param value: field value
        """
        self.item[key] = value

    def check_item(self):
        """
        Normalize the item in place and report whether it is usable.

        Steps, in order:
        1. ``video_title`` must be truthy; it is replaced by
           ``clean_title(video_title)``. Otherwise return ``False``
           immediately, leaving the item untouched.
        2. ``publish_time_stamp`` defaults to the current time when missing
           or falsy; ``publish_time_str`` and ``publish_time`` are both set
           to its ``%Y-%m-%d %H:%M:%S`` rendering via ``time.localtime``.
        3. ``update_time_stamp`` defaults to the current time.
        4. ``duration``, ``play_cnt``, ``like_cnt``, ``comment_cnt``,
           ``share_cnt``, ``width`` and ``height`` are set to 0 when missing
           or falsy.
        5. ``video_id``, ``user_id``, ``out_video_id``, ``session``,
           ``video_url``, ``cover_url``, ``platform`` and ``strategy`` must
           all be truthy.

        :return: ``True`` when every required field is present, else ``False``
        """
        raw_title = self.item.get("video_title")
        if not raw_title:
            return False
        self.item["video_title"] = clean_title(raw_title)

        # Fall back to "now" when the caller supplied no publish timestamp.
        stamp = self.item.get("publish_time_stamp")
        if not stamp:
            stamp = int(time.time())
            self.add_video_info("publish_time_stamp", stamp)
        formatted = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stamp))
        self.add_video_info("publish_time_str", formatted)
        self.add_video_info("publish_time", formatted)

        if not self.item.get("update_time_stamp"):
            self.add_video_info("update_time_stamp", int(time.time()))

        # Numeric fields that default to 0 when absent (or otherwise falsy).
        zero_default_fields = (
            "duration",
            "play_cnt",
            "like_cnt",
            "comment_cnt",
            "share_cnt",
            "width",
            "height",
        )
        for field in zero_default_fields:
            if not self.item.get(field):
                self.add_video_info(field, 0)

        # Required fields: a missing or falsy value fails validation.
        # Notes carried over from the original author:
        #   video_id, out_video_id: both hold the off-site video id
        #   user_id: in-site user id
        #   out_user_id: off-site user id (not enforced here)
        #   user_name: off-site user name (not enforced here)
        required_fields = (
            "video_id",
            "user_id",
            "out_video_id",
            "session",
            "video_url",
            "cover_url",
            "platform",
            "strategy",
        )
        return all(self.item.get(field) for field in required_fields)

    def produce_item(self):
        """
        Validate the item and hand it out.

        :return: the item dict when ``check_item`` succeeds, else ``False``
        """
        return self.item if self.check_item() else False
class VideoProducer(object):
    """
    Turn raw search results from each platform into validated video items.

    Every public ``*_video_produce`` method maps the platform-specific keys
    of ``video_obj`` onto the common field names and delegates the shared
    assembly to ``_build_item``. ``user_name`` is not populated by any of
    the producers.
    """

    @classmethod
    def _build_item(
        cls,
        platform,
        user,
        trace_id,
        video_id,
        title,
        publish_timestamp,
        video_url,
        cover_url,
        **stats
    ):
        """
        Assemble a VideoItem from already-extracted fields and validate it.

        :param platform: platform tag, also used as the session prefix
        :param user: stored as ``user_id``
        :param trace_id: stored as ``out_user_id``
        :param video_id: stored as both ``video_id`` and ``out_video_id``
        :param title: stored as ``video_title``
        :param publish_timestamp: stored as ``publish_time_stamp``
        :param video_url: stored as ``video_url``
        :param cover_url: stored as ``cover_url``
        :param stats: extra fields (e.g. ``play_cnt``, ``duration``,
            ``like_cnt``) stored under the given keyword names
        :return: result of ``VideoItem.produce_item()`` - the item dict, or
            ``False`` when validation fails
        """
        item = VideoItem()
        item.add_video_info("user_id", user)
        item.add_video_info("video_id", video_id)
        item.add_video_info("video_title", title)
        item.add_video_info("publish_time_stamp", publish_timestamp)
        item.add_video_info("video_url", video_url)
        item.add_video_info("cover_url", cover_url)
        item.add_video_info("out_video_id", video_id)
        for key, value in stats.items():
            item.add_video_info(key, value)
        item.add_video_info("out_user_id", trace_id)
        item.add_video_info("platform", platform)
        item.add_video_info("strategy", "search")
        # Session id is the platform tag plus the current epoch second.
        item.add_video_info(
            "session", "{}-{}".format(platform, int(time.time()))
        )
        return item.produce_item()

    @classmethod
    def wx_video_produce(cls, video_obj, user, trace_id):
        """
        Build an item from a WeChat (weixin) search result.

        Original author's note: official accounts map one-to-one to in-site
        accounts.

        :param video_obj: raw result; reads ``pubTime``, ``hashDocID``,
            ``videoUrl`` and ``image``
        :param user: stored as ``user_id``
        :param trace_id: stored as ``out_user_id`` and as ``video_title``
        :return: the item dict, or ``False`` when validation fails
        """
        publish_timestamp = int(video_obj['pubTime'])
        return cls._build_item(
            platform="weixin_search",
            user=user,
            trace_id=trace_id,
            video_id=video_obj['hashDocID'],
            # NOTE(review): the title is set to trace_id rather than a field
            # of video_obj; kept as in the original - confirm this is intended.
            title=trace_id,
            publish_timestamp=publish_timestamp,
            video_url=video_obj["videoUrl"],
            cover_url=video_obj["image"],
        )

    @classmethod
    def baidu_video_produce(cls, video_obj, user, trace_id):
        """
        Build an item from a Baidu (Haokan video) search result.

        :param video_obj: raw result; reads ``publish_time``, ``id``,
            ``title``, ``playurl``, ``poster``, ``like`` (optional),
            ``playcnt`` and ``duration``
        :param user: stored as ``user_id``
        :param trace_id: stored as ``out_user_id``
        :return: the item dict, or ``False`` when validation fails
        """
        publish_timestamp = int(video_obj['publish_time'])
        return cls._build_item(
            platform="baidu_search",
            user=user,
            trace_id=trace_id,
            video_id=video_obj['id'],
            title=video_obj['title'],
            publish_timestamp=publish_timestamp,
            video_url=video_obj["playurl"],
            cover_url=video_obj["poster"],
            # A missing or falsy like count is recorded as 0.
            like_cnt=video_obj.get('like') or 0,
            play_cnt=video_obj['playcnt'],
            duration=video_obj['duration'],
        )

    @classmethod
    def xg_video_produce(cls, video_obj, user, trace_id):
        """
        Build an item from a Xigua search result.

        :param video_obj: raw result; reads ``publish_time``, ``video_id``,
            ``video_title`` (optional), ``video_url``, ``cover_url``,
            ``play_cnt``, ``duration`` and ``like_cnt``
        :param user: stored as ``user_id``
        :param trace_id: stored as ``out_user_id``
        :return: the item dict, or ``False`` when validation fails
        """
        publish_timestamp = int(video_obj['publish_time'])
        return cls._build_item(
            platform="xg_search",
            user=user,
            trace_id=trace_id,
            video_id=video_obj['video_id'],
            title=video_obj.get('video_title'),
            publish_timestamp=publish_timestamp,
            video_url=video_obj["video_url"],
            cover_url=video_obj["cover_url"],
            play_cnt=video_obj['play_cnt'],
            duration=video_obj['duration'],
            like_cnt=video_obj['like_cnt'],
        )

    @classmethod
    def dy_video_produce(cls, video_obj, user, trace_id):
        """
        Build an item from a Douyin ("dy") search result.

        :param video_obj: raw result; reads ``publish_timestamp``,
            ``channel_content_id``, ``title``, the first entry of
            ``video_url_list`` (``video_url``, ``video_duration``), the first
            entry of ``image_url_list`` (``image_url``), ``play_count`` and
            ``like_count``
        :param user: stored as ``user_id``
        :param trace_id: stored as ``out_user_id``
        :return: the item dict, or ``False`` when validation fails
        """
        # The source value is divided by 1000 before storing
        # (presumably milliseconds -> seconds; confirm against the API).
        publish_timestamp = int(video_obj['publish_timestamp'] / 1000)
        return cls._build_item(
            platform="dy_search",
            user=user,
            trace_id=trace_id,
            video_id=video_obj['channel_content_id'],
            title=video_obj['title'],
            publish_timestamp=publish_timestamp,
            video_url=video_obj["video_url_list"][0]['video_url'],
            cover_url=video_obj["image_url_list"][0]['image_url'],
            play_cnt=video_obj['play_count'],
            duration=video_obj["video_url_list"][0]['video_duration'],
            like_cnt=video_obj['like_count'],
        )
|