import asyncio
from typing import List, Dict
from datetime import datetime, timedelta

from core.utils.feishu_data_async import FeishuDataAsync
from core.utils.gpt4o_mini_help import GPT4oMini


async def get_title_filter_word() -> List[str]:
    """
    Fetch the title filter words configured in the Feishu sheet.
    https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=BS9uyu

    The first row of the sheet is skipped (presumably a header row -- TODO
    confirm). The first remaining row whose first cell holds a usable rule is
    split on "," into keywords; surrounding whitespace is stripped and empty
    keywords are dropped, because an empty string is a substring of every
    title and would flag all of them.

    :return: the list of filter keywords, or an empty list when the sheet
        has no usable rule (never ``None``, matching the annotation).
    """
    spreadsheet_token = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
    sheet_id = "BS9uyu"
    async with FeishuDataAsync() as feishu:
        feishu_data = await feishu.get_values(
            spreadsheet_token=spreadsheet_token, sheet_id=sheet_id
        )
        # Guard against a missing payload so slicing cannot raise TypeError.
        for row in (feishu_data or [])[1:]:
            # Skip empty rows and rows whose first cell is blank instead of
            # raising IndexError or giving up on the remaining rows.
            if not row or not row[0]:
                continue
            # str() tolerates cells that the sheet API returns as non-strings.
            keywords = [word.strip() for word in str(row[0]).split(",")]
            keywords = [word for word in keywords if word]
            if keywords:
                return keywords
    return []


async def generate_titles(sheet_id: str, video_obj: Dict, logger, aliyun_log):
    """
    Replace a video's title with an AI-generated one when it contains a
    configured filter keyword, and record the replacement in a Feishu sheet.

    :param sheet_id: id of the sheet that receives the replacement record.
    :param video_obj: video data; reads "video_title" and, when a rewrite
        happens, "video_url" and "cover_url". "video_title" is overwritten
        in place with the new title.
    :param logger: logger whose ``info`` method is used for progress messages.
    :param aliyun_log: unused here; kept for interface compatibility.
    :return: the same ``video_obj`` (mutated when the title was rewritten).
    :raises KeyError: if a rewrite happens and ``video_obj`` lacks
        "video_url" or "cover_url".
    """
    title = video_obj.get("video_title")
    if not title:
        # Nothing to check; return before doing any remote call.
        return video_obj
    title = str(title)

    # Tolerate a None result and drop None / empty keywords: an empty
    # keyword would match every title.
    raw_keywords = (await get_title_filter_word()) or []
    title_filter_word = [
        str(keyword) for keyword in raw_keywords if keyword is not None
    ]
    title_filter_word = [keyword for keyword in title_filter_word if keyword]

    contains_keyword = any(keyword in title for keyword in title_filter_word)
    logger.info(f"【{title}】标题包含过滤关键词:{contains_keyword}")
    if contains_keyword:
        new_title = await GPT4oMini.get_ai_mini_title(title)
        logger.info(f"生成新的标题:{new_title}")
        video_obj["video_title"] = new_title
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Record columns: video url, cover url, old title, new title, time.
        values = [
            video_obj["video_url"],
            video_obj["cover_url"],
            title,
            new_title,
            formatted_time,
        ]
        await insert_safe_data(sheet_id, values)
    return video_obj


async def insert_safe_data(sheet_id: str, values: List):
    """
    Insert one record into the given sheet of the replacement-log spreadsheet.

    :param sheet_id: id of the target sheet inside the spreadsheet.
    :param values: cell values passed through to ``FeishuDataAsync.insert_values``
        for the range "A2:Z2".
    """
    spreadsheet_token = "U5dXsSlPOhiNNCtEfgqcm1iYnpf"
    async with FeishuDataAsync() as feishu:
        await feishu.insert_values(
            spreadsheet_token=spreadsheet_token,
            sheet_id=sheet_id,
            ranges="A2:Z2",
            values=values,
        )


async def is_near_next_day(threshold_minutes: int = 3) -> bool:
    """
    Check whether the local time is close to the next midnight.

    :param threshold_minutes: size of the window before midnight, in minutes.
    :return: True when fewer than ``threshold_minutes`` minutes remain until
        the next local midnight, otherwise False.
    """
    now = datetime.now()
    tomorrow = now.date() + timedelta(days=1)
    midnight = datetime.combine(tomorrow, datetime.min.time())
    time_left = midnight - now
    return time_left.total_seconds() < threshold_minutes * 60


# if __name__ == '__main__':
#     # asyncio.run(insert_safe_data("K0gA9Y", ["1","2","3","4","5"]))
#     print(asyncio.run(get_title_filter_word()))