record_pattern.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. import json
  2. import traceback
  3. import uuid
  4. import logging
  5. from typing import Dict, List, Tuple, Optional
  6. from dataclasses import dataclass
  7. from applications.utils.mysql import Patterns
  8. # 配置日志
  9. logger = logging.getLogger(__name__)
  10. @dataclass
  11. class ModeData:
  12. """模式数据类"""
  13. id: str
  14. name: str
  15. percentage: str
  16. description: str
  17. detail: str
  18. output_id: str
  19. dimension_name: str
  20. @dataclass
  21. class OutputData:
  22. """产物数据类"""
  23. id: str
  24. type: str
  25. description: str
  26. content: str
  27. constraints: str
  28. class RecordPattern:
  29. def __init__(self, resource):
  30. self.pattern_manager = Patterns(resource.mysql_client)
  31. self.milvus_client = resource.milvus_client
  32. async def record_mode(self, mode: ModeData) -> bool:
  33. """记录模式数据"""
  34. try:
  35. mode_tuple = (
  36. mode.id,
  37. mode.name,
  38. mode.percentage,
  39. mode.description,
  40. json.dumps(mode.detail, ensure_ascii=False),
  41. mode.output_id,
  42. mode.dimension_name,
  43. )
  44. result = await self.pattern_manager.insert_modes([mode_tuple])
  45. logger.info(f"成功记录模式: {mode.name}")
  46. return bool(result)
  47. except Exception as e:
  48. logger.error(f"记录模式失败: {mode.name}, 错误: {e}")
  49. print(traceback.format_exc())
  50. return False
  51. async def record_output(self, output: OutputData) -> bool:
  52. """记录产物数据"""
  53. try:
  54. output_tuple = (
  55. output.id,
  56. output.type,
  57. output.description,
  58. output.content,
  59. output.constraints,
  60. )
  61. result = await self.pattern_manager.insert_outputs([output_tuple])
  62. logger.info(f"成功记录产物: {output.type}")
  63. return bool(result)
  64. except Exception as e:
  65. logger.error(f"记录产物失败: {output.type}, 错误: {e}")
  66. return False
  67. @staticmethod
  68. def _validate_pattern_data(pattern: Dict) -> bool:
  69. """验证模式数据格式"""
  70. required_keys = ["维度模式分析"]
  71. if not all(key in pattern for key in required_keys):
  72. logger.error(f"模式数据缺少必要字段: {required_keys}")
  73. return False
  74. dims = pattern["维度模式分析"]
  75. if not isinstance(dims, list):
  76. logger.error("维度模式分析必须是列表类型")
  77. return False
  78. for dim in dims:
  79. if not all(key in dim for key in ["维度名称", "模式列表"]):
  80. logger.error("维度数据缺少必要字段")
  81. return False
  82. for method in dim["模式列表"]:
  83. required_method_keys = [
  84. "模式命名",
  85. "模式占比",
  86. "模式说明",
  87. "分析详情",
  88. "可复用产物",
  89. ]
  90. if not all(key in method for key in required_method_keys):
  91. logger.error(f"模式数据缺少必要字段: {required_method_keys}")
  92. return False
  93. output = method["可复用产物"]
  94. required_output_keys = ["产物类型", "产物描述", "产物内容", "变量约束"]
  95. if not all(key in output for key in required_output_keys):
  96. logger.error(f"产物数据缺少必要字段: {required_output_keys}")
  97. return False
  98. return True
  99. @staticmethod
  100. def _extract_mode_data(method: Dict, dim_name: str) -> Optional[ModeData]:
  101. """提取模式数据"""
  102. try:
  103. mode_id = f"mode-{uuid.uuid4()}"
  104. output_id = f"output-{uuid.uuid4()}"
  105. return ModeData(
  106. id=mode_id,
  107. name=method.get("模式命名", ""),
  108. percentage=method.get("模式占比", ""),
  109. description=method.get("模式说明", ""),
  110. detail=method.get("分析详情", ""),
  111. output_id=output_id,
  112. dimension_name=dim_name,
  113. )
  114. except Exception as e:
  115. logger.error(f"提取模式数据失败: {e}")
  116. return None
  117. @staticmethod
  118. def _extract_output_data(output: Dict, output_id: str) -> Optional[OutputData]:
  119. """提取产物数据"""
  120. try:
  121. return OutputData(
  122. id=output_id,
  123. type=output.get("产物类型", ""),
  124. description=output.get("产物描述", ""),
  125. content=json.dumps(output.get("产物内容", {}), ensure_ascii=False),
  126. constraints=json.dumps(output.get("变量约束", {}), ensure_ascii=False),
  127. )
  128. except Exception as e:
  129. logger.error(f"提取产物数据失败: {e}")
  130. return None
  131. async def deal(self, pattern: Dict) -> bool:
  132. """
  133. 处理模式数据
  134. Args:
  135. pattern: 模式数据字典
  136. Returns:
  137. bool: 处理是否成功
  138. """
  139. # 验证数据格式
  140. if not self._validate_pattern_data(pattern):
  141. logger.error("模式数据格式验证失败")
  142. return False
  143. dims = pattern["维度模式分析"]
  144. success_count = 0
  145. total_count = 0
  146. for dim in dims:
  147. dim_name = dim["维度名称"]
  148. for method in dim["模式列表"]:
  149. total_count += 1
  150. # 提取模式数据
  151. mode_data = self._extract_mode_data(method, dim_name)
  152. if not mode_data:
  153. logger.error(f"提取模式数据失败: {dim_name}")
  154. continue
  155. # 提取产物数据
  156. output = method["可复用产物"]
  157. output_data = self._extract_output_data(output, mode_data.output_id)
  158. print(output_data)
  159. if not output_data:
  160. logger.error(f"提取产物数据失败: {mode_data.name}")
  161. continue
  162. # 记录数据
  163. mode_success = await self.record_mode(mode_data)
  164. output_success = await self.record_output(output_data)
  165. if mode_success and output_success:
  166. success_count += 1
  167. logger.info(f"成功处理模式: {mode_data.name}")
  168. else:
  169. logger.error(f"处理模式失败: {mode_data.name}")
  170. success_rate = (success_count / total_count) * 100 if total_count > 0 else 0
  171. logger.info(
  172. f"模式处理完成: 成功 {success_count}/{total_count} ({success_rate:.1f}%)"
  173. )
  174. return success_count > 0