|
@@ -4,10 +4,12 @@
|
|
#
|
|
#
|
|
|
|
|
|
import os
|
|
import os
|
|
-from typing import List, Dict
|
|
|
|
|
|
+import threading
|
|
|
|
+from typing import List, Dict, Optional
|
|
from enum import Enum, auto
|
|
from enum import Enum, auto
|
|
import logging
|
|
import logging
|
|
-from cozepy import Coze, TokenAuth, Message, ChatStatus, MessageContentType, ChatEventType, MessageType
|
|
|
|
|
|
+from cozepy import Coze, TokenAuth, Message, ChatStatus, MessageType, JWTOAuthApp, JWTAuth
|
|
|
|
+import time
|
|
|
|
|
|
COZE_API_TOKEN = os.getenv("COZE_API_TOKEN")
|
|
COZE_API_TOKEN = os.getenv("COZE_API_TOKEN")
|
|
COZE_CN_BASE_URL = 'https://api.coze.cn'
|
|
COZE_CN_BASE_URL = 'https://api.coze.cn'
|
|
@@ -21,8 +23,16 @@ class ChatServiceType(Enum):
|
|
COZE_CHAT = auto()
|
|
COZE_CHAT = auto()
|
|
|
|
|
|
class CozeChat:
|
|
class CozeChat:
|
|
- def __init__(self, token, base_url: str):
|
|
|
|
- self.coze = Coze(auth=TokenAuth(token), base_url=base_url)
|
|
|
|
|
|
+ def __init__(self, base_url: str, auth_token: Optional[str] = None, auth_app: Optional[JWTOAuthApp] = None):
|
|
|
|
+ if not auth_token and not auth_app:
|
|
|
|
+ raise ValueError("Either auth_token or auth_app must be provided.")
|
|
|
|
+ if auth_token:
|
|
|
|
+ self.coze = Coze(auth=TokenAuth(auth_token), base_url=base_url)
|
|
|
|
+ else:
|
|
|
|
+ self.auth_app = auth_app
|
|
|
|
+ oauth_token = auth_app.get_access_token(ttl=12*3600)
|
|
|
|
+ self.coze = Coze(auth=JWTAuth(oauth_app=auth_app), base_url=base_url)
|
|
|
|
+ self.setup_token_refresh()
|
|
|
|
|
|
def create(self, bot_id: str, user_id: str, messages: List, custom_variables: Dict):
|
|
def create(self, bot_id: str, user_id: str, messages: List, custom_variables: Dict):
|
|
response = self.coze.chat.create_and_poll(
|
|
response = self.coze.chat.create_and_poll(
|
|
@@ -34,6 +44,30 @@ class CozeChat:
|
|
return message.content
|
|
return message.content
|
|
return None
|
|
return None
|
|
|
|
|
|
|
|
+ def setup_token_refresh(self):
|
|
|
|
+ thread = threading.Thread(target=self.refresh_token_loop)
|
|
|
|
+ thread.start()
|
|
|
|
+
|
|
|
|
+ def refresh_token_loop(self):
|
|
|
|
+ while True:
|
|
|
|
+ time.sleep(11*3600)
|
|
|
|
+ if self.auth_app:
|
|
|
|
+ self.auth_app.get_access_token(ttl=12*3600)
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def get_oauth_app(client_id, private_key_path, public_key_id, base_url=None) -> JWTOAuthApp:
|
|
|
|
+ if not base_url:
|
|
|
|
+ base_url = COZE_CN_BASE_URL
|
|
|
|
+ with open(private_key_path, "r") as f:
|
|
|
|
+ private_key = f.read()
|
|
|
|
+ jwt_oauth_app = JWTOAuthApp(
|
|
|
|
+ client_id=str(client_id),
|
|
|
|
+ private_key=private_key,
|
|
|
|
+ public_key_id=public_key_id,
|
|
|
|
+ base_url=base_url,
|
|
|
|
+ )
|
|
|
|
+ return jwt_oauth_app
|
|
|
|
+
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
# Init the Coze client through the access_token.
|
|
# Init the Coze client through the access_token.
|
|
coze = Coze(auth=TokenAuth(token=COZE_API_TOKEN), base_url=COZE_CN_BASE_URL)
|
|
coze = Coze(auth=TokenAuth(token=COZE_API_TOKEN), base_url=COZE_CN_BASE_URL)
|