# -*- coding: utf-8 -*-
import itertools
import json
import logging
import os
import random
import sys
from collections import defaultdict

# Must run before the project import below so that ``utils`` is resolvable
# from the current working directory.
sys.path.append(os.getcwd())

from utils.feishu_utils import Feishu

logger = logging.getLogger(__name__)


class Material:
    """Helpers that read task configuration from Feishu spreadsheets."""

    @classmethod
    def get_pzsrt_data(cls, feishu_id, feishu_sheet, video_share_name):
        """Return the fixed subtitle configured for ``video_share_name``.

        The sheet is read with ``Feishu.get_values_batch``; the first row is
        skipped as a header. Column 0 is compared with ``video_share_name``
        and column 1 is returned as the subtitle.

        Args:
            feishu_id: Spreadsheet identifier passed to ``Feishu.get_values_batch``.
            feishu_sheet: Sheet identifier passed to ``Feishu.get_values_batch``.
            video_share_name: Mark to look up in column 0.

        Returns:
            The subtitle of the first matching row whose subtitle is not
            empty, ``None`` or the literal string ``'None'``; otherwise the
            built-in default subtitle.
        """
        rows = Feishu.get_values_batch(feishu_id, feishu_sheet) or []
        for row in rows[1:]:
            if len(row) < 2:
                # Not enough columns to hold both the mark and the subtitle.
                continue
            pz_mark, pz_zm = row[0], row[1]
            if pz_mark == video_share_name and pz_zm not in (None, '', 'None'):
                return pz_zm
        return "温馨提示:\n点击下方按钮,传递好运"

    @classmethod
    def get_count_restrict(cls, channel):
        """Return the de-duplication day count configured for ``channel``.

        Args:
            channel: Channel name compared against column 0 of the sheet.

        Returns:
            Column 1 of the first row whose column 0 equals ``channel``, or
            ``None`` when no row matches.
        """
        rows = Feishu.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "dQriSJ") or []
        for row in rows[1:]:
            if len(row) < 2:
                continue
            if row[0] == channel:
                return row[1]
        return None

    @classmethod
    def sort_keyword_data(cls, data):
        """Interleave JSON-encoded tasks round-robin by ``keyword_name``.

        Tasks are grouped by their ``keyword_name`` value, each group is
        shuffled in place, and the groups are then drained one item at a
        time in the order in which each ``keyword_name`` first appeared.

        Args:
            data: Iterable of JSON strings, each decoding to a dict that
                contains a ``keyword_name`` key.

        Returns:
            A list of JSON strings in the interleaved order. An empty input
            yields an empty list.

        Raises:
            json.JSONDecodeError: If an item is not valid JSON.
            KeyError: If a decoded item has no ``keyword_name`` key.
        """
        groups = defaultdict(list)
        for raw in data:
            item = json.loads(raw)
            groups[item['keyword_name']].append(item)

        if not groups:
            # Nothing to order; the previous max() over no groups raised
            # ValueError here.
            return []

        for bucket in groups.values():
            random.shuffle(bucket)

        # zip_longest walks the groups "column by column"; the sentinel pads
        # groups that have already run out of items.
        exhausted = object()
        interleaved = []
        for tier in itertools.zip_longest(*groups.values(), fillvalue=exhausted):
            interleaved.extend(item for item in tier if item is not exhausted)

        return [json.dumps(item, ensure_ascii=False) for item in interleaved]

    @staticmethod
    def _row_to_tasks(row):
        """Expand one sheet row into its JSON-encoded task dictionaries.

        Returns an empty list when the channel cell is empty. Raises
        (IndexError, TypeError, AttributeError, ...) when the row is too
        short or a required cell has an unexpected type.
        """
        channel = row[1]  # channel
        channel_url = row[2]  # account info
        tag = row[3]  # tag
        pd_id = row[4]  # piaoquan id
        latest_count = row[5]  # number of latest videos
        totality_count = row[6]  # total number of tasks
        video_share = row[7]  # in-video share text
        video_share_ending = row[8]  # ending share
        voice = row[9]  # ending voice
        crop_tool = row[10]  # frame cropping
        gg_duration = row[11]  # ad duration trimming
        ai_title = row[12]  # AI title
        hottest_count = row[13]  # number of hottest videos
        first_category = row[14]  # first-level category
        secondary_category = row[15]  # second-level category (search keyword) / owner (category)
        keyword_sort = row[16]  # sort condition (search filter)
        keyword_time = row[17]  # publish time (search filter)
        keyword_duration = row[18]  # video duration (search filter)
        keyword_name = row[19]  # owner (search filter)

        if not channel:
            return []

        if ',' in channel_url:
            accounts = channel_url.split(',')
        else:
            accounts = [channel_url]

        def base_task(task_mark, channel_name, account, count):
            # Key order is significant: it determines the emitted JSON text.
            return {
                "task_mark": task_mark,
                "channel": channel_name,
                "channel_url": account,
                "tag": tag,
                "pd_id": pd_id,
                "count": count,
                "totality_count": totality_count,
                "video_share": video_share,
                "video_share_ending": video_share_ending,
                "voice": voice,
                "crop_tool": crop_tool,
                "gg_duration": gg_duration,
                "ai_title": ai_title,
                "first_category": first_category,
            }

        tasks = []
        if "搜索" not in channel:
            # Regular channel: one task per account for the latest videos ...
            for account in accounts:
                task = base_task(f"{channel}-{first_category}", channel, account, latest_count)
                task["keyword_name"] = secondary_category
                tasks.append(task)
            # ... plus one "历史"-prefixed task per account when a
            # hottest-video count is configured.
            if hottest_count:
                for account in accounts:
                    task = base_task(
                        f"历史{channel}-{first_category}",
                        f"历史{channel}",
                        account,
                        hottest_count,
                    )
                    task["keyword_name"] = secondary_category
                    tasks.append(task)
        else:
            # Search channel: one task per account for every combination of
            # (sort condition, publish time, duration).
            keyword_sort_list = keyword_sort.split(',')
            keyword_duration_list = keyword_duration.split(',')
            keyword_time_list = keyword_time.split(',')
            combinations = list(
                itertools.product(keyword_sort_list, keyword_time_list, keyword_duration_list)
            )
            for account in accounts:
                for combo in combinations:
                    task = base_task(f"{channel}-{first_category}", channel, account, latest_count)
                    task["secondary_category"] = secondary_category
                    task["combo"] = combo
                    task["keyword_name"] = keyword_name
                    tasks.append(task)

        return [json.dumps(task, ensure_ascii=False) for task in tasks]

    @classmethod
    def get_carry_data(cls, fs_id, fs_sheet):
        """Build the task list for the owners configured in a Feishu sheet.

        The first row is skipped as a header. Each remaining row is expanded
        into one or more JSON-encoded task dictionaries, and the result is
        ordered with ``sort_keyword_data``.

        Processing is best-effort: the first row that cannot be processed
        stops the scan, the error is logged, and the tasks collected up to
        that point are still returned.

        Args:
            fs_id: Spreadsheet identifier passed to ``Feishu.get_values_batch``.
            fs_sheet: Sheet identifier passed to ``Feishu.get_values_batch``.

        Returns:
            A list of JSON strings, possibly empty.
        """
        data = Feishu.get_values_batch(fs_id, fs_sheet)
        processed_list = []
        try:
            for row in data[1:]:
                processed_list.extend(cls._row_to_tasks(row))
        except Exception:
            # Keep what was collected so far, but leave a trace instead of
            # silently swallowing the error.
            logger.exception(
                "get_carry_data: stopped after %d task(s) because a row could not be processed",
                len(processed_list),
            )
        return cls.sort_keyword_data(processed_list)