""" use neo4j to build graph """ from dataclasses import fields from applications.utils.neo4j import AsyncNeo4jRepository from applications.utils.neo4j import Document, GraphChunk, ChunkRelations from applications.utils.mysql import ContentChunks from applications.utils.async_utils import run_tasks_with_asyncio_task_group class BuildGraph(AsyncNeo4jRepository): INIT_STATUS = 0 PROCESSING_STATUS = 1 FINISHED_STATUS = 2 FAILED_STATUS = 3 def __init__(self, neo4j, es_client, mysql_client): super().__init__(neo4j) self.es_client = es_client self.chunk_manager = ContentChunks(mysql_client) @staticmethod def from_dict(cls, data: dict): field_names = {f.name for f in fields(cls)} return cls(**{k: v for k, v in data.items() if k in field_names}) async def add_single_chunk(self, param): """async process single chunk""" chunk_id = param["chunk_id"] doc_id = param["doc_id"] acquire_lock = await self.chunk_manager.update_graph_status( doc_id, chunk_id, self.INIT_STATUS, self.PROCESSING_STATUS ) if acquire_lock: print(f"while building graph, acquire lock for chunk {chunk_id}") return try: doc: Document = self.from_dict(Document, param) graph_chunk: GraphChunk = self.from_dict(GraphChunk, param) relations: ChunkRelations = self.from_dict(ChunkRelations, param) await self.add_document_with_chunk(doc, graph_chunk, relations) await self.chunk_manager.update_graph_status( doc_id, chunk_id, self.PROCESSING_STATUS, self.FINISHED_STATUS ) except Exception as e: print(f"failed to build graph for chunk {chunk_id}: {e}") await self.chunk_manager.update_graph_status( doc_id, chunk_id, self.PROCESSING_STATUS, self.FAILED_STATUS ) async def get_chunk_list_from_es(self, doc_id): """async get chunk list""" query = { "query": {"bool": {"must": [{"term": {"doc_id": doc_id}}]}}, "_source": True, } try: resp = await self.es_client.async_search(query=query) return [hit["_source"] for hit in resp["hits"]["hits"]] except Exception as e: print(f"search failed: {e}") return [] async def deal(self, doc_id): """async process single chunk""" chunk_list = await self.get_chunk_list_from_es(doc_id) await run_tasks_with_asyncio_task_group( task_list=chunk_list, handler=self.add_single_chunk, description="build graph", unit="chunk", max_concurrency=10, )