# -*- coding: utf-8 -*-
import json
import os
import random
import sys
import datetime
import itertools
from collections import defaultdict

from common.sql_help import sqlCollect

sys.path.append(os.getcwd())
from common.feishu_utils import Feishu


class Material():
    """Helpers that read rows through ``Feishu.get_values_batch`` and turn
    them into configuration values and JSON-encoded task lists.

    Every reader skips the first returned row (``rows[1:]``).
    """

    # Channels for which an extra "history" task is emitted, mapped to the
    # channel name used for that history task.
    _HISTORY_CHANNELS = {
        "抖音": "抖音历史",
        "快手": "快手历史",
        "视频号": "视频号历史",
    }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_blank(value):
        """Return True for ``None``, ``''`` and the literal text ``'None'``."""
        return value is None or str(value) in {'', 'None'}

    @staticmethod
    def _count_items(item, separator):
        """Return how many ``separator``-delimited parts ``item`` has.

        Empty text and the literal text ``'None'`` count as zero parts.
        """
        if item and item not in {'None', ''}:
            return len(item.split(separator))
        return 0

    @staticmethod
    def _task_mark(values):
        """Join the non-empty ``values`` with ``_`` to form a task mark."""
        return "_".join(
            str(value) for value in values
            if value is not None and value != "None"
        )

    @staticmethod
    def _history_number(row):
        """Return ``int(row[13])``, or ``None`` when it is missing or invalid."""
        try:
            return int(row[13])
        except (IndexError, TypeError, ValueError, OverflowError):
            return None

    @classmethod
    def _collect_owner_tasks(cls, data, with_category):
        """Build the JSON task strings shared by the owner-task readers.

        Args:
            data: Rows as returned by ``Feishu.get_values_batch``.
            with_category: When True, ``row[14]`` and ``row[15]`` are read and
                stored under ``first_category`` and ``keyword_name``.

        Returns:
            A list of JSON strings. Processing stops at the first row that has
            no ``piaoquan_id`` (``row[4]``). It is best-effort: any error while
            reading a row ends processing and the tasks built so far are
            returned.
        """
        processed_list = []
        try:
            for row in data[1:]:
                channel_id = row[1]
                raw_channel_url = row[2]
                tags = row[3]
                piaoquan_id = row[4]
                number = row[5]
                limit_number = row[6]
                video_share = row[7]
                video_ending = row[8]
                voice = row[9]
                crop_tool = row[10]
                gg_duration = row[11]
                title = row[12]
                # Test the raw cell: after str() an empty cell would become
                # the text 'None' and slip through the check.
                if cls._is_blank(raw_channel_url):
                    continue
                channel_url = str(raw_channel_url)
                ls_number = cls._history_number(row)
                if with_category:
                    first_category = row[14]
                    name = row[15]

                task_mark = cls._task_mark([
                    channel_id,
                    cls._count_items(channel_url, ','),
                    piaoquan_id,
                    video_share,
                    cls._count_items(str(video_ending), ','),
                    crop_tool,
                    gg_duration,
                    cls._count_items(str(title), '/'),
                ])

                if not (piaoquan_id and piaoquan_id not in {'None', ''}):
                    # A row without a piaoquan id ends the task list.
                    break

                history_channel = None
                if isinstance(channel_id, str):
                    history_channel = cls._HISTORY_CHANNELS.get(channel_id)

                for user in channel_url.split(','):
                    task = {
                        "task_mark": task_mark,
                        "channel_id": channel_id,
                        "channel_url": user,
                        "piaoquan_id": piaoquan_id,
                        "number": number,
                        "title": title,
                        "video_share": video_share,
                        "video_ending": video_ending,
                        "crop_total": crop_tool,
                        "gg_duration_total": gg_duration,
                        "voice": voice,
                    }
                    if with_category:
                        task["first_category"] = first_category  # first-level category
                        task["keyword_name"] = name
                    task["tags"] = tags
                    task["limit_number"] = limit_number
                    processed_list.append(json.dumps(task, ensure_ascii=False))

                    if history_channel and ls_number:
                        # Same task, re-targeted at the history channel with
                        # the history count taken from row[13].
                        history_task = dict(
                            task, channel_id=history_channel, number=ls_number
                        )
                        processed_list.append(
                            json.dumps(history_task, ensure_ascii=False)
                        )
        except Exception:
            # Best-effort: a malformed row ends processing; keep what we have.
            pass
        return processed_list

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    @classmethod
    def get_title_rule(cls):
        """Return the first cell of the first data row, or ``None`` if empty."""
        summary = Feishu.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "BS9uyu")
        rows = summary[1:]
        if not rows:
            return None
        return rows[0][0] or None

    @classmethod
    def get_count_restrict(cls, channel):
        """Return ``row[1]`` of the first row whose ``row[0]`` equals ``channel``.

        Returns ``None`` when no row matches.
        """
        count_channel = Feishu.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "187FZ7")
        for row in count_channel[1:]:
            if row[0] == channel:
                return row[1]
        return None

    @classmethod
    def feishu_list(cls, channel_id=None):
        """Return the owner list, or one channel's value from the channel sheet.

        The file used to define ``feishu_list`` twice, so the one-argument
        variant was silently replaced by the zero-argument one. Both are
        merged here; calling without arguments behaves as before.

        Args:
            channel_id: Optional channel name. When given, the channel sheet
                is searched and ``row[1]`` of the matching row is returned
                (``None`` if an empty first cell is reached or nothing
                matches).

        Returns:
            Without ``channel_id``: a list of dicts with the keys ``mark``,
            ``name``, ``feishu_id``, ``feishu_sheet`` and ``cookie_sheet``,
            collected until the first row whose ``mark`` (``row[0]``) is empty.
        """
        if channel_id is not None:
            channel_rows = Feishu.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "dQriSJ")
            for row in channel_rows[1:]:
                channel = row[0]
                if not channel:
                    return None
                if channel == channel_id:
                    return row[1]
            return None

        summary = Feishu.get_values_batch("summary", "bc154d")
        owners = []
        for row in summary[1:]:
            mark = row[0]
            if not mark:
                # An empty mark ends the list.
                break
            owners.append({
                "mark": mark,
                "name": row[1],
                "feishu_id": row[3],
                "feishu_sheet": row[4],
                "cookie_sheet": row[5],
            })
        return owners

    @classmethod
    def get_sph_user(cls):
        """Return the users from ``row[2]`` accepted by ``sqlCollect.sph_channel_user``.

        ``row[2]`` may hold several comma-separated users. Reading stops at
        the first row whose ``row[2]`` is empty. It is best-effort: on any
        error the users collected so far are returned.
        """
        data = Feishu.get_values_batch("GPbhsb5vchAN3qtzot6cu1f0n1c", "cc7ef0")
        user_data_list = []
        try:
            for row in data[1:]:
                users = str(row[2])
                if not users or users == 'None':
                    return user_data_list
                for user in users.split(','):
                    if sqlCollect.sph_channel_user(user):
                        user_data_list.append(user)
        except Exception:
            # Best-effort: return whatever was collected before the failure.
            pass
        return user_data_list

    @classmethod
    def sort_keyword_data(cls, data):
        """Reorder JSON task strings so that owners take turns.

        Tasks are grouped by their ``keyword_name`` field, each group is
        shuffled, and the groups are then interleaved round-robin in order of
        first appearance.

        Args:
            data: Iterable of JSON strings, each an object with a
                ``keyword_name`` key.

        Returns:
            A new list of JSON strings; an empty list for empty input.
        """
        records = [json.loads(item) for item in data]

        # Group by keyword_name, preserving first-seen order of the names.
        groups = defaultdict(list)
        for record in records:
            groups[record['keyword_name']].append(record)

        # Shuffle the order inside every group.
        for bucket in groups.values():
            random.shuffle(bucket)

        # Round-robin over the groups. default=0 keeps empty input from
        # raising ValueError in max().
        longest = max((len(bucket) for bucket in groups.values()), default=0)
        result = []
        for position in range(longest):
            for bucket in groups.values():
                if position < len(bucket):
                    result.append(bucket[position])

        return [json.dumps(record, ensure_ascii=False) for record in result]

    @classmethod
    def get_keyword_data(cls, feishu_id, feishu_sheet):
        """Build the search tasks from a sheet.

        One task is produced for every comma-separated entry of ``row[2]``
        combined with every element of the cartesian product of the
        comma-separated values in ``row[16]``, ``row[17]`` and ``row[18]``.
        Rows with an empty ``row[2]`` are skipped.

        Returns:
            JSON task strings ordered by ``sort_keyword_data``. It is
            best-effort: an error while reading a row ends processing and the
            tasks built so far are returned.
        """
        data = Feishu.get_values_batch(feishu_id, feishu_sheet)
        processed_list = []
        try:
            for row in data[1:]:
                channel_id = row[1]
                raw_channel_url = row[2]
                tags = row[3]
                piaoquan_id = row[4]
                number = row[5]
                limit_number = row[6]
                video_share = row[7]
                video_ending = row[8]
                voice = row[9]
                crop_tool = row[10]
                gg_duration = row[11]
                title = row[12]
                # Test the raw cell: after str() an empty cell would become
                # the text 'None' and slip through the check.
                if cls._is_blank(raw_channel_url):
                    continue
                channel_url = str(raw_channel_url)
                first_category = row[14]  # first-level category
                secondary_category = row[15]  # second-level category

                task_mark = cls._task_mark([
                    channel_id,
                    cls._count_items(channel_url, ','),
                    piaoquan_id,
                    video_share,
                    cls._count_items(str(video_ending), ','),
                    crop_tool,
                    gg_duration,
                    cls._count_items(str(title), '/'),
                    first_category,
                ])

                keyword_sort = row[16]  # sort condition
                keyword_time = row[17]  # publish time
                keyword_duration = row[18]  # video duration
                keyword_name = row[19]  # owner
                combinations = list(itertools.product(
                    keyword_sort.split(','),
                    keyword_time.split(','),
                    keyword_duration.split(','),
                ))

                for user in channel_url.split(','):
                    for combo in combinations:
                        number_dict = {
                            "task_mark": task_mark,
                            "channel_id": channel_id,
                            "channel_url": user,
                            "piaoquan_id": piaoquan_id,
                            "number": number,
                            "title": title,
                            "video_share": video_share,
                            "video_ending": video_ending,
                            "crop_total": crop_tool,
                            "gg_duration_total": gg_duration,
                            "voice": voice,
                            "first_category": first_category,  # first-level category
                            "secondary_category": secondary_category,  # second-level category
                            "combo": combo,  # search condition
                            "keyword_name": keyword_name,  # category owner
                            "tags": tags,
                            "limit_number": limit_number,
                        }
                        processed_list.append(
                            json.dumps(number_dict, ensure_ascii=False)
                        )
        except Exception:
            # Best-effort: a malformed row ends processing; keep what we have.
            pass
        return cls.sort_keyword_data(processed_list)

    @classmethod
    def get_pl_task_data(cls, feishu_id, feishu_sheet):
        """Build the per-category owner tasks from a sheet.

        Each task carries ``first_category`` (``row[14]``) and
        ``keyword_name`` (``row[15]``). The result is ordered by
        ``sort_keyword_data``.
        """
        data = Feishu.get_values_batch(feishu_id, feishu_sheet)
        tasks = cls._collect_owner_tasks(data, with_category=True)
        return cls.sort_keyword_data(tasks)

    @classmethod
    def get_task_data(cls, feishu_id, feishu_sheet):
        """Build the owner tasks from a sheet, in sheet order."""
        data = Feishu.get_values_batch(feishu_id, feishu_sheet)
        return cls._collect_owner_tasks(data, with_category=False)

    @classmethod
    def get_pwsrt_data(cls, feishu_id, feishu_sheet, video_ending):
        """Return the ending id and srt for ``video_ending``.

        Returns:
            ``{"pw_id": row[1], "pw_srt": row[2]}`` for the first row whose
            ``row[0]`` equals ``video_ending`` and whose ``row[1]`` is not
            empty; otherwise ``''``.
        """
        data = Feishu.get_values_batch(feishu_id, feishu_sheet)
        for row in data[1:]:
            pw_mark = row[0]
            pw_id = row[1]
            pw_srt = row[2]
            if pw_id not in (None, '', 'None') and pw_mark == video_ending:
                return {"pw_id": pw_id, "pw_srt": pw_srt}
        return ''

    @classmethod
    def get_pzsrt_data(cls, feishu_id, feishu_sheet, video_share_name):
        """Return the fixed subtitle for ``video_share_name``.

        Returns:
            ``row[1]`` of the first row whose ``row[0]`` equals
            ``video_share_name`` and whose ``row[1]`` is not empty;
            otherwise ``''``.
        """
        data = Feishu.get_values_batch(feishu_id, feishu_sheet)
        for row in data[1:]:
            pz_mark = row[0]
            pz_zm = row[1]
            if pz_zm not in (None, '', 'None') and pz_mark == video_share_name:
                return pz_zm
        return ''

    @classmethod
    def get_cookie_data(cls, feishu_id, cookie_sheet, channel):
        """Return ``row[1]`` of the first row whose ``row[0]`` equals ``channel``.

        Returns ``None`` when no row matches.
        """
        data = Feishu.get_values_batch(feishu_id, cookie_sheet)
        for row in data[1:]:
            if row[0] == channel:
                return row[1]
        return None