#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
"""Factories and thin wrappers for the chat back-ends used by the agent.

* ``OpenAICompatible`` builds ``openai.OpenAI`` clients for the supported
  OpenAI-compatible providers and computes per-call cost.
* ``CozeChat`` wraps the Coze chat API with either a static token or a
  JWT OAuth application.
"""

import os
import threading
import time
from enum import Enum, auto
from typing import List, Dict, Optional

import cozepy
import httpx
from cozepy import Coze, TokenAuth, Message, ChatStatus, MessageType, JWTOAuthApp, JWTAuth
from openai import OpenAI, AsyncOpenAI, http_client

from pqai_agent import configs
from pqai_agent.logging import logger


def _env_or_default(name: str, default: str) -> str:
    """Return environment variable *name*, or *default* when unset/empty."""
    return os.getenv(name) or default


# SECURITY NOTE(review): the literal credentials below are committed to source
# control. They are kept only as fallbacks so existing deployments keep
# working; each one can be overridden through the environment variable of the
# same name. The committed values should be rotated and then removed.
COZE_API_TOKEN = os.getenv("COZE_API_TOKEN")
COZE_CN_BASE_URL = 'https://api.coze.cn'

VOLCENGINE_API_TOKEN = _env_or_default("VOLCENGINE_API_TOKEN", '5e275c38-44fd-415f-abcf-4b59f6377f72')
VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"
VOLCENGINE_MODEL_DEEPSEEK_V3 = "deepseek-v3-250324"
VOLCENGINE_MODEL_DOUBAO_PRO_1_5 = 'ep-20250307150409-4blz9'
VOLCENGINE_MODEL_DOUBAO_PRO_32K = 'ep-20250414202859-6nkz5'
VOLCENGINE_MODEL_DOUBAO_1_5_VISION_PRO = 'ep-20250421193334-nz5wd'

DEEPSEEK_API_TOKEN = _env_or_default("DEEPSEEK_API_TOKEN", 'sk-67daad8f424f4854bda7f1fed7ef220b')
DEEPSEEK_BASE_URL = 'https://api.deepseek.com/'
DEEPSEEK_CHAT_MODEL = 'deepseek-chat'

VOLCENGINE_BOT_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3/bots"
VOLCENGINE_BOT_DEEPSEEK_V3_SEARCH = "bot-20250427173459-9h2xp"

OPENAI_API_TOKEN = _env_or_default(
    "OPENAI_API_TOKEN",
    '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',
)
OPENAI_BASE_URL = 'https://api.openai.com/v1'
OPENAI_MODEL_GPT_4o = 'gpt-4o'
OPENAI_MODEL_GPT_4o_mini = 'gpt-4o-mini'

OPENROUTER_API_TOKEN = _env_or_default(
    "OPENROUTER_API_TOKEN",
    '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',
)
OPENROUTER_BASE_URL = 'https://openrouter.ai/api/v1/'
OPENROUTER_MODEL_CLAUDE_3_7_SONNET = 'anthropic/claude-3.7-sonnet'

ALIYUN_API_TOKEN = _env_or_default("ALIYUN_API_TOKEN", 'sk-47381479425f4485af7673d3d2fd92b6')
ALIYUN_BASE_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1'


class ChatServiceType(Enum):
    """Kinds of chat back-end."""

    OPENAI_COMPATIBLE = auto()
    COZE_CHAT = auto()


class ModelPrice:
    """Per-million-token input/output price of a model in a given currency."""

    EXCHANGE_RATE_TO_CNY = {
        "USD": 7.2,  # Example conversion rate, adjust as needed
    }

    def __init__(self, input_price: float, output_price: float, currency: str = 'CNY'):
        """
        :param input_price: input price for per million tokens
        :param output_price: output price for per million tokens
        :param currency: currency code the two prices are expressed in
        """
        self.input_price = input_price
        self.output_price = output_price
        self.currency = currency

    def get_total_cost(self, input_tokens: int, output_tokens: int, convert_to_cny: bool = True) -> float:
        """
        Calculate the total cost based on input and output tokens.

        :param input_tokens: Number of input tokens
        :param output_tokens: Number of output tokens
        :param convert_to_cny: Whether to convert the cost to CNY (default is True)
        :return: Total cost, in CNY when converted, otherwise in ``self.currency``
        """
        total_cost = (self.input_price * input_tokens + self.output_price * output_tokens) / 1_000_000
        if convert_to_cny and self.currency != 'CNY':
            # A currency missing from the rate table falls back to a rate of
            # 1.0, i.e. the amount is returned unconverted.
            total_cost *= self.EXCHANGE_RATE_TO_CNY.get(self.currency, 1.0)
        return total_cost

    def __repr__(self):
        return f"ModelPrice(input_price={self.input_price}, output_price={self.output_price}, currency={self.currency})"


