123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import json
- import time
- from typing import List, Tuple, Optional, Dict
- import requests
- from utils.logging_config import get_logger
- from utils.mysql_db import MysqlHelper
# Logger used by every function in this module.
logger = get_logger('StoreAgent')
# Endpoint that _upload_chunk sends each chunk to.
# NOTE(review): host/IP is hard-coded; consider moving it to configuration.
CHUNK_API_URL = "http://61.48.133.26:8001/api/chunk"
# Sent as "dataset_id" in every uploaded chunk payload.
DATASET_ID = 14
# Only knowledge_extraction_content rows with score >= this value are uploaded.
SCORE_THRESHOLD = 70
def _update_store_status(request_id: str, status: int) -> None:
    """Write *status* into ``knowledge_request.store_status`` for *request_id*.

    Best effort: any exception raised while updating is logged at error level
    and swallowed instead of being propagated. A successful update is logged
    at info level.

    Args:
        request_id: Value matched against ``knowledge_request.request_id``.
        status: New value for the ``store_status`` column.
    """
    params = (status, request_id)
    try:
        MysqlHelper.update_values(
            "UPDATE knowledge_request SET store_status = %s WHERE request_id = %s",
            params,
        )
        logger.info(f"更新store状态成功: requestId={request_id}, status={status}")
    except Exception as err:
        logger.error(f"更新store状态失败: requestId={request_id}, status={status}, error={err}")
def _fetch_query(request_id: str) -> str:
    """Return the ``query`` text stored in ``knowledge_request`` for *request_id*.

    Returns an empty string when no row is found, when the first row has no
    columns, or when the stored value is falsy (for example NULL).
    """
    result = MysqlHelper.get_values(
        "SELECT query FROM knowledge_request WHERE request_id = %s",
        (request_id,),
    )
    if not result or len(result[0]) == 0:
        return ""
    return result[0][0] or ""
def _fetch_extraction_data(request_id: str) -> List[Dict[str, str]]:
    """Load the extraction rows of *request_id* that qualify for upload.

    A row qualifies when its ``data`` column is neither NULL nor empty and its
    ``score`` is at least ``SCORE_THRESHOLD``.

    Returns:
        A new list of ``{"parsing_id": str(parsing_id), "data": data}`` dicts,
        in the order the database returned them; empty when nothing matches.
    """
    query_sql = (
        "SELECT parsing_id, data FROM knowledge_extraction_content "
        "WHERE request_id = %s AND data IS NOT NULL AND data != '' AND score >= %s"
    )
    matched = MysqlHelper.get_values(query_sql, (request_id, SCORE_THRESHOLD))
    records: List[Dict[str, str]] = []
    # `matched or ()` covers both an empty result and a None result.
    for record in matched or ():
        records.append({"parsing_id": str(record[0]), "data": record[1]})
    return records
def _fetch_content_id_by_parsing_id(parsing_id: str) -> Optional[str]:
    """Look up ``content_id`` in ``knowledge_parsing_content`` for a parsing row.

    Args:
        parsing_id: Value matched against the table's ``id`` column.

    Returns:
        The ``content_id`` of the first returned row (which may itself be None
        if the column is NULL), or None when no row is found.
    """
    found = MysqlHelper.get_values(
        "SELECT content_id FROM knowledge_parsing_content WHERE id = %s",
        (parsing_id,),
    )
    if not found or len(found[0]) == 0:
        return None
    return found[0][0]
def _fetch_channel_by_content_id(content_id: str) -> Optional[str]:
    """Return the ``channel`` of one ``knowledge_crawl_content`` row for *content_id*.

    The query uses ``LIMIT 1``. If several rows share the content_id, whichever
    row the database returns first is used.

    Returns:
        The channel value (possibly None if the column is NULL), or None when
        no row matches.
    """
    hit = MysqlHelper.get_values(
        "SELECT channel FROM knowledge_crawl_content WHERE content_id = %s LIMIT 1",
        (content_id,),
    )
    if not hit or len(hit[0]) == 0:
        return None
    return hit[0][0]
