""" @author: luojunhui """ import time default_single_video_table_fields = { "platform": "gzh", "article_title": None, "content_trace_id": None, "read_cnt": 0, "article_index": None, "out_account_name": None, "article_url": None, "url_unique_md5": None, "category": None, "publish_timestamp": None, "out_account_id": None, "cover_url": None, "crawler_timestamp": int(time.time()), "source_account": 1, "article_publish_type": None, "like_cnt": 0, "bad_status": 0, "tags": None, "video_oss_path": None, } default_account_table_fields = { "platform": 'Not NULL', "account_id": 'Not NULL', "account_name": 'Not NULL', "max_cursor": None, "account_init_date": None, "status": 0, "priority": 0, } default_association_table_fields = { "account_name": 'Not NULL', "account_id": 'Not NULL', "recommend_video_id": 'Not NULL', "title": 'Not NULL', "read_cnt": 0, "duration": 0, "seed_account": 'Not NULL', "seed_title": 'Not NULL', "recommend_date": 'Not NULL', "platform": 'Not NULL' } class Item(object): """ format save to article meta table or single video source table """ def __init__(self): self.item = {} def add(self, key, value): """ add key value to item """ self.item[key] = value def check_video_item(self): """ check video item """ fields = list(default_single_video_table_fields.keys()) for field in fields: if self.item.get(field, None) is not None: continue else: self.item[field] = default_single_video_table_fields[field] def check_article_item(self): """ check article item """ return def check_account_item(self): """ check account item """ fields = list(default_account_table_fields.keys()) for key in fields: if self.item.get(key, None) is not None: continue elif default_account_table_fields[key] == 'Not NULL': raise ValueError(f"{key} is not None, please check your account item") else: self.item[key] = default_account_table_fields[key] def check_association_item(self): """ check association item """ fields = list(default_association_table_fields.keys()) for field in fields: if self.item.get(field, None) is not None: continue elif default_association_table_fields[field] == 'Not NULL': raise ValueError(f"{field} is not None, please check your account item") else: self.item[field] = default_association_table_fields[field] def check(self, source): """ check item """ match source: case "video": self.check_video_item() case "article": self.check_article_item() case "account": self.check_account_item() case "association": self.check_association_item()