class OpenAICompatible:
    """Client factory and price table for OpenAI-compatible providers."""

    volcengine_models = [
        VOLCENGINE_MODEL_DOUBAO_PRO_32K,
        VOLCENGINE_MODEL_DOUBAO_PRO_1_5,
        VOLCENGINE_MODEL_DOUBAO_1_5_VISION_PRO,
        VOLCENGINE_MODEL_DEEPSEEK_V3,
    ]
    deepseek_models = [
        DEEPSEEK_CHAT_MODEL,
    ]
    openai_models = [
        OPENAI_MODEL_GPT_4o_mini,
        OPENAI_MODEL_GPT_4o,
    ]
    openrouter_models = [
        OPENROUTER_MODEL_CLAUDE_3_7_SONNET,
    ]

    model_prices = {
        VOLCENGINE_MODEL_DEEPSEEK_V3: ModelPrice(input_price=2, output_price=8),
        VOLCENGINE_MODEL_DOUBAO_PRO_32K: ModelPrice(input_price=0.8, output_price=2),
        VOLCENGINE_MODEL_DOUBAO_PRO_1_5: ModelPrice(input_price=0.8, output_price=2),
        VOLCENGINE_MODEL_DOUBAO_1_5_VISION_PRO: ModelPrice(input_price=3, output_price=9),
        DEEPSEEK_CHAT_MODEL: ModelPrice(input_price=2, output_price=8),
        OPENAI_MODEL_GPT_4o: ModelPrice(input_price=2.5, output_price=10, currency='USD'),
        OPENAI_MODEL_GPT_4o_mini: ModelPrice(input_price=0.15, output_price=0.6, currency='USD'),
        OPENROUTER_MODEL_CLAUDE_3_7_SONNET: ModelPrice(input_price=3, output_price=15, currency='USD'),
    }

    @staticmethod
    def create_client(model_name, **kwargs) -> OpenAI:
        """
        Build an ``OpenAI`` client pointed at the provider that serves *model_name*.

        :param model_name: one of the model names listed on this class
        :param kwargs: extra keyword arguments forwarded to ``OpenAI(...)``
        :return: configured ``OpenAI`` client
        :raises ValueError: if *model_name* is not in any provider list
        """
        if model_name in OpenAICompatible.volcengine_models:
            return OpenAI(api_key=VOLCENGINE_API_TOKEN, base_url=VOLCENGINE_BASE_URL, **kwargs)
        if model_name in OpenAICompatible.deepseek_models:
            return OpenAI(api_key=DEEPSEEK_API_TOKEN, base_url=DEEPSEEK_BASE_URL, **kwargs)
        if model_name in OpenAICompatible.openai_models:
            socks_conf = configs.get().get('system', {}).get('outside_proxy', {}).get('socks5', {})
            if socks_conf:
                # Route OpenAI traffic through the configured SOCKS5 proxy.
                # This replaces any ``http_client`` passed in by the caller.
                proxied_client = httpx.Client(
                    timeout=httpx.Timeout(600, connect=5.0),
                    proxy=f"socks5://{socks_conf['hostname']}:{socks_conf['port']}"
                )
                kwargs['http_client'] = proxied_client
            return OpenAI(api_key=OPENAI_API_TOKEN, base_url=OPENAI_BASE_URL, **kwargs)
        if model_name in OpenAICompatible.openrouter_models:
            return OpenAI(api_key=OPENROUTER_API_TOKEN, base_url=OPENROUTER_BASE_URL, **kwargs)
        # ValueError is still caught by callers handling plain ``Exception``.
        raise ValueError("Unsupported model: %s" % model_name)

    @staticmethod
    def get_price(model_name: str) -> ModelPrice:
        """
        Get the price for a given model.

        :param model_name: Name of the model
        :return: ModelPrice object containing input and output prices
        :raises ValueError: if the model has no entry in ``model_prices``
        """
        try:
            return OpenAICompatible.model_prices[model_name]
        except KeyError:
            raise ValueError(f"Model {model_name} not found in price list.") from None

    @staticmethod
    def calculate_cost(model_name: str, input_tokens: int, output_tokens: int, convert_to_cny: bool = True) -> float:
        """
        Calculate the cost for a given model based on input and output tokens.

        :param model_name: Name of the model
        :param input_tokens: Number of input tokens
        :param output_tokens: Number of output tokens
        :param convert_to_cny: Whether to convert the cost to CNY (default is True)
        :return: Total cost, in CNY when converted, otherwise in the model's currency
        :raises ValueError: if the model has no entry in ``model_prices``
        """
        price = OpenAICompatible.get_price(model_name)
        return price.get_total_cost(input_tokens, output_tokens, convert_to_cny)


