12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- """
- use neo4j to build graph
- """
- from dataclasses import fields
- from applications.utils.neo4j import AsyncNeo4jRepository
- from applications.utils.neo4j import Document, GraphChunk, ChunkRelations
- from applications.utils.mysql import ContentChunks
- from applications.utils.async_utils import run_tasks_with_asyncio_task_group
class BuildGraph(AsyncNeo4jRepository):
    """Build the Neo4j graph for a document from its chunks stored in Elasticsearch.

    Per-chunk progress is tracked in MySQL through
    ``ContentChunks.update_graph_status(doc_id, chunk_id, from_status, to_status)``.
    The INIT -> PROCESSING transition is used as a lock: its return value decides
    whether this worker may process the chunk.
    """

    # graph-status values passed to ContentChunks.update_graph_status
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    FINISHED_STATUS = 2
    FAILED_STATUS = 3

    def __init__(self, neo4j, es_client, mysql_client):
        """
        Args:
            neo4j: connection object handed to ``AsyncNeo4jRepository``.
            es_client: client exposing ``async_search(query=...)``.
            mysql_client: client used to build the ``ContentChunks`` manager.
        """
        super().__init__(neo4j)
        self.es_client = es_client
        self.chunk_manager = ContentChunks(mysql_client)

    @staticmethod
    def from_dict(cls, data: dict):
        """Instantiate dataclass ``cls`` from ``data``, ignoring unknown keys.

        This is a staticmethod, so ``cls`` is an ordinary positional argument:
        callers pass the target dataclass explicitly, e.g.
        ``self.from_dict(Document, record)``.

        Raises:
            TypeError: if ``cls`` is not a dataclass, or ``data`` lacks a
                required field of ``cls``.
        """
        field_names = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in field_names})

    async def add_single_chunk(self, param):
        """Write one chunk, together with its document and relations, into the graph.

        ``param`` is a chunk record (as returned by ``get_chunk_list_from_es``).
        It must contain ``chunk_id`` and ``doc_id``. Its other keys are mapped onto
        the ``Document``, ``GraphChunk`` and ``ChunkRelations`` dataclasses.

        The chunk is first claimed with an INIT -> PROCESSING status update. If
        the claim does not succeed, the chunk is skipped. Otherwise the status
        ends as FINISHED on success or FAILED on any error.
        """
        chunk_id = param["chunk_id"]
        doc_id = param["doc_id"]
        acquire_lock = await self.chunk_manager.update_graph_status(
            doc_id, chunk_id, self.INIT_STATUS, self.PROCESSING_STATUS
        )
        if not acquire_lock:
            # Presumably the chunk is not in INIT state (already claimed or done).
            print(
                f"while building graph, failed to acquire lock for chunk {chunk_id}"
            )
            return
        try:
            doc: Document = self.from_dict(Document, param)
            graph_chunk: GraphChunk = self.from_dict(GraphChunk, param)
            relations: ChunkRelations = self.from_dict(ChunkRelations, param)
            await self.add_document_with_chunk(doc, graph_chunk, relations)
            await self.chunk_manager.update_graph_status(
                doc_id, chunk_id, self.PROCESSING_STATUS, self.FINISHED_STATUS
            )
        except Exception as e:
            print(f"failed to build graph for chunk {chunk_id}: {e}")
            try:
                await self.chunk_manager.update_graph_status(
                    doc_id, chunk_id, self.PROCESSING_STATUS, self.FAILED_STATUS
                )
            except Exception as status_err:
                # A failed status write must not abort the caller's loop over the
                # remaining chunks; the chunk is likely left in PROCESSING state.
                print(f"failed to mark chunk {chunk_id} as failed: {status_err}")

    async def get_chunk_list_from_es(self, doc_id, size: int = 10000):
        """Fetch the ``_source`` of every chunk whose ``doc_id`` matches.

        Args:
            doc_id: value matched with a ``term`` query on the ``doc_id`` field.
            size: maximum number of hits to return.
                - Elasticsearch returns only 10 hits when ``size`` is omitted, which
                  truncated documents with more chunks.
                - 10000 is Elasticsearch's default ``index.max_result_window``.
                - Documents with more chunks than that would need
                  ``search_after`` or scrolling.

        Returns:
            A list of chunk dicts, or an empty list if the search (or reading the
            response) raises.
        """
        query = {
            "query": {"bool": {"must": [{"term": {"doc_id": doc_id}}]}},
            "_source": True,
            # NOTE(review): assumes async_search forwards this dict as the search
            # body unchanged -- confirm it does not also set its own size.
            "size": size,
        }
        try:
            resp = await self.es_client.async_search(query=query)
            return [hit["_source"] for hit in resp["hits"]["hits"]]
        except Exception as e:
            print(f"search failed: {e}")
            return []

    async def deal(self, doc_id):
        """Build the graph for every chunk of document ``doc_id``, one at a time."""
        chunk_list = await self.get_chunk_list_from_es(doc_id)
        # NOTE: chunks are processed sequentially. A concurrent variant using
        # run_tasks_with_asyncio_task_group (max_concurrency=10) is possible;
        # each chunk is still claimed via the MySQL status lock.
        for chunk in chunk_list:
            await self.add_single_chunk(chunk)
|