record_pattern.py 6.7 KB


  1. import json
  2. import traceback
  3. import uuid
  4. import logging
  5. from typing import Dict, List, Tuple, Optional
  6. from dataclasses import dataclass
  7. from applications.utils.mysql import Patterns
  8. # 配置日志
  9. logger = logging.getLogger(__name__)
  10. @dataclass
  11. class ModeData:
  12. """模式数据类"""
  13. id: str
  14. name: str
  15. percentage: str
  16. description: str
  17. detail: str
  18. output_id: str
  19. dimension_name: str
  20. @dataclass
  21. class OutputData:
  22. """产物数据类"""
  23. id: str
  24. type: str
  25. description: str
  26. content: str
  27. constraints: str
  28. class RecordPattern:
  29. def __init__(self, resource):
  30. self.pattern_manager = Patterns(resource.mysql_client)
  31. self.milvus_client = resource.milvus_client
  32. async def record_mode(self, mode: ModeData) -> bool:
  33. """记录模式数据"""
  34. try:
  35. mode_tuple = (
  36. mode.id,
  37. mode.name,
  38. mode.percentage,
  39. mode.description,
  40. json.dumps(mode.detail, ensure_ascii=False),
  41. mode.output_id,
  42. mode.dimension_name,
  43. )
  44. result = await self.pattern_manager.insert_modes([mode_tuple])
  45. logger.info(f"成功记录模式: {mode.name}")
  46. return bool(result)
  47. except Exception as e:
  48. logger.error(f"记录模式失败: {mode.name}, 错误: {e}")
  49. print(traceback.format_exc())
  50. return False
  51. async def record_output(self, output: OutputData) -> bool:
  52. """记录产物数据"""
  53. try:
  54. output_tuple = (
  55. output.id,
  56. output.type,
  57. output.description,
  58. output.content,
  59. output.constraints,
  60. )
  61. result = await self.pattern_manager.insert_outputs([output_tuple])
  62. logger.info(f"成功记录产物: {output.type}")
  63. return bool(result)
  64. except Exception as e:
  65. logger.error(f"记录产物失败: {output.type}, 错误: {e}")
  66. return False
  67. @staticmethod
  68. def _validate_pattern_data(pattern: Dict) -> bool:
  69. """验证模式数据格式"""
  70. required_keys = ["维度模式分析"]
  71. if not all(key in pattern for key in required_keys):
  72. logger.error(f"模式数据缺少必要字段: {required_keys}")
  73. return False
  74. dims = pattern["维度模式分析"]
  75. if not isinstance(dims, list):
  76. logger.error("维度模式分析必须是列表类型")
  77. return False
  78. for dim in dims:
  79. if not all(key in dim for key in ["维度名称", "模式列表"]):
  80. logger.error("维度数据缺少必要字段")
  81. return False
  82. for method in dim["模式列表"]:
  83. required_method_keys = [
  84. "模式命名",
  85. "模式占比",
  86. "模式说明",
  87. "分析详情",
  88. "可复用产物",
  89. ]
  90. if not all(key in method for key in required_method_keys):
  91. logger.error(f"模式数据缺少必要字段: {required_method_keys}")
  92. return False
  93. output = method["可复用产物"]
  94. required_output_keys = ["产物类型", "产物描述", "产物内容", "变量约束"]
  95. if not all(key in output for key in required_output_keys):
  96. logger.error(f"产物数据缺少必要字段: {required_output_keys}")
  97. return False
  98. return True
  99. @staticmethod
  100. def _extract_mode_data(method: Dict, dim_name: str) -> Optional[ModeData]:
  101. """提取模式数据"""
  102. try:
  103. mode_id = f"mode-{uuid.uuid4()}"
  104. output_id = f"output-{uuid.uuid4()}"
  105. return ModeData(
  106. id=mode_id,
  107. name=method.get("模式命名", ""),
  108. percentage=method.get("模式占比", ""),
  109. description=method.get("模式说明", ""),
  110. detail=method.get("分析详情", ""),
  111. output_id=output_id,
  112. dimension_name=dim_name,
  113. )
  114. except Exception as e:
  115. logger.error(f"提取模式数据失败: {e}")
  116. return None
  117. @staticmethod
  118. def _extract_output_data(output: Dict, output_id: str) -> Optional[OutputData]:
  119. """提取产物数据"""
  120. try:
  121. return OutputData(
  122. id=output_id,
  123. type=output.get("产物类型", ""),
  124. description=output.get("产物描述", ""),
  125. content=json.dumps(output.get("产物内容", {}), ensure_ascii=False),
  126. constraints=json.dumps(output.get("变量约束", {}), ensure_ascii=False),
  127. )
  128. except Exception as e:
  129. logger.error(f"提取产物数据失败: {e}")
  130. return None
  131. async def deal(self, pattern: Dict) -> bool:
  132. """
  133. 处理模式数据
  134. Args:
  135. pattern: 模式数据字典
  136. Returns:
  137. bool: 处理是否成功
  138. """
  139. # 验证数据格式
  140. if not self._validate_pattern_data(pattern):
  141. logger.error("模式数据格式验证失败")
  142. return False
  143. dims = pattern["维度模式分析"]
  144. success_count = 0
  145. total_count = 0
  146. for dim in dims:
  147. dim_name = dim["维度名称"]
  148. for method in dim["模式列表"]:
  149. total_count += 1
  150. # 提取模式数据
  151. mode_data = self._extract_mode_data(method, dim_name)
  152. if not mode_data:
  153. logger.error(f"提取模式数据失败: {dim_name}")
  154. continue
  155. # 提取产物数据
  156. output = method["可复用产物"]
  157. output_data = self._extract_output_data(output, mode_data.output_id)
  158. print(output_data)
  159. if not output_data:
  160. logger.error(f"提取产物数据失败: {mode_data.name}")
  161. continue
  162. # 记录数据
  163. mode_success = await self.record_mode(mode_data)
  164. output_success = await self.record_output(output_data)
  165. if mode_success and output_success:
  166. success_count += 1
  167. logger.info(f"成功处理模式: {mode_data.name}")
  168. else:
  169. logger.error(f"处理模式失败: {mode_data.name}")
  170. success_rate = (success_count / total_count) * 100 if total_count > 0 else 0
  171. logger.info(
  172. f"模式处理完成: 成功 {success_count}/{total_count} ({success_rate:.1f}%)"
  173. )
  174. return success_count > 0