#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
"""Chat-service configuration and a thin client wrapper around the Coze SDK.

Defines endpoint/model constants for the supported providers, a
cross-account variant of the Coze JWT OAuth app, and ``CozeChat``, which
sends a chat request to a Coze bot and returns the final answer text.
"""
import os
import threading
import time
from typing import List, Dict, Optional
from enum import Enum, auto

from logging_service import logger
import cozepy
from cozepy import Coze, TokenAuth, Message, ChatStatus, MessageType, JWTOAuthApp, JWTAuth

COZE_API_TOKEN = os.getenv("COZE_API_TOKEN")
COZE_CN_BASE_URL = 'https://api.coze.cn'

# SECURITY(review): the two API tokens below are committed in source. The
# literals are kept only as fallbacks so existing deployments keep working;
# they should be rotated and supplied exclusively through the environment.
VOLCENGINE_API_TOKEN = os.getenv("VOLCENGINE_API_TOKEN", '5e275c38-44fd-415f-abcf-4b59f6377f72')
VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"
VOLCENGINE_MODEL_DEEPSEEK_V3 = "ep-20250213194558-rrmr2"
VOLCENGINE_MODEL_DOUBAO_PRO_1_5 = 'ep-20250307150409-4blz9'

DEEPSEEK_API_TOKEN = os.getenv("DEEPSEEK_API_TOKEN", 'sk-67daad8f424f4854bda7f1fed7ef220b')
DEEPSEEK_BASE_URL = 'https://api.deepseek.com/'
DEEPSEEK_CHAT_MODEL = 'deepseek-chat'

# Lifetime requested for OAuth access tokens, and how long to wait before
# requesting a new one (one hour of safety margin before expiry).
_TOKEN_TTL_SECONDS = 12 * 3600
_TOKEN_REFRESH_INTERVAL_SECONDS = 11 * 3600
# Delay before retrying after a failed refresh attempt.
_TOKEN_REFRESH_RETRY_SECONDS = 60


class ChatServiceType(Enum):
    """Kinds of chat backend this module distinguishes."""

    # Must be ``auto()``: a bare ``auto`` stores the class object as the value.
    OPENAI_COMPATIBLE = auto()
    COZE_CHAT = auto()


class CrossAccountJWTOAuthApp(JWTOAuthApp):
    """JWT OAuth app that requests tokens from the per-account token endpoint.

    Differs from ``JWTOAuthApp`` only in the URL used by
    ``get_access_token``, which embeds ``account_id``.
    """

    def __init__(self, account_id: str, client_id: str, private_key: str, public_key_id: str, base_url: str):
        self.account_id = account_id
        super().__init__(client_id, private_key, public_key_id, base_url)

    def get_access_token(
            self, ttl: int = 900, scope: Optional[cozepy.Scope] = None, session_name: Optional[str] = None
    ) -> cozepy.OAuthToken:
        """Request an access token for ``self.account_id``.

        Args:
            ttl: Requested token lifetime, sent as ``duration_seconds``.
            scope: Optional scope; serialized with ``model_dump()`` when given.
            session_name: Forwarded to the JWT generator.

        Returns:
            The ``cozepy.OAuthToken`` parsed from the response.
        """
        # NOTE(review): ``_gen_jwt``, ``_public_key_id``, ``_private_key``,
        # ``_base_url`` and ``_requester`` are presumably provided by the
        # cozepy base class - confirm against the pinned cozepy version.
        # The signed JWT itself is always generated with a 3600 s lifetime,
        # independent of ``ttl``.
        jwt_token = self._gen_jwt(self._public_key_id, self._private_key, 3600, session_name)
        url = f"{self._base_url}/api/permission/oauth2/account/{self.account_id}/token"
        headers = {"Authorization": f"Bearer {jwt_token}"}
        body = {
            "duration_seconds": ttl,
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "scope": scope.model_dump() if scope else None,
        }
        return self._requester.request("post", url, False, cozepy.OAuthToken, headers=headers, body=body)


