"""
use neo4j to build graph
"""

import json
import random
from dataclasses import fields

from applications.utils.neo4j import AsyncNeo4jRepository
from applications.utils.neo4j import Document, GraphChunk, ChunkRelations
from applications.utils.async_utils import run_tasks_with_asyncio_task_group


class BuildGraph(AsyncNeo4jRepository):
    """Read chunk rows for a document from MySQL and write them to Neo4j.

    Rows come from the ``content_chunks`` table via ``mysql_client``. Each
    row is split into ``Document``, ``GraphChunk`` and ``ChunkRelations``
    dataclass instances, which are then passed to
    ``add_document_with_chunk`` (presumably provided by
    ``AsyncNeo4jRepository`` -- not visible here).
    """

    # Columns of ``content_chunks`` stored as JSON text that are decoded
    # before a row is handed to the graph layer (order kept from the
    # original implementation).
    _JSON_COLUMNS = ("keywords", "entities", "concepts")

    def __init__(self, neo4j, mysql_client):
        """
        :param neo4j: handle forwarded unchanged to ``AsyncNeo4jRepository``.
        :param mysql_client: object exposing an awaitable
            ``async_fetch(query=..., params=...)`` used by ``get_chunk_list``.
        """
        super().__init__(neo4j)
        self.mysql_client = mysql_client

    @staticmethod
    def from_dict(cls, data: dict):
        """Build dataclass ``cls`` from ``data``, ignoring keys it does not declare.

        Despite the parameter name this is a static method: ``cls`` is the
        dataclass type passed explicitly by the caller, e.g.
        ``self.from_dict(Document, row)``.

        :param cls: a dataclass type (``dataclasses.fields`` raises
            ``TypeError`` for anything else).
        :param data: source mapping; keys that are not fields of ``cls`` are
            dropped.
        :return: a new ``cls`` instance.
        """
        field_names = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in field_names})

    async def add_single_chunk(self, param: dict) -> None:
        """Convert one chunk row into graph objects and persist them.

        Side effect: ``param["milvus_id"]`` is overwritten in the caller's
        dict with a random integer in [100000, 999999]. The value is random,
        not read from Milvus, so collisions between chunks are possible.
        NOTE(review): looks like a placeholder -- confirm.

        :param param: one row as returned by ``get_chunk_list``.
        """
        param["milvus_id"] = random.randint(100000, 999999)
        doc: Document = self.from_dict(Document, param)
        graph_chunk: GraphChunk = self.from_dict(GraphChunk, param)
        relations: ChunkRelations = self.from_dict(ChunkRelations, param)
        await self.add_document_with_chunk(doc, graph_chunk, relations)

    @staticmethod
    def _decode_json_field(value):
        """Decode a JSON text column value.

        ``str``/``bytes`` values are parsed with ``json.loads``. Anything
        else (e.g. SQL NULL -> ``None``, or a value the driver already
        decoded) is returned unchanged instead of raising ``TypeError``.
        """
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value)
        return value

    async def get_chunk_list(self, doc_id) -> list:
        """Fetch the chunks of ``doc_id`` selected by the status filter below.

        :param doc_id: value compared against ``content_chunks.doc_id``.
        :return: list of row dicts with the JSON columns decoded; empty if
            the query returns nothing (or ``None``).
        :raises json.JSONDecodeError: if a JSON column holds malformed text.
        """
        query = """
            SELECT chunk_id, doc_id, topic, domain, task_type, keywords, concepts, entities, text_type, dataset_id
            FROM content_chunks
            WHERE embedding_status = %s AND status = %s and doc_id = %s;
        """
        # NOTE(review): the meaning of embedding_status=2 and status=1 is not
        # visible here (presumably "embedded" / "active") -- confirm.
        response = await self.mysql_client.async_fetch(
            query=query, params=(2, 1, doc_id)
        )
        chunks = []
        for row in response or []:
            for column in self._JSON_COLUMNS:
                row[column] = self._decode_json_field(row[column])
            chunks.append(row)
        return chunks

    async def deal(self, doc_id) -> None:
        """Add every chunk returned by ``get_chunk_list(doc_id)`` to the graph.

        Chunks are processed one after another; an exception from any chunk
        stops the remaining ones.
        """
        for chunk in await self.get_chunk_list(doc_id):
            await self.add_single_chunk(chunk)