""" use neo4j to build graph """ from dataclasses import fields from applications.utils.neo4j import AsyncNeo4jRepository from applications.utils.neo4j import Document, GraphChunk, ChunkRelations from applications.utils.mysql import ContentChunks class BuildGraph(AsyncNeo4jRepository): INIT_STATUS = 0 PROCESSING_STATUS = 1 FINISHED_STATUS = 2 FAILED_STATUS = 3 def __init__(self, neo4j, es_client, mysql_client): super().__init__(neo4j) self.es_client = es_client self.chunk_manager = ContentChunks(mysql_client) @staticmethod def from_dict(cls, data: dict): field_names = {f.name for f in fields(cls)} return cls(**{k: v for k, v in data.items() if k in field_names}) async def add_single_chunk(self, param): """async process single chunk""" chunk_id = param["chunk_id"] doc_id = param["doc_id"] acquire_lock = await self.chunk_manager.update_graph_status( doc_id, chunk_id, self.INIT_STATUS, self.PROCESSING_STATUS ) if not acquire_lock: print(f"while building graph, acquire lock for chunk {chunk_id}") return try: doc: Document = self.from_dict(Document, param) graph_chunk: GraphChunk = self.from_dict(GraphChunk, param) relations: ChunkRelations = self.from_dict(ChunkRelations, param) await self.add_document_with_chunk(doc, graph_chunk, relations) await self.chunk_manager.update_graph_status( doc_id, chunk_id, self.PROCESSING_STATUS, self.FINISHED_STATUS ) except Exception as e: print(f"failed to build graph for chunk {chunk_id}: {e}") await self.chunk_manager.update_graph_status( doc_id, chunk_id, self.PROCESSING_STATUS, self.FAILED_STATUS ) async def get_chunk_list_from_es(self, doc_id): """async get chunk list""" query = { "query": {"bool": {"must": [{"term": {"doc_id": doc_id}}]}}, "_source": True, } try: resp = await self.es_client.async_search(query=query) return [hit["_source"] for hit in resp["hits"]["hits"]] except Exception as e: print(f"search failed: {e}") return [] async def deal(self, doc_id): """async process single chunk""" chunk_list = await self.get_chunk_list_from_es(doc_id) for chunk in chunk_list: await self.add_single_chunk(chunk) async def deal_batch(self, dataset_id): """async task""" doc_ids = await self.chunk_manager.get_ungraphed_docs(dataset_id) for doc_id in doc_ids: try: await self.deal(doc_id) except Exception as e: print(f"failed to build graph for doc {doc_id}: {e}")