""" @author: luojunhui """ import json import time from applications.functions.common import clean_title class VideoItem(object): """ function: 当扫描进一条视频的时候,对该视频的基本信息进行处理,保证发送给 pipeline和 etl 的 video_dict 是正确的 __init__: 初始化空json 对象,用来存储视频信息 add_video_info: 把视频信息存储到 item 对象中 check_item: 检查 item 对象中的各个元素以及处理 """ def __init__(self): self.item = {} def add_video_info(self, key, value): """ insert or update video info :param key: :param value: """ self.item[key] = value def check_item(self): """ 判断item 里面的字段,是否符合要求 字段分为 3 类: 1. 必须存在数据的字段: ["video_id", "user_id", "user_name", "out_user_id", "out_video_id", "session", "video_url", "cover_url", "platform", "strategy"] 2. 不存在默认为 0 的字段 :["duration", "play_cnt", "like_cnt", "comment_cnt", "share_cnt", "width", "height"] 3. 需要后出理的字段: video_title, publish_time """ if self.item.get("video_title"): self.item["video_title"] = clean_title(self.item["video_title"]) else: return False if self.item.get("publish_time_stamp"): publish_time_str = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(self.item["publish_time_stamp"]) ) self.add_video_info("publish_time_str", publish_time_str) else: publish_time_stamp = int(time.time()) publish_time_str = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp) ) self.add_video_info("publish_time_stamp", publish_time_stamp) self.add_video_info("publish_time_str", publish_time_str) self.add_video_info("publish_time", publish_time_str) if not self.item.get("update_time_stamp"): self.add_video_info("update_time_stamp", int(time.time())) # 如果不存在,默认值为 0 config_keys = [ "duration", "play_cnt", "like_cnt", "comment_cnt", "share_cnt", "width", "height", ] for config_key in config_keys: if self.item.get(config_key): continue else: self.add_video_info(config_key, 0) # 必须存在的元素,若不存在则会报错 must_keys = [ "video_id", "user_id", # "user_name", "out_video_id", "session", "video_url", "cover_url", "platform", "strategy", ] """ video_id, out_video_id 均为站外视频 id usr_id: 站内用户 id out_user_id: 站外用户 id user_name: 站外用户名称 """ for m_key in must_keys: if self.item.get(m_key): continue else: # print(m_key) return False return True def produce_item(self): """ item producer :return: """ flag = self.check_item() if flag: return self.item else: return False class VideoProducer(object): """ 处理视频 """ @classmethod def wx_video_producer(cls, video_obj, user, trace_id): """ 异步处理微信 video_obj 公众号和站内账号一一对应 :param trace_id: :param user: :param video_obj: :return: """ platform = "weixin_search" publish_time_stamp = int(video_obj['pubTime']) item = VideoItem() item.add_video_info("user_id", user) # item.add_video_info("user_name", user["nick_name"]) item.add_video_info("video_id", video_obj['hashDocID']) item.add_video_info("video_title", trace_id) item.add_video_info("publish_time_stamp", int(publish_time_stamp)) item.add_video_info("video_url", video_obj["videoUrl"]) item.add_video_info("cover_url", video_obj["image"]) item.add_video_info("out_video_id", video_obj['hashDocID']) item.add_video_info("out_user_id", trace_id) item.add_video_info("platform", platform) item.add_video_info("strategy", "search") item.add_video_info("session", "{}-{}".format(platform, int(time.time()))) mq_obj = item.produce_item() return mq_obj @classmethod def baidu_video_producer(cls, video_obj, user, trace_id): """ 处理好看视频的 video_info :param video_obj: :param user: :param trace_id: :return: """ platform = "baidu_search" publish_time_stamp = int(video_obj['publish_time']) item = VideoItem() # print("baidu") # print(json.dumps(video_obj, ensure_ascii=False, indent=4)) item.add_video_info("user_id", user) # item.add_video_info("user_name", user["nick_name"]) item.add_video_info("video_id", video_obj['id']) item.add_video_info("video_title", video_obj['title']) item.add_video_info("publish_time_stamp", publish_time_stamp) item.add_video_info("video_url", video_obj["playurl"]) item.add_video_info("cover_url", video_obj["poster"]) item.add_video_info("out_video_id", video_obj['id']) item.add_video_info("out_user_id", trace_id) item.add_video_info("like_cnt", video_obj['like'] if video_obj.get('like') else 0) item.add_video_info("play_cnt", video_obj['playcnt']) item.add_video_info("duration", video_obj['duration']) item.add_video_info("platform", platform) item.add_video_info("strategy", "search") item.add_video_info("session", "{}-{}".format(platform, int(time.time()))) mq_obj = item.produce_item() return mq_obj @classmethod def xg_video_producer(cls, video_obj, user, trace_id): """ 西瓜搜索 :param video_obj: :param user: :param trace_id: :return: """ platform = "xg_search" publish_time_stamp = int(video_obj['publish_time']) item = VideoItem() item.add_video_info("user_id", user) # item.add_video_info("user_name", user["nick_name"]) item.add_video_info("video_id", video_obj['video_id']) item.add_video_info("video_title", video_obj.get('video_title')) item.add_video_info("publish_time_stamp", int(publish_time_stamp)) item.add_video_info("video_url", video_obj["video_url"]) item.add_video_info("cover_url", video_obj["cover_url"]) item.add_video_info("out_video_id", video_obj['video_id']) item.add_video_info("play_cnt", video_obj['play_cnt']) item.add_video_info("duration", video_obj['duration']) item.add_video_info("like_cnt", video_obj['like_cnt']) item.add_video_info("out_user_id", trace_id) item.add_video_info("platform", platform) item.add_video_info("strategy", "search") item.add_video_info("session", "{}-{}".format(platform, int(time.time()))) mq_obj = item.produce_item() return mq_obj @classmethod def dy_video_producer(cls, video_obj, user, trace_id): """ :param video_obj: :param user: :param trace_id: :return: """ platform = "dy_search" publish_time_stamp = int(video_obj['publish_timestamp'] / 1000) item = VideoItem() # print("douyin") # print(json.dumps(video_obj, ensure_ascii=False, indent=4)) item.add_video_info("user_id", user) # item.add_video_info("user_name", user["nick_name"]) item.add_video_info("video_id", video_obj['channel_content_id']) item.add_video_info("video_title", video_obj['title']) item.add_video_info("publish_time_stamp", int(publish_time_stamp)) item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url']) item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url']) item.add_video_info("out_video_id", video_obj['channel_content_id']) item.add_video_info("play_cnt", video_obj['play_count']) item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration']) item.add_video_info("like_cnt", video_obj['like_count']) item.add_video_info("out_user_id", trace_id) item.add_video_info("platform", platform) item.add_video_info("strategy", "search") item.add_video_info("session", "{}-{}".format(platform, int(time.time()))) mq_obj = item.produce_item() return mq_obj