import json

from applications.config import Chunk


class TaskConst:
    """Named integer status codes: init, processing, finished, failed."""

    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    FINISHED_STATUS = 2
    FAILED_STATUS = 3


class BaseMySQLClient:
    """Common base for the table helpers in this module.

    It only keeps a reference to the pool; every subclass method awaits
    ``pool.async_save(query=..., params=...)`` and returns its result.
    """

    def __init__(self, pool):
        self.pool = pool


class Dataset(BaseMySQLClient):
    """Statements issued against the ``dataset`` table."""

    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        """
        Write ``new_status`` into ``dataset.status``.

        The WHERE clause matches on ``id`` and on the current ``status``, so
        rows that are not at ``ori_status`` are left untouched.

        :param dataset_id: value compared against ``dataset.id``
        :param ori_status: status a row must currently hold to be updated
        :param new_status: status to store
        :return: the result of ``pool.async_save``
        """
        sql = """ UPDATE dataset set status = %s where id = %s and status = %s; """
        args = (new_status, dataset_id, ori_status)
        return await self.pool.async_save(query=sql, params=args)


class Contents(BaseMySQLClient):
    """Statements issued against the ``contents`` table."""

    async def insert_content(self, doc_id, text, text_type, title, dataset_id):
        """
        Add one row to ``contents`` using ``INSERT IGNORE``.

        :param doc_id: value for the ``doc_id`` column
        :param text: value for the ``text`` column
        :param text_type: value for the ``text_type`` column
        :param title: value for the ``title`` column
        :param dataset_id: value for the ``dataset_id`` column
        :return: the result of ``pool.async_save``
        """
        sql = """ INSERT IGNORE INTO contents (doc_id, text, text_type, title, dataset_id) VALUES (%s, %s, %s, %s, %s); """
        args = (doc_id, text, text_type, title, dataset_id)
        return await self.pool.async_save(query=sql, params=args)

    async def update_content_status(self, doc_id, ori_status, new_status):
        """
        Write ``new_status`` into ``contents.status`` for one ``doc_id``.

        Only rows whose ``status`` still equals ``ori_status`` are changed.

        :param doc_id: value compared against ``contents.doc_id``
        :param ori_status: status a row must currently hold to be updated
        :param new_status: status to store
        :return: the result of ``pool.async_save``
        """
        sql = """ UPDATE contents SET status = %s WHERE doc_id = %s AND status = %s; """
        args = (new_status, doc_id, ori_status)
        return await self.pool.async_save(query=sql, params=args)

    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        """
        Write ``new_status`` into ``contents.status`` for a whole dataset.

        Rows are selected by ``dataset_id`` and must still be at
        ``ori_status``.

        :param dataset_id: value compared against ``contents.dataset_id``
        :param ori_status: status a row must currently hold to be updated
        :param new_status: status to store
        :return: the result of ``pool.async_save``
        """
        sql = """ UPDATE contents SET status = %s WHERE dataset_id = %s AND status = %s; """
        args = (new_status, dataset_id, ori_status)
        return await self.pool.async_save(query=sql, params=args)

    async def update_doc_status(self, doc_id, ori_status, new_status):
        """
        Change the "using" status of a document (the ``doc_status`` column).

        Note this touches ``doc_status``, not ``status``; only rows whose
        ``doc_status`` still equals ``ori_status`` are changed.

        :param doc_id: value compared against ``contents.doc_id``
        :param ori_status: ``doc_status`` a row must currently hold
        :param new_status: ``doc_status`` to store
        :return: the result of ``pool.async_save``
        """
        sql = """ UPDATE contents SET doc_status = %s WHERE doc_id = %s and doc_status = %s; """
        args = (new_status, doc_id, ori_status)
        return await self.pool.async_save(query=sql, params=args)