class CrossAccountJWTOAuthApp(JWTOAuthApp):
    """JWT OAuth app that requests tokens from the account-scoped token endpoint."""

    def __init__(self, account_id: str, client_id: str, private_key: str, public_key_id: str, base_url):
        """
        :param account_id: account whose token endpoint is used
        :param client_id: OAuth client id, forwarded to ``JWTOAuthApp``
        :param private_key: private key text, forwarded to ``JWTOAuthApp``
        :param public_key_id: public key id, forwarded to ``JWTOAuthApp``
        :param base_url: API base URL, forwarded to ``JWTOAuthApp``
        """
        self.account_id = account_id
        super().__init__(client_id, private_key, public_key_id, base_url)

    def get_access_token(
        self, ttl: int = 900, scope: Optional[cozepy.Scope] = None, session_name: Optional[str] = None
    ) -> cozepy.OAuthToken:
        """
        Exchange a freshly signed JWT for an access token of ``self.account_id``.

        :param ttl: requested token lifetime, sent as ``duration_seconds``
        :param scope: optional scope, serialized with ``model_dump()``
        :param session_name: optional session name passed to the JWT generator
        :return: the ``cozepy.OAuthToken`` returned by the endpoint
        """
        # The signing JWT itself uses a fixed 3600 lifetime, independent of ``ttl``.
        jwt_token = self._gen_jwt(self._public_key_id, self._private_key, 3600, session_name)
        url = f"{self._base_url}/api/permission/oauth2/account/{self.account_id}/token"
        headers = {"Authorization": f"Bearer {jwt_token}"}
        body = {
            "duration_seconds": ttl,
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "scope": scope.model_dump() if scope else None,
        }
        return self._requester.request("post", url, False, cozepy.OAuthToken, headers=headers, body=body)


