123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- # vim:fenc=utf-8
- #
- import os
- import threading
- from typing import List, Dict, Optional
- from enum import Enum, auto
- from logging_service import logger
- import cozepy
- from cozepy import Coze, TokenAuth, Message, ChatStatus, MessageType, JWTOAuthApp, JWTAuth
- import time
- from openai import OpenAI
# --- Coze -------------------------------------------------------------------
# Personal access token for Coze; None when the variable is not set.
COZE_API_TOKEN = os.getenv("COZE_API_TOKEN")
COZE_CN_BASE_URL = 'https://api.coze.cn'

# --- Volcengine Ark (OpenAI-compatible endpoint) ------------------------------
# SECURITY: the literal fallback below is a credential committed to source
# control. It is kept only so existing deployments keep working; set
# VOLCENGINE_API_TOKEN in the environment, then rotate and delete the literal.
VOLCENGINE_API_TOKEN = os.getenv("VOLCENGINE_API_TOKEN", '5e275c38-44fd-415f-abcf-4b59f6377f72')
VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"
# Model / endpoint identifiers accepted by the Volcengine base URL.
VOLCENGINE_MODEL_DEEPSEEK_V3 = "deepseek-v3-250324"
VOLCENGINE_MODEL_DOUBAO_PRO_1_5 = 'ep-20250307150409-4blz9'
VOLCENGINE_MODEL_DOUBAO_PRO_32K = 'ep-20250414202859-6nkz5'
VOLCENGINE_MODEL_DOUBAO_1_5_VISION_PRO = 'ep-20250421193334-nz5wd'

# --- DeepSeek (OpenAI-compatible endpoint) ------------------------------------
# SECURITY: same caveat as VOLCENGINE_API_TOKEN — prefer the DEEPSEEK_API_TOKEN
# environment variable; the literal fallback should be rotated and removed.
DEEPSEEK_API_TOKEN = os.getenv("DEEPSEEK_API_TOKEN", 'sk-67daad8f424f4854bda7f1fed7ef220b')
DEEPSEEK_BASE_URL = 'https://api.deepseek.com/'
DEEPSEEK_CHAT_MODEL = 'deepseek-chat'
class ChatServiceType(Enum):
    """Tag naming a chat backend.

    The member names line up with the ``OpenAICompatible`` and ``CozeChat``
    classes in this module; where the enum is consumed is not visible here.
    """

    # Explicit values equal to what ``auto()`` assigns in declaration order.
    OPENAI_COMPATIBLE = 1
    COZE_CHAT = 2
class OpenAICompatible:
    """Factory for ``OpenAI`` SDK clients pointed at OpenAI-compatible providers."""

    @staticmethod
    def create_client(model_name):
        """Return an ``OpenAI`` client configured for the provider serving *model_name*.

        Args:
            model_name: One of the ``VOLCENGINE_MODEL_*`` identifiers or
                ``DEEPSEEK_CHAT_MODEL``.

        Returns:
            An ``OpenAI`` client built with the matching API key and base URL.

        Raises:
            ValueError: If *model_name* is not one of the known models.
                (``ValueError`` subclasses ``Exception``, so callers that
                caught the previous bare ``Exception`` keep working.)
        """
        volcengine_models = (
            VOLCENGINE_MODEL_DOUBAO_PRO_32K,
            VOLCENGINE_MODEL_DOUBAO_PRO_1_5,
            VOLCENGINE_MODEL_DOUBAO_1_5_VISION_PRO,
            VOLCENGINE_MODEL_DEEPSEEK_V3,
        )
        deepseek_models = (
            DEEPSEEK_CHAT_MODEL,
        )

        if model_name in volcengine_models:
            return OpenAI(api_key=VOLCENGINE_API_TOKEN, base_url=VOLCENGINE_BASE_URL)
        if model_name in deepseek_models:
            return OpenAI(api_key=DEEPSEEK_API_TOKEN, base_url=DEEPSEEK_BASE_URL)
        raise ValueError("Unsupported model: %s" % model_name)