class ContentChunks(BaseMySQLClient):
    """Statements issued against the ``content_chunks`` table."""

    async def insert_chunk(self, chunk: Chunk) -> int:
        """
        Add one row to ``content_chunks`` using ``INSERT IGNORE``.

        :param chunk: source of ``chunk_id``, ``doc_id``, ``text``,
            ``tokens``, ``topic_purity``, ``text_type`` and ``dataset_id``
        :return: the result of ``pool.async_save``
        """
        sql = """ INSERT IGNORE INTO content_chunks (chunk_id, doc_id, text, tokens, topic_purity, text_type, dataset_id) VALUES (%s, %s, %s, %s, %s, %s, %s); """
        args = (
            chunk.chunk_id,
            chunk.doc_id,
            chunk.text,
            chunk.tokens,
            chunk.topic_purity,
            chunk.text_type,
            chunk.dataset_id,
        )
        return await self.pool.async_save(query=sql, params=args)

    async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
        """
        Write ``new_status`` into ``chunk_status`` for one chunk.

        The row is selected by ``doc_id`` and ``chunk_id`` and must still
        have ``chunk_status`` equal to ``ori_status``.

        :param doc_id: value compared against ``doc_id``
        :param chunk_id: value compared against ``chunk_id``
        :param ori_status: ``chunk_status`` a row must currently hold
        :param new_status: ``chunk_status`` to store
        :return: the result of ``pool.async_save``
        """
        sql = """ UPDATE content_chunks SET chunk_status = %s WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s; """
        args = (new_status, doc_id, chunk_id, ori_status)
        return await self.pool.async_save(query=sql, params=args)

    async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
        """
        Write ``new_status`` into ``embedding_status`` for one chunk.

        The row is selected by ``doc_id`` and ``chunk_id`` and must still
        have ``embedding_status`` equal to ``ori_status``.

        :param doc_id: value compared against ``doc_id``
        :param chunk_id: value compared against ``chunk_id``
        :param ori_status: ``embedding_status`` a row must currently hold
        :param new_status: ``embedding_status`` to store
        :return: the result of ``pool.async_save``
        """
        sql = """ UPDATE content_chunks SET embedding_status = %s WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s; """
        args = (new_status, doc_id, chunk_id, ori_status)
        return await self.pool.async_save(query=sql, params=args)

    async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
        """
        Store a chunk's result fields and advance its ``chunk_status``.

        ``summary``, ``topic``, ``domain`` and ``task_type`` are passed
        through as-is; ``concepts``, ``keywords``, ``questions`` and
        ``entities`` are serialized with ``json.dumps`` first. The row is
        selected by the chunk's ``doc_id`` and ``chunk_id`` and must still
        have ``chunk_status`` equal to ``ori_status``.

        :param chunk: object carrying the fields listed above
        :param ori_status: ``chunk_status`` a row must currently hold
        :param new_status: ``chunk_status`` to store
        :return: the result of ``pool.async_save``
        """
        sql = """ UPDATE content_chunks SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s, keywords = %s, questions = %s, chunk_status = %s, entities = %s WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s; """
        # Positional order must follow the placeholders: chunk_status sits
        # between questions and entities in the SET list.
        args = (
            chunk.summary,
            chunk.topic,
            chunk.domain,
            chunk.task_type,
            json.dumps(chunk.concepts),
            json.dumps(chunk.keywords),
            json.dumps(chunk.questions),
            new_status,
            json.dumps(chunk.entities),
            chunk.doc_id,
            chunk.chunk_id,
            ori_status,
        )
        return await self.pool.async_save(query=sql, params=args)

    async def update_es_status(self, doc_id, chunk_id, ori_status, new_status):
        """
        Write ``new_status`` into ``es_status`` for one chunk.

        The row is selected by ``doc_id`` and ``chunk_id`` and must still
        have ``es_status`` equal to ``ori_status``.

        :param doc_id: value compared against ``doc_id``
        :param chunk_id: value compared against ``chunk_id``
        :param ori_status: ``es_status`` a row must currently hold
        :param new_status: ``es_status`` to store
        :return: the result of ``pool.async_save``
        """
        sql = """ UPDATE content_chunks SET es_status = %s WHERE doc_id = %s AND chunk_id = %s AND es_status = %s; """
        args = (new_status, doc_id, chunk_id, ori_status)
        return await self.pool.async_save(query=sql, params=args)

    async def update_doc_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
        """
        Write ``new_status`` into ``status`` for one chunk of a document.

        The row is selected by ``doc_id`` and ``chunk_id`` and must still
        have ``status`` equal to ``ori_status``.

        :param doc_id: value compared against ``doc_id``
        :param chunk_id: value compared against ``chunk_id``
        :param ori_status: ``status`` a row must currently hold
        :param new_status: ``status`` to store
        :return: the result of ``pool.async_save``
        """
        sql = """ UPDATE content_chunks set status = %s WHERE doc_id = %s AND chunk_id = %s AND status = %s; """
        args = (new_status, doc_id, chunk_id, ori_status)
        return await self.pool.async_save(query=sql, params=args)

    async def update_doc_status(self, doc_id, ori_status, new_status):
        """
        Write ``new_status`` into ``status`` for every chunk of a document.

        Rows are selected by ``doc_id`` alone and must still have ``status``
        equal to ``ori_status``.

        :param doc_id: value compared against ``doc_id``
        :param ori_status: ``status`` a row must currently hold
        :param new_status: ``status`` to store
        :return: the result of ``pool.async_save``
        """
        # The embedded line break is part of the statement text as written.
        sql = """ UPDATE content_chunks \nset status = %s WHERE doc_id = %s AND status = %s; """
        args = (new_status, doc_id, ori_status)
        return await self.pool.async_save(query=sql, params=args)

    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        """
        Write ``new_status`` into ``status`` for every chunk of a dataset.

        Rows are selected by ``dataset_id`` and must still have ``status``
        equal to ``ori_status``.

        :param dataset_id: value compared against ``dataset_id``
        :param ori_status: ``status`` a row must currently hold
        :param new_status: ``status`` to store
        :return: the result of ``pool.async_save``
        """
        sql = """ UPDATE content_chunks set status = %s WHERE dataset_id = %s AND status = %s; """
        args = (new_status, dataset_id, ori_status)
        return await self.pool.async_save(query=sql, params=args)