class CozeChat:
    """Coze chat client authenticated by a static token or a JWT OAuth app."""

    # Lifetime requested for each access token, and how long to wait before
    # requesting the next one.
    TOKEN_TTL_SECONDS = 12 * 3600
    TOKEN_REFRESH_INTERVAL_SECONDS = 11 * 3600
    # Pause after a failed refresh so a persistent error does not busy-loop.
    TOKEN_REFRESH_RETRY_SECONDS = 60

    def __init__(self, base_url: str, auth_token: Optional[str] = None, auth_app: Optional[JWTOAuthApp] = None):
        """
        :param base_url: Coze API base URL
        :param auth_token: static access token; takes precedence over *auth_app*
        :param auth_app: JWT OAuth app used when no *auth_token* is given
        :raises ValueError: if neither *auth_token* nor *auth_app* is provided
        """
        if not auth_token and not auth_app:
            raise ValueError("Either auth_token or auth_app must be provided.")
        self.thread = None
        self.thread_running = False
        self.last_token_fresh = 0
        # Always defined so the refresh loop can test it safely.
        self.auth_app = None
        if auth_token:
            self.coze = Coze(auth=TokenAuth(auth_token), base_url=base_url)
        else:
            self.auth_app = auth_app
            # Request a token once up front; the result is not used directly.
            auth_app.get_access_token(ttl=self.TOKEN_TTL_SECONDS)
            self.last_token_fresh = time.time()
            self.coze = Coze(auth=JWTAuth(oauth_app=auth_app), base_url=base_url)
            self.setup_token_refresh()

    def create(self, bot_id: str, user_id: str, messages: List, custom_variables: Dict):
        """
        Run one chat turn and return the answer text.

        :param bot_id: id of the Coze bot
        :param user_id: id of the user
        :param messages: messages sent as ``additional_messages``
        :param custom_variables: variables sent as ``custom_variables``
        :return: content of the last ANSWER message, or ``None`` if the chat
            did not complete or contained no ANSWER message
        """
        response = self.coze.chat.create_and_poll(
            bot_id=bot_id, user_id=user_id, additional_messages=messages,
            custom_variables=custom_variables)
        logger.debug("Coze response size: {}".format(len(response.messages)))
        if response.chat.status != ChatStatus.COMPLETED:
            logger.error("Coze chat not completed: {}".format(response.chat.status))
            return None
        final_response = None
        for message in response.messages:
            if message.type == MessageType.ANSWER:
                final_response = message.content
        return final_response

    def setup_token_refresh(self):
        """Start the background thread that periodically requests a new token."""
        # Set the flag BEFORE starting: otherwise the new thread may observe
        # ``thread_running == False`` and exit immediately.
        self.thread_running = True
        # Daemon thread so a live CozeChat does not keep the interpreter from
        # exiting.
        self.thread = threading.Thread(target=self.refresh_token_loop, daemon=True)
        self.thread.start()

    def refresh_token_loop(self):
        """Request a new access token every ``TOKEN_REFRESH_INTERVAL_SECONDS``.

        Runs until ``thread_running`` is cleared. The returned token is
        discarded here.
        NOTE(review): ``JWTAuth`` presumably fetches its own tokens - confirm
        whether this loop is still required.
        """
        while self.thread_running:
            if time.time() - self.last_token_fresh < self.TOKEN_REFRESH_INTERVAL_SECONDS:
                time.sleep(1)
                continue
            if self.auth_app:
                try:
                    self.auth_app.get_access_token(ttl=self.TOKEN_TTL_SECONDS)
                except Exception as e:
                    # Keep the thread alive on transient failures; retry later.
                    logger.error("Coze token refresh failed: {}".format(e))
                    time.sleep(self.TOKEN_REFRESH_RETRY_SECONDS)
                    continue
            self.last_token_fresh = time.time()

    def close(self):
        """Ask the refresh thread (if any) to stop at its next wake-up."""
        self.thread_running = False

    def __del__(self):
        self.thread_running = False

    @staticmethod
    def get_oauth_app(client_id, private_key_path, public_key_id, base_url=None, account_id=None) -> JWTOAuthApp:
        """
        Build a JWT OAuth app from a private key file.

        :param client_id: OAuth client id (converted to ``str``)
        :param private_key_path: path of the file holding the private key
        :param public_key_id: public key id
        :param base_url: API base URL; defaults to ``COZE_CN_BASE_URL``
        :param account_id: when given, a ``CrossAccountJWTOAuthApp`` is built
        :return: the OAuth app instance
        """
        if not base_url:
            base_url = COZE_CN_BASE_URL
        with open(private_key_path, "r", encoding="utf-8") as f:
            private_key = f.read()
        if not account_id:
            return JWTOAuthApp(
                client_id=str(client_id),
                private_key=private_key,
                public_key_id=public_key_id,
                base_url=base_url,
            )
        return CrossAccountJWTOAuthApp(
            account_id=account_id,
            client_id=str(client_id),
            private_key=private_key,
            public_key_id=public_key_id,
            base_url=base_url,
        )


if __name__ == '__main__':
    # Init the Coze client through the access_token.
    coze = Coze(auth=TokenAuth(token=COZE_API_TOKEN), base_url=COZE_CN_BASE_URL)

    # Create a bot instance in Coze, copy the last number from the web link as the bot's ID.
    bot_id = "7491250992952999973"
    # The user id identifies the identity of a user. Developers can use a custom business ID
    # or a random string.
    user_id = "dev_user"

    chat = coze.chat.create_and_poll(
        bot_id=bot_id,
        user_id=user_id,
        additional_messages=[Message.build_user_question_text("钱塘江边 樱花开得不错,推荐一个视频吧")],
        custom_variables={
            'agent_name': '芳华',
            'agent_age': '25',
            'agent_region': '北京',
            'name': '李明',
            'preferred_nickname': '李叔',
            'age': '70',
            'last_interaction_interval': '12',
            'current_time_period': '上午',
            'if_first_interaction': 'False',
            'if_active_greeting': 'False'
        }
    )

    for message in chat.messages:
        print(message, flush=True)

    if chat.chat.status == ChatStatus.COMPLETED:
        print("token usage:", chat.chat.usage.token_count)