class CrossAccountJWTOAuthApp(JWTOAuthApp):
    """``JWTOAuthApp`` subclass whose token request targets a specific account.

    ``get_access_token`` is overridden to post to
    ``/api/permission/oauth2/account/<account_id>/token`` under the app's
    base URL.
    """

    def __init__(self, account_id: str, client_id: str, private_key: str, public_key_id: str, base_url):
        """Remember *account_id*, then defer the rest to ``JWTOAuthApp.__init__``."""
        self.account_id = account_id
        super().__init__(client_id, private_key, public_key_id, base_url)

    def get_access_token(
        self, ttl: int = 900, scope: Optional[cozepy.Scope] = None, session_name: Optional[str] = None
    ) -> cozepy.OAuthToken:
        """Request an OAuth token from the account-scoped token endpoint.

        Args:
            ttl: Sent to the server as ``duration_seconds``.
            scope: Optional scope; serialised with ``model_dump()`` when given,
                otherwise sent as ``None``.
            session_name: Forwarded to ``_gen_jwt``.

        Returns:
            Whatever the requester parses into ``cozepy.OAuthToken``.
        """
        # The literal 3600 is passed straight to ``_gen_jwt``; presumably the
        # lifetime of the signed JWT in seconds — confirm against cozepy.
        signed_jwt = self._gen_jwt(self._public_key_id, self._private_key, 3600, session_name)
        token_endpoint = f"{self._base_url}/api/permission/oauth2/account/{self.account_id}/token"
        auth_headers = {"Authorization": f"Bearer {signed_jwt}"}

        if scope:
            scope_payload = scope.model_dump()
        else:
            scope_payload = None
        payload = {
            "duration_seconds": ttl,
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "scope": scope_payload,
        }

        return self._requester.request(
            "post", token_endpoint, False, cozepy.OAuthToken, headers=auth_headers, body=payload
        )
