12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 |
- """
- Build the Neo4j graph for a document from its content chunks stored in MySQL.
- """
- import json
- import random
- from dataclasses import fields
- from applications.utils.neo4j import AsyncNeo4jRepository
- from applications.utils.neo4j import Document, GraphChunk, ChunkRelations
- from applications.utils.async_utils import run_tasks_with_asyncio_task_group
class BuildGraph(AsyncNeo4jRepository):
    """Read a document's chunks from MySQL and write them into Neo4j.

    Chunk rows are selected from the ``content_chunks`` table, their JSON
    columns are decoded, and each row is converted into ``Document``,
    ``GraphChunk`` and ``ChunkRelations`` instances that are handed to
    ``add_document_with_chunk`` (not defined in this class; expected to be
    provided by ``AsyncNeo4jRepository``).
    """

    # Columns of ``content_chunks`` that are JSON-decoded after fetching.
    _JSON_COLUMNS = ("keywords", "entities", "concepts")

    def __init__(self, neo4j, mysql_client):
        """Store the MySQL client and initialise the Neo4j repository base.

        Args:
            neo4j: Forwarded unchanged to ``AsyncNeo4jRepository.__init__``.
            mysql_client: Client exposing an awaitable
                ``async_fetch(query=..., params=...)`` method.
        """
        super().__init__(neo4j)
        self.mysql_client = mysql_client

    @staticmethod
    def from_dict(cls, data: dict):
        """Instantiate dataclass ``cls`` from the matching keys of ``data``.

        This is a static method that receives the target dataclass as an
        explicit first argument (``self.from_dict(Document, row)``); it is
        deliberately not a ``classmethod``.

        Args:
            cls: Dataclass type to build.
            data: Mapping of candidate field values; keys that are not
                fields of ``cls`` are ignored.

        Returns:
            A new ``cls`` instance.
        """
        field_names = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in field_names})

    @staticmethod
    def _decode_json(value):
        """Decode ``value`` with ``json.loads`` when it is JSON text.

        Values that are not ``str``/``bytes``/``bytearray`` (for example a
        NULL column returned as ``None``, or a value the driver has already
        decoded) are returned unchanged instead of raising ``TypeError``.
        """
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value)
        # NOTE(review): None is passed through as-is; confirm that the
        # downstream dataclasses / Neo4j writes accept it.
        return value

    async def add_single_chunk(self, param):
        """Write one chunk row into the graph.

        Args:
            param: Mapping describing the chunk (a row as returned by
                ``get_chunk_list``). It is not modified.
        """
        # Work on a copy so the caller's mapping is not mutated.
        # NOTE(review): a random 6-digit id is not guaranteed to be unique
        # across chunks; replace with the real Milvus id when available.
        record = {**param, "milvus_id": random.randint(100000, 999999)}
        doc: Document = self.from_dict(Document, record)
        graph_chunk: GraphChunk = self.from_dict(GraphChunk, record)
        relations: ChunkRelations = self.from_dict(ChunkRelations, record)
        await self.add_document_with_chunk(doc, graph_chunk, relations)

    async def get_chunk_list(self, doc_id):
        """Fetch the chunk rows of ``doc_id`` with JSON columns decoded.

        Only rows with ``embedding_status = 2`` and ``status = 1`` are
        selected (the meaning of these codes is defined by the table
        schema, which is not visible here).

        Args:
            doc_id: Value matched against the ``doc_id`` column.

        Returns:
            A list of dicts, one per row; empty when nothing matches.
        """
        query = """
            SELECT chunk_id, doc_id, topic, domain, task_type, keywords, concepts, entities, text_type, dataset_id
            FROM content_chunks
            WHERE embedding_status = %s AND status = %s and doc_id = %s;
        """
        rows = await self.mysql_client.async_fetch(
            query=query,
            params=(2, 1, doc_id),
        )
        chunks = []
        # ``rows or ()`` tolerates a client that returns None for no result.
        for row in rows or ():
            chunk = dict(row)
            for column in self._JSON_COLUMNS:
                chunk[column] = self._decode_json(chunk[column])
            chunks.append(chunk)
        return chunks

    async def deal(self, doc_id):
        """Add every eligible chunk of ``doc_id`` to the graph, one at a time.

        Chunks are awaited sequentially in the order returned by
        ``get_chunk_list``; an exception from any chunk propagates and stops
        the remaining ones.
        """
        for chunk in await self.get_chunk_list(doc_id):
            await self.add_single_chunk(chunk)
|