agent.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import time
from typing import List, Tuple, Optional, Dict

import requests

from utils.logging_config import get_logger
from utils.mysql_db import MysqlHelper

logger = get_logger('StoreAgent')

CHUNK_API_URL = "http://61.48.133.26:8001/api/chunk"
DATASET_ID = 14
SCORE_THRESHOLD = 70
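
# store_status codes used throughout this module (see execute_store_agent):
# 1 = in progress, 2 = all chunks uploaded, 3 = failed or nothing to upload.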


def _update_store_status(request_id: str, status: int) -> None:
    """Update store_status on knowledge_request; failures are logged, not raised."""
    try:
        sql = "UPDATE knowledge_request SET store_status = %s WHERE request_id = %s"
        MysqlHelper.update_values(sql, (status, request_id))
        logger.info(f"Updated store status: requestId={request_id}, status={status}")
    except Exception as e:
        logger.error(f"Failed to update store status: requestId={request_id}, status={status}, error={e}")


def _fetch_query(request_id: str) -> str:
    sql = "SELECT query FROM knowledge_request WHERE request_id = %s"
    rows = MysqlHelper.get_values(sql, (request_id,))
    if rows and len(rows[0]) > 0:
        return rows[0][0] or ""
    return ""


def _fetch_extraction_data(request_id: str) -> List[Dict[str, str]]:
    sql = (
        "SELECT parsing_id, data FROM knowledge_extraction_content "
        "WHERE request_id = %s AND data IS NOT NULL AND data != '' AND score >= %s"
    )
    rows = MysqlHelper.get_values(sql, (request_id, SCORE_THRESHOLD))
    if not rows:
        return []
    return [{"parsing_id": str(row[0]), "data": row[1]} for row in rows]


def _fetch_content_id_by_parsing_id(parsing_id: str) -> Optional[str]:
    sql = "SELECT content_id FROM knowledge_parsing_content WHERE id = %s"
    rows = MysqlHelper.get_values(sql, (parsing_id,))
    if rows and len(rows[0]) > 0:
        return rows[0][0]
    return None


def _fetch_channel_by_content_id(content_id: str) -> Optional[str]:
    sql = "SELECT channel FROM knowledge_crawl_content WHERE content_id = %s LIMIT 1"
    rows = MysqlHelper.get_values(sql, (content_id,))
    if rows and len(rows[0]) > 0:
        return rows[0][0]
    return None
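
# Channel resolution chain used by execute_store_agent:
# knowledge_extraction_content.parsing_id -> knowledge_parsing_content.id
#   -> knowledge_parsing_content.content_id -> knowledge_crawl_content.channel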


def _upload_chunk(text: str, query: str, channel: str = "", max_retries: int = 3, backoff_sec: float = 1.0) -> bool:
    payload = {
        "dataset_id": DATASET_ID,
        "title": "",
        "text": text,
        "ext": {"query": query, "channel": channel or ""},
    }
    headers = {"Content-Type": "application/json"}
    for attempt in range(max_retries):
        try:
            # Log the request body before sending.
            logger.info(f"Uploading chunk: payload={payload}")
            # Send the payload as a JSON body via POST; the original requests.get with
            # data=payload form-encoded the dict and contradicted the JSON Content-Type.
            resp = requests.post(CHUNK_API_URL, headers=headers, json=payload, timeout=30)
            if 200 <= resp.status_code < 300:
                logger.info(f"Chunk upload succeeded: resp={resp.text}")
                return True
            logger.warning(f"Upload failed, status code: {resp.status_code}, attempt {attempt + 1}")
        except Exception as e:
            logger.warning(f"Upload raised an exception: {e}, attempt {attempt + 1}")
        if attempt < max_retries - 1:
            # Exponential backoff before the next attempt.
            time.sleep(backoff_sec * (2 ** attempt))
    return False
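
# For reference, the JSON body posted to CHUNK_API_URL looks like the sketch below
# (field values other than dataset_id are illustrative, not taken from a real request):
# {
#     "dataset_id": 14,
#     "title": "",
#     "text": "<extracted data>",
#     "ext": {"query": "<original query>", "channel": "<source channel>"}
# }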


def execute_store_agent(request_id: str) -> Tuple[int, int]:
    """
    Run the store pipeline:
    1) Set store_status = 1
    2) Read the query and the list of qualifying data rows
    3) Upload each item to the external API one by one
    4) All succeed -> store_status = 2; otherwise -> store_status = 3
    Returns: (total, success)
    """
    _update_store_status(request_id, 1)
    try:
        query = _fetch_query(request_id)
        data_list = _fetch_extraction_data(request_id)
        total = len(data_list)
        success = 0
        if total == 0:
            # No data to upload; treat as a failure.
            _update_store_status(request_id, 3)
            logger.info(f"No data to upload: requestId={request_id}")
            return (0, 0)
        for item in data_list:
            text = item.get("data", "")
            parsing_id = item.get("parsing_id")
            channel = ""
            try:
                if parsing_id:
                    content_id = _fetch_content_id_by_parsing_id(parsing_id)
                    if content_id:
                        channel_value = _fetch_channel_by_content_id(content_id)
                        channel = channel_value or ""
            except Exception as e:
                logger.warning(f"Failed to resolve channel: parsing_id={parsing_id}, error={e}")
            ok = _upload_chunk(text, query, channel)
            if ok:
                success += 1
        if success == total:
            _update_store_status(request_id, 2)
        else:
            _update_store_status(request_id, 3)
        logger.info(f"Store finished: requestId={request_id}, total={total}, success={success}")
        return (total, success)
    except Exception as e:
        logger.error(f"Store pipeline exception: requestId={request_id}, error={e}")
        _update_store_status(request_id, 3)
        return (0, 0)
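

# Minimal usage sketch (not part of the original module): run the store flow for a
# single request id, e.g. as a CLI entry point. The fallback id below is hypothetical;
# a real request_id comes from the knowledge_request table.
if __name__ == "__main__":
    import sys

    req_id = sys.argv[1] if len(sys.argv) > 1 else "demo-request-id"  # hypothetical default
    total, success = execute_store_agent(req_id)
    print(f"uploaded {success}/{total} chunks for request {req_id}")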