#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Store agent: uploads extracted knowledge data to an external chunk API.

For a given request it reads the qualifying extraction rows (score >=
SCORE_THRESHOLD), resolves each row's source channel through the parsing and
crawl tables, uploads every row to the chunk service, and records the overall
outcome in ``knowledge_request.store_status``:

    1 = in progress, 2 = all uploads succeeded, 3 = failed / nothing to upload.
"""

import json
import time
from typing import Dict, List, Optional, Tuple

import requests

from utils.logging_config import get_logger
from utils.mysql_db import MysqlHelper

logger = get_logger('StoreAgent')

# External chunk-upload endpoint and the dataset it writes into.
CHUNK_API_URL = "http://61.48.133.26:8001/api/chunk"
DATASET_ID = 14
# Only extraction rows with score >= this threshold are uploaded.
SCORE_THRESHOLD = 70


def _update_store_status(request_id: str, status: int) -> None:
    """Best-effort update of knowledge_request.store_status.

    Errors are logged and swallowed on purpose: a failed status write must not
    abort the surrounding store flow.
    """
    try:
        sql = "UPDATE knowledge_request SET store_status = %s WHERE request_id = %s"
        MysqlHelper.update_values(sql, (status, request_id))
        logger.info(f"更新store状态成功: requestId={request_id}, status={status}")
    except Exception as e:
        logger.error(f"更新store状态失败: requestId={request_id}, status={status}, error={e}")


def _fetch_query(request_id: str) -> str:
    """Return the request's query string, or "" when the row/value is missing."""
    sql = "SELECT query FROM knowledge_request WHERE request_id = %s"
    rows = MysqlHelper.get_values(sql, (request_id,))
    if rows and len(rows[0]) > 0:
        # Column may be NULL; normalize to "".
        return rows[0][0] or ""
    return ""


def _fetch_extraction_data(request_id: str) -> List[Dict[str, str]]:
    """Return uploadable extraction rows as [{"parsing_id": ..., "data": ...}, ...].

    Only rows with non-empty data and score >= SCORE_THRESHOLD qualify.
    """
    sql = (
        "SELECT parsing_id, data FROM knowledge_extraction_content "
        "WHERE request_id = %s AND data IS NOT NULL AND data != '' AND score >= %s"
    )
    rows = MysqlHelper.get_values(sql, (request_id, SCORE_THRESHOLD))
    if not rows:
        return []
    return [{"parsing_id": str(row[0]), "data": row[1]} for row in rows]


def _fetch_content_id_by_parsing_id(parsing_id: str) -> Optional[str]:
    """Resolve a parsing row to its content_id, or None when not found."""
    sql = "SELECT content_id FROM knowledge_parsing_content WHERE id = %s"
    rows = MysqlHelper.get_values(sql, (parsing_id,))
    if rows and len(rows[0]) > 0:
        return rows[0][0]
    return None


def _fetch_channel_by_content_id(content_id: str) -> Optional[str]:
    """Resolve a content_id to its crawl channel, or None when not found."""
    sql = "SELECT channel FROM knowledge_crawl_content WHERE content_id = %s LIMIT 1"
    rows = MysqlHelper.get_values(sql, (content_id,))
    if rows and len(rows[0]) > 0:
        return rows[0][0]
    return None


def _upload_chunk(text: str, query: str, channel: str = "",
                  max_retries: int = 3, backoff_sec: float = 1.0) -> bool:
    """Upload one text chunk to the external chunk API, with retries.

    Args:
        text: Chunk body to store.
        query: Originating request query, embedded in the ``ext`` field.
        channel: Source channel, embedded in the ``ext`` field.
        max_retries: Total number of attempts before giving up.
        backoff_sec: Base delay between attempts; doubled after each failure.

    Returns:
        True when a 2xx response is received, False otherwise.
    """
    # The API expects ``ext`` as a JSON-encoded *string*, not a nested object.
    payload = {
        "dataset_id": DATASET_ID,
        "title": "",
        "text": text,
        "ext": json.dumps({"query": query, "channel": channel or ""}, ensure_ascii=False),
    }
    headers = {"Content-Type": "application/json; charset=utf-8"}
    # Request body is loop-invariant; build it once.
    body = json.dumps(payload, ensure_ascii=False).encode('utf-8')
    for attempt in range(max_retries):
        try:
            # Log the payload before sending so failed attempts are diagnosable too.
            logger.info(f"上传chunk请求体: payload={payload}")
            # NOTE(review): the endpoint is deliberately called with GET plus a
            # JSON body (original comment: "以 GET 方法发送"). Unusual, but kept
            # as-is — confirm with the API owner before changing to POST.
            resp = requests.get(CHUNK_API_URL, headers=headers, data=body, timeout=30)
            if 200 <= resp.status_code < 300:
                # Only log "success" for an actual 2xx response.
                try:
                    logger.info(f"上传chunk成功: resp={resp.json()}")
                except Exception:
                    logger.info(f"上传chunk返回非JSON: text={resp.text[:500]}")
                return True
            logger.warning(f"上传失败,状态码: {resp.status_code}, 第{attempt+1}次重试")
        except Exception as e:
            logger.warning(f"上传异常: {e}, 第{attempt+1}次重试")
        # Exponential backoff between attempts; skip it after the last one.
        if attempt < max_retries - 1:
            time.sleep(backoff_sec * (2 ** attempt))
    return False


def execute_store_agent(request_id: str) -> Tuple[int, int]:
    """Run the store flow for one request.

    Steps:
        1) Mark store_status = 1 (in progress).
        2) Load the request's query and the qualifying extraction rows.
        3) Upload each row to the external chunk API, resolving its channel.
        4) store_status = 2 when every upload succeeded, otherwise 3.

    Returns:
        (total, success) — number of rows considered and number uploaded.
    """
    _update_store_status(request_id, 1)
    try:
        query = _fetch_query(request_id)
        data_list = _fetch_extraction_data(request_id)
        total = len(data_list)
        success = 0
        if total == 0:
            # Nothing to upload counts as a failure outcome.
            _update_store_status(request_id, 3)
            logger.info(f"无可上传数据: requestId={request_id}")
            return (0, 0)
        for item in data_list:
            text = item.get("data", "")
            parsing_id = item.get("parsing_id")
            channel = ""
            try:
                # Channel lookup is best-effort: parsing row -> content row -> channel.
                if parsing_id:
                    content_id = _fetch_content_id_by_parsing_id(parsing_id)
                    if content_id:
                        channel_value = _fetch_channel_by_content_id(content_id)
                        channel = channel_value or ""
            except Exception as e:
                logger.warning(f"获取channel失败: parsing_id={parsing_id}, error={e}")
            if _upload_chunk(text, query, channel):
                success += 1
        if success == total:
            _update_store_status(request_id, 2)
        else:
            _update_store_status(request_id, 3)
        logger.info(f"store完成: requestId={request_id}, total={total}, success={success}")
        return (total, success)
    except Exception as e:
        # Top-level boundary: any unexpected error marks the request failed.
        logger.error(f"store流程异常: requestId={request_id}, error={e}")
        _update_store_status(request_id, 3)
        return (0, 0)