agent.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import json
  4. import time
  5. from typing import List, Tuple
  6. import requests
  7. from utils.logging_config import get_logger
  8. from utils.mysql_db import MysqlHelper
# Module-wide logger, registered under the name 'StoreAgent'.
logger = get_logger('StoreAgent')
# Endpoint that _upload_chunk POSTs each chunk to.
# NOTE(review): hard-coded IP over plain HTTP — consider moving to configuration.
CHUNK_API_URL = "http://61.48.133.26:8001/api/chunk"
# Sent as the "dataset_id" field of every upload payload.
DATASET_ID = 14
# Lower bound (inclusive) applied to the `score` column when selecting rows to upload.
SCORE_THRESHOLD = 70
  13. def _update_store_status(request_id: str, status: int) -> None:
  14. try:
  15. sql = "UPDATE knowledge_request SET store_status = %s WHERE request_id = %s"
  16. MysqlHelper.update_values(sql, (status, request_id))
  17. logger.info(f"更新store状态成功: requestId={request_id}, status={status}")
  18. except Exception as e:
  19. logger.error(f"更新store状态失败: requestId={request_id}, status={status}, error={e}")
  20. def _fetch_query(request_id: str) -> str:
  21. sql = "SELECT query FROM knowledge_request WHERE request_id = %s"
  22. rows = MysqlHelper.get_values(sql, (request_id,))
  23. if rows and len(rows[0]) > 0:
  24. return rows[0][0] or ""
  25. return ""
  26. def _fetch_extraction_data(request_id: str) -> List[str]:
  27. sql = (
  28. "SELECT data FROM knowledge_extraction_content "
  29. "WHERE request_id = %s AND data IS NOT NULL AND data != '' AND score >= %s"
  30. )
  31. rows = MysqlHelper.get_values(sql, (request_id, SCORE_THRESHOLD))
  32. return [row[0] for row in rows] if rows else []
  33. def _upload_chunk(text: str, query: str, max_retries: int = 3, backoff_sec: float = 1.0) -> bool:
  34. payload = {
  35. "dataset_id": DATASET_ID,
  36. "title": "",
  37. "text": text,
  38. "ext": {"query": query, "channel": ""},
  39. }
  40. headers = {"Content-Type": "application/json"}
  41. for attempt in range(max_retries):
  42. try:
  43. resp = requests.post(CHUNK_API_URL, headers=headers, json=payload, timeout=15)
  44. if resp.status_code >= 200 and resp.status_code < 300:
  45. return True
  46. logger.warning(f"上传失败,状态码: {resp.status_code}, 第{attempt+1}次重试")
  47. except Exception as e:
  48. logger.warning(f"上传异常: {e}, 第{attempt+1}次重试")
  49. time.sleep(backoff_sec * (2 ** attempt))
  50. return False
  51. def execute_store_agent(request_id: str) -> Tuple[int, int]:
  52. """
  53. 执行存储流程:
  54. 1) 更新 store_status = 1
  55. 2) 读取 query 和符合条件的 data 列表
  56. 3) 逐条上传到外部接口
  57. 4) 全部成功 -> store_status = 2;否则 -> store_status = 3
  58. 返回: (total, success)
  59. """
  60. _update_store_status(request_id, 1)
  61. try:
  62. query = _fetch_query(request_id)
  63. data_list = _fetch_extraction_data(request_id)
  64. total = len(data_list)
  65. success = 0
  66. if total == 0:
  67. # 没有可上传数据,按失败处理
  68. _update_store_status(request_id, 3)
  69. logger.info(f"无可上传数据: requestId={request_id}")
  70. return (0, 0)
  71. for text in data_list:
  72. ok = _upload_chunk(text, query)
  73. success += 1 if ok else 0
  74. if success == total:
  75. _update_store_status(request_id, 2)
  76. else:
  77. _update_store_status(request_id, 3)
  78. logger.info(f"store完成: requestId={request_id}, total={total}, success={success}")
  79. return (total, success)
  80. except Exception as e:
  81. logger.error(f"store流程异常: requestId={request_id}, error={e}")
  82. _update_store_status(request_id, 3)
  83. return (0, 0)