class CozeChat:
    """Client for sending chat requests to a Coze bot.

    Authenticates either with a static token or with a JWT OAuth app. In
    the OAuth case a background thread periodically requests a fresh
    access token; call ``close()`` to stop it.
    """

    def __init__(self, base_url: str, auth_token: Optional[str] = None, auth_app: Optional[JWTOAuthApp] = None):
        """Create the underlying ``Coze`` client.

        Args:
            base_url: Base URL of the Coze API.
            auth_token: Static access token. Takes precedence over
                ``auth_app`` when both are given.
            auth_app: JWT OAuth app used when no ``auth_token`` is given.

        Raises:
            ValueError: If neither ``auth_token`` nor ``auth_app`` is given.
        """
        if not auth_token and not auth_app:
            raise ValueError("Either auth_token or auth_app must be provided.")

        self.thread = None
        self.thread_running = False
        self.last_token_fresh = 0
        # Always defined so the refresh loop can test it on either path.
        self.auth_app = None

        if auth_token:
            self.coze = Coze(auth=TokenAuth(auth_token), base_url=base_url)
        else:
            self.auth_app = auth_app
            # Request a token up front so bad credentials fail here rather
            # than on the first chat request; the returned token is not kept.
            auth_app.get_access_token(ttl=_TOKEN_TTL_SECONDS)
            self.last_token_fresh = time.time()
            self.coze = Coze(auth=JWTAuth(oauth_app=auth_app), base_url=base_url)
            self.setup_token_refresh()

    def create(self, bot_id: str, user_id: str, messages: List, custom_variables: Dict):
        """Send ``messages`` to a bot and return the answer text.

        Args:
            bot_id: ID of the Coze bot.
            user_id: Caller-chosen user identifier.
            messages: Messages passed as ``additional_messages``.
            custom_variables: Variables passed through to the bot.

        Returns:
            The content of the last ``ANSWER`` message, or ``None`` when
            the chat did not complete or produced no answer message.
        """
        response = self.coze.chat.create_and_poll(
            bot_id=bot_id, user_id=user_id, additional_messages=messages,
            custom_variables=custom_variables)
        logger.debug("Coze response size: {}".format(len(response.messages)))
        if response.chat.status != ChatStatus.COMPLETED:
            logger.error("Coze chat not completed: {}".format(response.chat.status))
            return None

        final_response = None
        # Deliberately keeps the LAST answer when several are present.
        for message in response.messages:
            if message.type == MessageType.ANSWER:
                final_response = message.content
        return final_response

    def setup_token_refresh(self):
        """Start the background token-refresh thread."""
        # The flag must be set before the thread starts; otherwise the
        # worker can observe False on its first check and exit immediately.
        self.thread_running = True
        # Daemon thread: its target is a bound method, so it keeps this
        # instance alive and __del__ cannot be relied on to stop it. A
        # non-daemon thread would therefore block interpreter shutdown.
        self.thread = threading.Thread(target=self.refresh_token_loop, daemon=True)
        self.thread.start()

    def refresh_token_loop(self):
        """Request a new access token whenever the refresh interval elapses.

        Runs until ``thread_running`` becomes False, polling once per
        second. A failed refresh is logged and retried after
        ``_TOKEN_REFRESH_RETRY_SECONDS`` instead of terminating the thread.
        """
        retry_not_before = 0.0
        while self.thread_running:
            now = time.time()
            refresh_due = now - self.last_token_fresh >= _TOKEN_REFRESH_INTERVAL_SECONDS
            if not self.auth_app or not refresh_due or now < retry_not_before:
                time.sleep(1)
                continue
            try:
                self.auth_app.get_access_token(ttl=_TOKEN_TTL_SECONDS)
            except Exception as exc:  # thread boundary: log and keep running
                logger.error("Coze token refresh failed: {}".format(exc))
                retry_not_before = time.time() + _TOKEN_REFRESH_RETRY_SECONDS
            else:
                self.last_token_fresh = time.time()

    def close(self):
        """Stop the token-refresh thread, if one is running."""
        self.thread_running = False
        worker = self.thread
        if worker is not None and worker.is_alive() and worker is not threading.current_thread():
            # The loop polls once per second, so this returns promptly.
            worker.join(timeout=2)

    def __del__(self):
        self.thread_running = False

    @staticmethod
    def get_oauth_app(client_id, private_key_path, public_key_id, base_url=None, account_id=None) -> JWTOAuthApp:
        """Build a JWT OAuth app from a private key stored on disk.

        Args:
            client_id: OAuth client ID; converted to ``str``.
            private_key_path: Path to the private key file.
            public_key_id: ID of the matching public key.
            base_url: API base URL; defaults to ``COZE_CN_BASE_URL``.
            account_id: When given, a ``CrossAccountJWTOAuthApp`` for that
                account is returned instead of a plain ``JWTOAuthApp``.

        Returns:
            The configured OAuth app.
        """
        if not base_url:
            base_url = COZE_CN_BASE_URL
        with open(private_key_path, "r", encoding="utf-8") as f:
            private_key = f.read()

        if not account_id:
            jwt_oauth_app = JWTOAuthApp(
                client_id=str(client_id),
                private_key=private_key,
                public_key_id=public_key_id,
                base_url=base_url,
            )
        else:
            jwt_oauth_app = CrossAccountJWTOAuthApp(
                account_id=account_id,
                client_id=str(client_id),
                private_key=private_key,
                public_key_id=public_key_id,
                base_url=base_url,
            )
        return jwt_oauth_app


if __name__ == '__main__':
    if not COZE_API_TOKEN:
        raise SystemExit("COZE_API_TOKEN environment variable is not set.")

    # Init the Coze client through the access_token.
    coze = Coze(auth=TokenAuth(token=COZE_API_TOKEN), base_url=COZE_CN_BASE_URL)

    # Create a bot instance in Coze, copy the last number from the web link as the bot's ID.
    bot_id = "7491250992952999973"
    # The user id identifies the identity of a user. Developers can use a custom business ID
    # or a random string.
    user_id = "dev_user"

    chat = coze.chat.create_and_poll(
        bot_id=bot_id,
        user_id=user_id,
        additional_messages=[Message.build_user_question_text("钱塘江边 樱花开得不错,推荐一个视频吧")],
        custom_variables={
            'agent_name': '芳华',
            'agent_age': '25',
            'agent_region': '北京',
            'name': '李明',
            'preferred_nickname': '李叔',
            'age': '70',
            'last_interaction_interval': '12',
            'current_time_period': '上午',
            'if_first_interaction': 'False',
            'if_active_greeting': 'False'
        }
    )

    for message in chat.messages:
        print(message, flush=True)

    if chat.chat.status == ChatStatus.COMPLETED:
        print("token usage:", chat.chat.usage.token_count)