#! /usr/bin/env python # -*- coding: utf-8 -*- # vim:fenc=utf-8 import logging from calendar import prmonth import werkzeug.exceptions from flask import Flask, request, jsonify from datetime import datetime, timedelta from argparse import ArgumentParser from openai import OpenAI from message import MessageType import chat_service import configs import json import logging_service import prompt_templates from dialogue_manager import DialogueManager from history_dialogue_service import HistoryDialogueService from response_type_detector import ResponseTypeDetector from user_manager import MySQLUserManager, MySQLUserRelationManager from user_profile_extractor import UserProfileExtractor app = Flask('agent_api_server') logger = logging_service.logger def compose_openai_chat_messages_no_time(dialogue_history, multimodal=False): messages = [] for entry in dialogue_history: role = entry['role'] msg_type = entry.get('type', MessageType.TEXT) fmt_time = DialogueManager.format_timestamp(entry['timestamp']) if msg_type in (MessageType.IMAGE_GW, MessageType.IMAGE_QW, MessageType.GIF): if multimodal: messages.append({ "role": role, "content": [ {"type": "image_url", "image_url": {"url": entry["content"]}} ] }) else: logger.warning("Image in non-multimodal mode") messages.append({ "role": role, "content": "[图片]" }) else: messages.append({ "role": role, "content": f'{entry["content"]}' }) return messages def wrap_response(code, msg=None, data=None): resp = { 'code': code, 'msg': msg } if code == 200 and not msg: resp['msg'] = 'success' if data: resp['data'] = data return jsonify(resp) @app.route('/api/listStaffs', methods=['GET']) def list_staffs(): staff_data = app.user_relation_manager.list_staffs() return wrap_response(200, data=staff_data) @app.route('/api/getStaffProfile', methods=['GET']) def get_staff_profile(): staff_id = request.args['staff_id'] profile = app.user_manager.get_staff_profile(staff_id) if not profile: return wrap_response(404, msg='staff not found') else: return wrap_response(200, data=profile) @app.route('/api/getUserProfile', methods=['GET']) def get_user_profile(): user_id = request.args['user_id'] profile = app.user_manager.get_user_profile(user_id) if not profile: resp = { 'code': 404, 'msg': 'user not found' } else: resp = { 'code': 200, 'msg': 'success', 'data': profile } return jsonify(resp) @app.route('/api/listUsers', methods=['GET']) def list_users(): user_name = request.args.get('user_name', None) user_union_id = request.args.get('user_union_id', None) if not user_name and not user_union_id: resp = { 'code': 400, 'msg': 'user_name or user_union_id is required' } return jsonify(resp) data = app.user_manager.list_users(user_name=user_name, user_union_id=user_union_id) return jsonify({'code': 200, 'data': data}) @app.route('/api/getDialogueHistory', methods=['GET']) def get_dialogue_history(): staff_id = request.args['staff_id'] user_id = request.args['user_id'] recent_minutes = int(request.args.get('recent_minutes', 1440)) dialogue_history = app.history_dialogue_service.get_dialogue_history(staff_id, user_id, recent_minutes) return jsonify({'code': 200, 'data': dialogue_history}) @app.route('/api/listModels', methods=['GET']) def list_models(): models = [ { 'model_type': 'openai_compatible', 'model_name': chat_service.VOLCENGINE_MODEL_DEEPSEEK_V3, 'display_name': 'DeepSeek V3 on 火山' }, { 'model_type': 'openai_compatible', 'model_name': chat_service.VOLCENGINE_MODEL_DOUBAO_PRO_32K, 'display_name': '豆包Pro 32K' }, { 'model_type': 'openai_compatible', 'model_name': chat_service.VOLCENGINE_MODEL_DOUBAO_PRO_1_5, 'display_name': '豆包Pro 1.5' }, { 'model_type': 'openai_compatible', 'model_name': chat_service.VOLCENGINE_BOT_DEEPSEEK_V3_SEARCH, 'display_name': 'DeepSeek V3联网 on 火山' }, ] return wrap_response(200, data=models) @app.route('/api/listScenes', methods=['GET']) def list_scenes(): scenes = [ {'scene': 'greeting', 'display_name': '问候'}, {'scene': 'chitchat', 'display_name': '闲聊'}, {'scene': 'profile_extractor', 'display_name': '画像提取'}, {'scene': 'response_type_detector', 'display_name': '回复模态判断'}, {'scene': 'custom_debugging', 'display_name': '自定义调试场景'} ] return wrap_response(200, data=scenes) @app.route('/api/getBasePrompt', methods=['GET']) def get_base_prompt(): scene = request.args['scene'] prompt_map = { 'greeting': prompt_templates.GENERAL_GREETING_PROMPT, 'chitchat': prompt_templates.CHITCHAT_PROMPT_COZE, 'profile_extractor': prompt_templates.USER_PROFILE_EXTRACT_PROMPT, 'response_type_detector': prompt_templates.RESPONSE_TYPE_DETECT_PROMPT, 'custom_debugging': '', } model_map = { 'greeting': chat_service.VOLCENGINE_MODEL_DOUBAO_PRO_32K, 'chitchat': chat_service.VOLCENGINE_MODEL_DOUBAO_PRO_32K, 'profile_extractor': chat_service.VOLCENGINE_MODEL_DOUBAO_PRO_1_5, 'response_type_detector': chat_service.VOLCENGINE_MODEL_DOUBAO_PRO_1_5, 'custom_debugging': chat_service.VOLCENGINE_BOT_DEEPSEEK_V3_SEARCH } if scene not in prompt_map: return wrap_response(404, msg='scene not found') data = { 'model_name': model_map[scene], 'content': prompt_map[scene] } return wrap_response(200, data=data) def run_openai_chat(messages, model_name, **kwargs): volcengine_models = [ chat_service.VOLCENGINE_MODEL_DOUBAO_PRO_32K, chat_service.VOLCENGINE_MODEL_DOUBAO_PRO_1_5, chat_service.VOLCENGINE_MODEL_DEEPSEEK_V3 ] deepseek_models = [ chat_service.DEEPSEEK_CHAT_MODEL, ] volcengine_bots = [ chat_service.VOLCENGINE_BOT_DEEPSEEK_V3_SEARCH, ] if model_name in volcengine_models: llm_client = OpenAI(api_key=chat_service.VOLCENGINE_API_TOKEN, base_url=chat_service.VOLCENGINE_BASE_URL) elif model_name in volcengine_bots: llm_client = OpenAI(api_key=chat_service.VOLCENGINE_API_TOKEN, base_url=chat_service.VOLCENGINE_BOT_BASE_URL) elif model_name in deepseek_models: llm_client = OpenAI(api_key=chat_service.DEEPSEEK_API_TOKEN, base_url=chat_service.DEEPSEEK_BASE_URL) else: raise Exception('model not supported') response = llm_client.chat.completions.create( messages=messages, model=model_name, **kwargs) return response def run_extractor_prompt(req_data): prompt = req_data['prompt'] user_profile = req_data['user_profile'] staff_profile = req_data['staff_profile'] dialogue_history = req_data['dialogue_history'] model_name = req_data['model_name'] prompt_context = {**staff_profile, **user_profile, 'dialogue_history': UserProfileExtractor.compose_dialogue(dialogue_history)} prompt = prompt.format(**prompt_context) messages = [ {"role": "system", "content": '你是一个专业的用户画像分析助手。'}, {"role": "user", "content": prompt} ] tools = [UserProfileExtractor.get_extraction_function()] response = run_openai_chat(messages, model_name, tools=tools, temperature=0) tool_calls = response.choices[0].message.tool_calls if tool_calls: function_call = tool_calls[0] if function_call.function.name == 'update_user_profile': profile_info = json.loads(function_call.function.arguments) return {k: v for k, v in profile_info.items() if v} else: logger.error("llm does not return update_user_profile") return {} else: return {} def run_chat_prompt(req_data): prompt = req_data['prompt'] staff_profile = req_data.get('staff_profile', {}) user_profile = req_data.get('user_profile', {}) dialogue_history = req_data.get('dialogue_history', []) model_name = req_data['model_name'] current_timestamp = req_data['current_timestamp'] / 1000 prompt_context = {**staff_profile, **user_profile} current_hour = datetime.fromtimestamp(current_timestamp).hour prompt_context['last_interaction_interval'] = 0 prompt_context['current_time_period'] = DialogueManager.get_time_context(current_hour) prompt_context['current_hour'] = current_hour prompt_context['if_first_interaction'] = False if dialogue_history else True last_message = dialogue_history[-1] if dialogue_history else {'role': 'assistant'} prompt_context['if_active_greeting'] = False if last_message['role'] == 'user' else True current_time_str = datetime.fromtimestamp(current_timestamp).strftime('%Y-%m-%d %H:%M:%S') system_prompt = { 'role': 'system', 'content': prompt.format(**prompt_context) } messages = [system_prompt] if req_data['scene'] == 'custom_debugging': messages.extend(compose_openai_chat_messages_no_time(dialogue_history)) else: messages.extend(DialogueManager.compose_chat_messages_openai_compatible(dialogue_history, current_time_str)) return run_openai_chat(messages, model_name, temperature=1, top_p=0.7, max_tokens=1024) def run_response_type_prompt(req_data): prompt = req_data['prompt'] dialogue_history = req_data['dialogue_history'] model_name = req_data['model_name'] composed_dialogue = ResponseTypeDetector.compose_dialogue(dialogue_history[:-1]) next_message = DialogueManager.format_dialogue_content(dialogue_history[-1]) prompt = prompt.format( dialogue_history=composed_dialogue, message=next_message ) messages = [ {'role': 'system', 'content': '你是一个专业的智能助手'}, {'role': 'user', 'content': prompt} ] return run_openai_chat(messages, model_name,temperature=0.2, max_tokens=128) @app.route('/api/runPrompt', methods=['POST']) def run_prompt(): try: req_data = request.json logger.debug(req_data) scene = req_data['scene'] if scene == 'profile_extractor': response = run_extractor_prompt(req_data) return wrap_response(200, data=response) elif scene == 'response_type_detector': response = run_response_type_prompt(req_data) return wrap_response(200, data=response.choices[0].message.content) else: response = run_chat_prompt(req_data) return wrap_response(200, data=response.choices[0].message.content) except Exception as e: logger.error(e) return wrap_response(500, msg='Error: {}'.format(e)) @app.errorhandler(werkzeug.exceptions.BadRequest) def handle_bad_request(e): logger.error(e) return wrap_response(400, msg='Bad Request: {}'.format(e.description)) if __name__ == '__main__': parser = ArgumentParser() parser.add_argument('--prod', action='store_true') parser.add_argument('--host', default='127.0.0.1') parser.add_argument('--port', type=int, default=8083) parser.add_argument('--log-level', default='INFO') args = parser.parse_args() config = configs.get() logging_level = logging.getLevelName(args.log_level) logging_service.setup_root_logger(level=logging_level, logfile_name='agent_api_server.log') user_db_config = config['storage']['user'] staff_db_config = config['storage']['staff'] user_manager = MySQLUserManager(user_db_config['mysql'], user_db_config['table'], staff_db_config['table']) app.user_manager = user_manager wecom_db_config = config['storage']['user_relation'] user_relation_manager = MySQLUserRelationManager( user_db_config['mysql'], wecom_db_config['mysql'], config['storage']['staff']['table'], user_db_config['table'], wecom_db_config['table']['staff'], wecom_db_config['table']['relation'], wecom_db_config['table']['user'] ) app.user_relation_manager = user_relation_manager app.history_dialogue_service = HistoryDialogueService( config['storage']['history_dialogue']['api_base_url'] ) app.run(debug=not args.prod, host=args.host, port=args.port)