class CozeChat:
    """Wrapper around a ``Coze`` client authenticated by token or by OAuth app.

    With ``auth_token`` the client uses ``TokenAuth``. With ``auth_app`` it
    uses ``JWTAuth`` and a background thread periodically calls
    ``auth_app.get_access_token``.

    NOTE(review): the token returned by the periodic ``get_access_token`` call
    is discarded here; whether the ``Coze`` client benefits from it depends on
    cozepy's ``JWTAuth`` — confirm.
    """

    # Lifetime requested for each OAuth access token, in seconds.
    _TOKEN_TTL_SECONDS = 12 * 3600
    # Token age after which the background thread requests a new one.
    _TOKEN_REFRESH_AFTER_SECONDS = 11 * 3600
    # Pause after a failed refresh so a persistent error is not retried in a
    # tight loop.
    _REFRESH_RETRY_SECONDS = 60

    def __init__(self, base_url: str, auth_token: Optional[str] = None, auth_app: Optional["JWTOAuthApp"] = None):
        """Build the underlying ``Coze`` client.

        Args:
            base_url: Base URL handed to ``Coze``.
            auth_token: Static token; takes precedence when both are given.
            auth_app: OAuth app used when no ``auth_token`` is supplied.

        Raises:
            ValueError: If neither ``auth_token`` nor ``auth_app`` is provided.
        """
        if not auth_token and not auth_app:
            raise ValueError("Either auth_token or auth_app must be provided.")
        self.thread = None
        self.thread_running = False
        self.last_token_fresh = 0
        # Always defined so the refresh loop can test it on any instance.
        self.auth_app = None
        if auth_token:
            self.coze = Coze(auth=TokenAuth(auth_token), base_url=base_url)
        else:
            self.auth_app = auth_app
            # Request a token up front (result intentionally unused) so bad
            # credentials fail at construction time, as before.
            auth_app.get_access_token(ttl=self._TOKEN_TTL_SECONDS)
            self.last_token_fresh = time.time()
            self.coze = Coze(auth=JWTAuth(oauth_app=auth_app), base_url=base_url)
            self.setup_token_refresh()

    def create(self, bot_id: str, user_id: str, messages: List, custom_variables: Dict):
        """Run one chat via ``create_and_poll`` and return the answer content.

        Args:
            bot_id: Coze bot identifier.
            user_id: Caller-chosen user identifier.
            messages: Passed as ``additional_messages``.
            custom_variables: Passed as ``custom_variables``.

        Returns:
            The ``content`` of the last message of type ``MessageType.ANSWER``,
            or ``None`` when the chat did not complete or had no answer.
        """
        response = self.coze.chat.create_and_poll(
            bot_id=bot_id, user_id=user_id, additional_messages=messages,
            custom_variables=custom_variables)
        logger.debug("Coze response size: {}".format(len(response.messages)))
        if response.chat.status != ChatStatus.COMPLETED:
            logger.error("Coze chat not completed: {}".format(response.chat.status))
            return None
        final_response = None
        # Later ANSWER messages overwrite earlier ones: the last one wins.
        for message in response.messages:
            if message.type == MessageType.ANSWER:
                final_response = message.content
        return final_response

    def setup_token_refresh(self):
        """Start the background refresh thread (no-op if one is already alive)."""
        if self.thread is not None and self.thread.is_alive():
            return
        # The flag must be set BEFORE start(): the loop checks it immediately
        # and would otherwise exit at once if it ran before the assignment.
        self.thread_running = True
        # Daemon thread: the loop holds a reference to ``self`` and would
        # otherwise keep the interpreter from exiting.
        self.thread = threading.Thread(target=self.refresh_token_loop, daemon=True)
        self.thread.start()

    def refresh_token_loop(self):
        """Thread body: request a new token once the current one is old enough.

        Polls once per second until ``thread_running`` becomes false. A failed
        refresh is logged and retried after ``_REFRESH_RETRY_SECONDS`` instead
        of terminating the thread.
        """
        while self.thread_running:
            token_age = time.time() - self.last_token_fresh
            if token_age < self._TOKEN_REFRESH_AFTER_SECONDS:
                time.sleep(1)
                continue
            if self.auth_app:
                try:
                    self.auth_app.get_access_token(ttl=self._TOKEN_TTL_SECONDS)
                except Exception as exc:  # keep the refresher alive on any failure
                    logger.error("Coze token refresh failed: {}".format(exc))
                    time.sleep(self._REFRESH_RETRY_SECONDS)
                    continue
            # Also reached when there is no auth_app, so the loop goes back to
            # sleeping instead of spinning.
            self.last_token_fresh = time.time()

    def close(self):
        """Ask the refresh thread to stop; it exits at its next flag check."""
        self.thread_running = False

    def __del__(self):
        # While the refresh thread is alive it references ``self`` through its
        # bound-method target, so this only runs once the thread has ended or
        # was never started. Call ``close()`` to stop the thread explicitly.
        self.thread_running = False

    @staticmethod
    def get_oauth_app(client_id, private_key_path, public_key_id, base_url=None, account_id=None) -> "JWTOAuthApp":
        """Create a JWT OAuth app from a private key stored on disk.

        Args:
            client_id: OAuth client id; converted with ``str()``.
            private_key_path: Path of the file holding the private key text.
            public_key_id: Public key id passed to the app.
            base_url: API base URL; defaults to ``COZE_CN_BASE_URL`` when falsy.
            account_id: When truthy, a ``CrossAccountJWTOAuthApp`` bound to
                this account is returned instead of a plain ``JWTOAuthApp``.

        Returns:
            The constructed OAuth app.
        """
        if not base_url:
            base_url = COZE_CN_BASE_URL
        with open(private_key_path, "r", encoding="utf-8") as key_file:
            private_key = key_file.read()
        if not account_id:
            return JWTOAuthApp(
                client_id=str(client_id),
                private_key=private_key,
                public_key_id=public_key_id,
                base_url=base_url,
            )
        return CrossAccountJWTOAuthApp(
            account_id=account_id,
            client_id=str(client_id),
            private_key=private_key,
            public_key_id=public_key_id,
            base_url=base_url,
        )
if __name__ == '__main__':
    # Manual smoke test: send one question to a bot and dump the reply.
    # The client authenticates with the access token from the environment.
    coze = Coze(auth=TokenAuth(token=COZE_API_TOKEN), base_url=COZE_CN_BASE_URL)

    # Bot ID: the trailing number of the bot's web link in Coze.
    bot_id = "7491250992952999973"
    # Any business ID or random string that identifies the end user.
    user_id = "dev_user"

    question = Message.build_user_question_text("钱塘江边 樱花开得不错,推荐一个视频吧")
    prompt_variables = {
        'agent_name': '芳华',
        'agent_age': '25',
        'agent_region': '北京',
        'name': '李明',
        'preferred_nickname': '李叔',
        'age': '70',
        'last_interaction_interval': '12',
        'current_time_period': '上午',
        'if_first_interaction': 'False',
        'if_active_greeting': 'False'
    }

    chat = coze.chat.create_and_poll(
        bot_id=bot_id,
        user_id=user_id,
        additional_messages=[question],
        custom_variables=prompt_variables,
    )

    # Print every returned message, then the token usage for a completed chat.
    for message in chat.messages:
        print(message, flush=True)
    if chat.chat.status == ChatStatus.COMPLETED:
        print("token usage:", chat.chat.usage.token_count)
|