#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Simple DeepSeek processor.

Originally a minimal stub to satisfy imports; the functionality can be
extended as needed.
"""

import os
import json
from typing import Any

from dotenv import load_dotenv
from openai import OpenAI


class DeepSeekProcessor:
    def __init__(self):
        # Load environment variables from a .env file, if present.
        load_dotenv()

        # Read the API key and base URL for the DeepSeek-compatible endpoint.
        api_key = os.getenv("DEEPSEEK_API_KEY")
        base_url = os.getenv("DEEPSEEK_BASE_URL")
        if not api_key:
            raise ValueError("DEEPSEEK_API_KEY is not set")

        self.client = OpenAI(api_key=api_key, base_url=base_url)

    def process(self, content: Any, system_prompt: str) -> Any:
        """Send a single request. Returns the reply text, or an error dict on failure."""
        try:
            # Normalize the input: serialize dicts to a JSON string.
            if isinstance(content, dict):
                formatted_content = json.dumps(content, ensure_ascii=False)
            else:
                formatted_content = content

            # https://bailian.console.aliyun.com/?tab=api#/api/?type=model&url=2868565
            response = self.client.chat.completions.create(
                model="deepseek-v3",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": formatted_content},
                ],
                stream=False,
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"DeepSeek API call failed: {e}")
            return {"error": str(e), "content": content}

    def batch_process(self, contents: list, system_prompt: str) -> list:
        """Process a list of inputs sequentially with the same system prompt."""
        results = []
        for content in contents:
            results.append(self.process(content, system_prompt))
        return results


def main():
    processor = DeepSeekProcessor()

    # Example system prompt and user input.
    system_prompt = "You are a helpful AI assistant. Answer questions concisely."
    user_input = "What is artificial intelligence?"

    # Process a single request.
    print("\nProcessing a single request:")
    result = processor.process(user_input, system_prompt)
    print(f"Input: {user_input}")
    print(f"Output: {result}")

    # Process a batch of requests.
    print("\nProcessing a batch of requests:")
    batch_inputs = ["What is machine learning?", "What is deep learning?"]
    batch_results = processor.batch_process(batch_inputs, system_prompt)
    for i, (input_text, result) in enumerate(zip(batch_inputs, batch_results)):
        print(f"\nRequest {i + 1}:")
        print(f"Input: {input_text}")
        print(f"Output: {result}")


if __name__ == "__main__":
    main()
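
# ---------------------------------------------------------------------------
# Example .env file this script reads (a minimal sketch): the variable names
# come from the os.getenv() calls in DeepSeekProcessor.__init__. The base URL
# value below is an assumption based on the Aliyun Bailian / DashScope
# OpenAI-compatible endpoint referenced in the comment inside process();
# verify it against your provider's documentation before use.
#
#   DEEPSEEK_API_KEY=sk-...
#   DEEPSEEK_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1
# ---------------------------------------------------------------------------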