def _upload_chunk(text: str, query: str, channel: str = "", max_retries: int = 3, backoff_sec: float = 1.0) -> bool:
    """Upload one text chunk to ``CHUNK_API_URL``, retrying on failure.

    The request is an HTTP GET whose body is the JSON-encoded payload, matching
    how this endpoint has been called so far. The payload's ``ext`` field is
    itself a JSON *string* carrying the originating query and channel.

    Args:
        text: Chunk content to upload.
        query: Query text recorded in ``ext``.
        channel: Source channel recorded in ``ext``; falsy values become "".
        max_retries: Total number of attempts (not additional retries). A
            value <= 0 sends no request and returns False.
        backoff_sec: Base delay. After failed attempt ``i`` (0-based) the
            function sleeps ``backoff_sec * 2**i`` seconds, but only if another
            attempt follows.

    Returns:
        True as soon as an attempt receives a 2xx status code, otherwise False
        (including when the payload cannot be JSON-encoded).
    """
    payload = {
        "dataset_id": DATASET_ID,
        "title": "",
        "text": text,
        # The API expects ``ext`` as a JSON-encoded string, not a nested object.
        "ext": json.dumps({"query": query, "channel": channel or ""}, ensure_ascii=False),
    }
    headers = {"Content-Type": "application/json"}
    try:
        # The body is identical for every attempt, so serialize it only once.
        body = json.dumps(payload, ensure_ascii=False)
    except (TypeError, ValueError) as e:
        # An unencodable payload can never succeed; retrying would only add delay.
        logger.error(f"上传chunk序列化失败: error={e}")
        return False
    logger.info(f"上传chunk请求体: payload={payload}")

    for attempt in range(max_retries):
        try:
            # Sent as GET with a JSON body (not POST), as the endpoint was called before.
            resp = requests.get(CHUNK_API_URL, headers=headers, data=body, timeout=30)
            try:
                # Log the raw response without claiming success; status is checked below.
                logger.info(f"上传chunk响应: status={resp.status_code}, resp={resp.json()}")
            except ValueError:
                # resp.json() raises a ValueError subclass when the body is not JSON.
                logger.info(f"上传chunk返回非JSON: text={resp.text[:500]}")
            if 200 <= resp.status_code < 300:
                return True
            logger.warning(f"上传失败,状态码: {resp.status_code}, 第{attempt+1}次重试")
        except Exception as e:
            # Deliberately broad: any failure of this attempt is logged and retried
            # rather than propagated to the caller.
            logger.warning(f"上传异常: {e}, 第{attempt+1}次重试")
        # Back off only when another attempt will actually follow.
        if attempt + 1 < max_retries:
            time.sleep(backoff_sec * (2 ** attempt))
    return False
def _resolve_channel(parsing_id: Optional[str], cache: Dict[str, str]) -> str:
    """Return the crawl ``channel`` for an extraction row's parsing_id, or "".

    Resolution is parsing_id -> content_id -> channel. Successful results are
    memoized in *cache* so repeated parsing_ids cost no extra queries. Lookup
    errors are logged as warnings and return "". They are NOT cached, so a later
    row with the same parsing_id tries the lookup again.
    """
    if not parsing_id:
        return ""
    if parsing_id in cache:
        return cache[parsing_id]
    try:
        content_id = _fetch_content_id_by_parsing_id(parsing_id)
        channel = ""
        if content_id:
            channel = _fetch_channel_by_content_id(content_id) or ""
    except Exception as e:
        logger.warning(f"获取channel失败: parsing_id={parsing_id}, error={e}")
        return ""
    cache[parsing_id] = channel
    return channel


def execute_store_agent(request_id: str) -> Tuple[int, int]:
    """Run the store pipeline for one knowledge request.

    Steps:
      1) Set ``store_status = 1`` before doing any work.
      2) Read the request's query and its qualifying extraction rows.
      3) Upload each row's data as a chunk, tagged with its source channel.
      4) Set ``store_status = 2`` if every upload succeeded, otherwise ``3``.
         Having no qualifying rows also counts as failure (``3``).

    Any unexpected exception is logged, sets ``store_status = 3`` and returns
    ``(0, 0)``.

    Returns:
        ``(total, success)``: the number of rows attempted and the number
        uploaded successfully.
    """
    _update_store_status(request_id, 1)
    try:
        query = _fetch_query(request_id)
        data_list = _fetch_extraction_data(request_id)
        total = len(data_list)
        if total == 0:
            # No uploadable data is deliberately treated as a failure.
            _update_store_status(request_id, 3)
            logger.info(f"无可上传数据: requestId={request_id}")
            return (0, 0)

        # Rows may share a parsing_id; each uncached lookup costs two DB queries.
        channel_cache: Dict[str, str] = {}
        success = 0
        for item in data_list:
            channel = _resolve_channel(item.get("parsing_id"), channel_cache)
            if _upload_chunk(item.get("data", ""), query, channel):
                success += 1

        _update_store_status(request_id, 2 if success == total else 3)
        logger.info(f"store完成: requestId={request_id}, total={total}, success={success}")
        return (total, success)
    except Exception as e:
        logger.error(f"store流程异常: requestId={request_id}, error={e}")
        _update_store_status(request_id, 3)
        return (0, 0)
|