| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- from typing import List, Optional, Dict, Any
- from datetime import datetime
- import json
- import logging
- from .connection import get_db_manager
- logger = logging.getLogger(__name__)
- class QueryTaskStatus:
- """查询任务状态常量"""
- PENDING = 0 # 待执行
- RUNNING = 1 # 执行中
- SUCCESS = 2 # 成功
- FAILED = 3 # 失败
- class KnowledgeSuggestQuery:
- """知识查询建议模型"""
-
- def __init__(self, task_id: int, question: str, querys: Optional[List[str]] = None, status: int = QueryTaskStatus.PENDING, knowledgeType: str = "", err_msg: str = "", need_store: int = 1):
- """
- 初始化查询任务
-
- Args:
- task_id: 任务ID
- question: 问题
- querys: 查询词列表
- status: 任务状态
- knowledgeType: 知识类型
- err_msg: 错误信息
- need_store: 是否存储查询词
- """
- self.task_id = task_id
- self.question = question
- self.querys = querys or []
- self.status = status
- self.knowledgeType = knowledgeType
- self.err_msg = err_msg or ""
- self.need_store = need_store
-
- def to_dict(self) -> Dict[str, Any]:
- """转换为字典"""
- return {
- 'task_id': self.task_id,
- 'question': self.question,
- 'querys': json.dumps(self.querys, ensure_ascii=False) if self.querys else None,
- 'status': self.status,
- 'knowledgeType': self.knowledgeType,
- 'err_msg': self.err_msg or None,
- 'need_store': self.need_store
- }
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'KnowledgeSuggestQuery':
- """从字典创建实例"""
- querys = None
- if data.get('querys'):
- try:
- querys = json.loads(data['querys'])
- except json.JSONDecodeError:
- querys = []
-
- return cls(
- task_id=data['task_id'],
- question=data['question'],
- querys=querys,
- status=data['status'],
- knowledgeType=data.get('knowledgeType', ""),
- err_msg=data.get('err_msg', ""),
- need_store=data.get('need_store', 1)
- )
- class QueryTaskDAO:
- """查询任务数据访问对象"""
-
- def __init__(self):
- self.db_manager = get_db_manager()
-
- def create_task(self, task_id: int, question: str, knowledge_type: str = "", need_store: int = 1) -> bool:
- """
- 创建新的查询任务
-
- Args:
- task_id: 任务ID
- question: 问题
-
- Returns:
- 是否创建成功
- """
- try:
- with self.db_manager.get_cursor() as cursor:
- sql = """
- INSERT INTO knowledge_suggest_query (task_id, question, status, knowledgeType, err_msg, need_store)
- VALUES (%s, %s, %s, %s, %s, %s)
- ON DUPLICATE KEY UPDATE
- question = VALUES(question),
- status = VALUES(status),
- querys = NULL,
- knowledgeType = VALUES(knowledgeType),
- err_msg = NULL,
- need_store = VALUES(need_store)
- """
- cursor.execute(sql, (task_id, question, QueryTaskStatus.PENDING, knowledge_type or "内容知识", None, need_store))
- return True
- except Exception as e:
- logger.error(f"创建任务失败: {e}")
- return False
-
- def update_task_status(self, task_id: int, status: int) -> bool:
- """
- 更新任务状态
-
- Args:
- task_id: 任务ID
- status: 新状态
-
- Returns:
- 是否更新成功
- """
- try:
- with self.db_manager.get_cursor() as cursor:
- sql = "UPDATE knowledge_suggest_query SET status = %s WHERE task_id = %s"
- cursor.execute(sql, (status, task_id))
- return cursor.rowcount > 0
- except Exception as e:
- logger.error(f"更新任务状态失败: {e}")
- return False
- def mark_task_failed(self, task_id: int, err_msg: str) -> bool:
- """
- 将任务标记为失败并记录错误信息
- """
- try:
- with self.db_manager.get_cursor() as cursor:
- try:
- sql = "UPDATE knowledge_suggest_query SET status = %s, err_msg = %s, knowledgeType = %s WHERE task_id = %s"
- cursor.execute(sql, (QueryTaskStatus.FAILED, err_msg, "内容知识", task_id))
- return cursor.rowcount > 0
- except Exception:
- # 回退到仅更新状态
- sql = "UPDATE knowledge_suggest_query SET status = %s WHERE task_id = %s"
- cursor.execute(sql, (QueryTaskStatus.FAILED, task_id))
- return cursor.rowcount > 0
- except Exception as e:
- logger.error(f"标记任务失败时出错: {e}")
- return False
-
- def update_task_results(self, task_id: int, querys: List[str], knowledge_type: str, query_type: str, status: int = QueryTaskStatus.SUCCESS) -> bool:
- """
- 更新任务结果
-
- Args:
- task_id: 任务ID
- querys: 查询词列表
- status: 任务状态
-
- Returns:
- 是否更新成功
- """
- try:
- with self.db_manager.get_cursor() as cursor:
- sql = "UPDATE knowledge_suggest_query SET querys = %s, status = %s, knowledgeType = %s, query_type = %s WHERE task_id = %s"
- querys_json = json.dumps(querys, ensure_ascii=False)
- cursor.execute(sql, (querys_json, status, knowledge_type, query_type, task_id))
- return cursor.rowcount > 0
- except Exception as e:
- logger.error(f"更新任务结果失败: {e}")
- return False
-
- def get_task(self, task_id: int) -> Optional[KnowledgeSuggestQuery]:
- """
- 获取任务信息
-
- Args:
- task_id: 任务ID
-
- Returns:
- 任务对象,如果不存在返回None
- """
- try:
- with self.db_manager.get_cursor() as cursor:
- sql = "SELECT * FROM knowledge_suggest_query WHERE task_id = %s"
- cursor.execute(sql, (task_id,))
- result = cursor.fetchone()
-
- if result:
- return KnowledgeSuggestQuery.from_dict(result)
- return None
- except Exception as e:
- logger.error(f"获取任务失败: {e}")
- return None
-
- def get_tasks_by_status(self, status: int, limit: int = 100) -> List[KnowledgeSuggestQuery]:
- """
- 根据状态获取任务列表
-
- Args:
- status: 任务状态
- limit: 限制数量
-
- Returns:
- 任务列表
- """
- try:
- with self.db_manager.get_cursor() as cursor:
- sql = "SELECT * FROM knowledge_suggest_query WHERE status = %s ORDER BY task_id DESC LIMIT %s"
- cursor.execute(sql, (status, limit))
- results = cursor.fetchall()
-
- return [KnowledgeSuggestQuery.from_dict(row) for row in results]
- except Exception as e:
- logger.error(f"获取任务列表失败: {e}")
- return []
-
- def delete_task(self, task_id: int) -> bool:
- """
- 删除任务
-
- Args:
- task_id: 任务ID
-
- Returns:
- 是否删除成功
- """
- try:
- with self.db_manager.get_cursor() as cursor:
- sql = "DELETE FROM knowledge_suggest_query WHERE task_id = %s"
- cursor.execute(sql, (task_id,))
- return cursor.rowcount > 0
- except Exception as e:
- logger.error(f"删除任务失败: {e}")
- return False
- # 全局DAO实例
- query_task_dao = None
- def get_query_task_dao() -> QueryTaskDAO:
- """获取QueryTaskDAO实例"""
- global query_task_dao
- if query_task_dao is None:
- query_task_dao = QueryTaskDAO()
- return query_task_dao
|