import json
import re
from typing import Optional

from applications.config import Chunk

from .base import BaseMySQLClient

# Column names cannot be bound as query parameters, so any column used in a
# dynamic ORDER BY clause must be a plain SQL identifier before interpolation.
_ORDER_FIELD_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
_ORDER_DIRECTIONS = frozenset({"ASC", "DESC"})


class ContentChunks(BaseMySQLClient):
    """Async accessors for the ``content_chunks`` table.

    Writes go through ``self.pool.async_save`` and reads through
    ``self.pool.async_fetch``. The ``update_*`` methods are guarded
    transitions: each UPDATE only touches rows whose current value equals
    ``ori_status``.
    """

    async def insert_chunk(self, chunk: Chunk) -> int:
        """Insert one chunk row, skipping it if it collides with a unique key.

        Args:
            chunk: Source of the ``chunk_id``, ``doc_id``, ``text``, ``tokens``,
                ``topic_purity``, ``text_type``, ``dataset_id`` and ``status``
                columns.

        Returns:
            The result of ``self.pool.async_save``. It is annotated ``int``;
            presumably the affected-row count — TODO confirm.
        """
        query = """
            INSERT IGNORE INTO content_chunks
                (chunk_id, doc_id, text, tokens, topic_purity, text_type, dataset_id, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(
                chunk.chunk_id,
                chunk.doc_id,
                chunk.text,
                chunk.tokens,
                chunk.topic_purity,
                chunk.text_type,
                chunk.dataset_id,
                chunk.status,
            ),
        )

    async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
        """Move one chunk's ``chunk_status`` from ``ori_status`` to ``new_status``.

        The update only applies when the row's ``status`` also equals
        ``self.CHUNK_USEFUL_STATUS``. That attribute is presumably defined on
        ``BaseMySQLClient`` — verify.
        """
        query = """
            UPDATE content_chunks
            SET chunk_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(new_status, doc_id, chunk_id, ori_status, self.CHUNK_USEFUL_STATUS),
        )

    async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
        """Move one chunk's ``embedding_status`` from ``ori_status`` to ``new_status``."""
        query = """
            UPDATE content_chunks
            SET embedding_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
        """Store a chunk's analysis fields and advance its ``chunk_status``.

        ``concepts``, ``keywords``, ``questions`` and ``entities`` are
        JSON-encoded before storage. The row is only updated when its current
        ``chunk_status`` equals ``ori_status``; it is then set to
        ``new_status``.
        """
        query = """
            UPDATE content_chunks
            SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s,
                keywords = %s, questions = %s, chunk_status = %s, entities = %s
            WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                chunk.summary,
                chunk.topic,
                chunk.domain,
                chunk.task_type,
                json.dumps(chunk.concepts),
                json.dumps(chunk.keywords),
                json.dumps(chunk.questions),
                new_status,
                json.dumps(chunk.entities),
                chunk.doc_id,
                chunk.chunk_id,
                ori_status,
            ),
        )

    async def update_es_status(self, doc_id, chunk_id, ori_status, new_status):
        """Move one chunk's ``es_status`` (the "added to ES" state) from ``ori_status`` to ``new_status``."""
        query = """
            UPDATE content_chunks
            SET es_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND es_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def update_doc_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
        """Move one chunk's availability ``status`` from ``ori_status`` to ``new_status``."""
        query = """
            UPDATE content_chunks
            SET status = %s
            WHERE doc_id = %s AND chunk_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def update_doc_status(self, doc_id, ori_status, new_status):
        """Move the availability ``status`` of every chunk of ``doc_id`` from ``ori_status`` to ``new_status``."""
        query = """
            UPDATE content_chunks
            SET status = %s
            WHERE doc_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, ori_status)
        )

    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        """Move the availability ``status`` of every chunk in ``dataset_id`` from ``ori_status`` to ``new_status``."""
        query = """
            UPDATE content_chunks
            SET status = %s
            WHERE dataset_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, dataset_id, ori_status)
        )

    async def update_graph_status(self, doc_id, chunk_id, ori_status, new_status):
        """Move one chunk's ``graph_status`` (graph-building state) from ``ori_status`` to ``new_status``."""
        query = """
            UPDATE content_chunks
            SET graph_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND graph_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def select_chunk_content(self, doc_id, chunk_id):
        """Fetch all columns of the row(s) matching ``doc_id`` and ``chunk_id``."""
        query = """
            SELECT * FROM content_chunks WHERE doc_id = %s AND chunk_id = %s;
        """
        return await self.pool.async_fetch(query=query, params=(doc_id, chunk_id))

    async def select_chunk_contents(
        self,
        page_num: int,
        page_size: int,
        order_by: Optional[dict] = None,
        doc_id: Optional[str] = None,
        doc_status: Optional[int] = None,
    ):
        """Return one page of chunks together with pagination metadata.

        Args:
            page_num: 1-based page number.
            page_size: Number of rows per page.
            order_by: Mapping ``{column: "asc" | "desc"}``. Only the first
                entry is used. ``None`` or an empty mapping means
                ``{"chunk_id": "asc"}``.
            doc_id: When truthy, restrict to rows with this ``doc_id``.
            doc_status: When not ``None`` (``0`` included), restrict to rows
                whose ``doc_status`` equals it.

        Returns:
            dict with ``entities`` (the page rows), ``total_count``, ``page``,
            ``page_size`` and ``total_pages``.

        Raises:
            ValueError: If ``page_num`` or ``page_size`` is below 1, or if the
                ``order_by`` column is not a plain identifier or its
                direction is not ASC/DESC.
        """
        # Fail fast instead of sending a negative OFFSET to MySQL or dividing
        # by zero when computing total_pages.
        if page_num < 1 or page_size < 1:
            raise ValueError(
                f"page_num and page_size must be >= 1, got {page_num}, {page_size}"
            )

        if not order_by:
            order_by = {"chunk_id": "asc"}
        order_field, order_direction = next(iter(order_by.items()))
        order_direction = str(order_direction).upper()
        # ORDER BY parts are interpolated into the SQL text, so whitelist them
        # to rule out SQL injection through order_by.
        if not isinstance(order_field, str) or not _ORDER_FIELD_RE.fullmatch(order_field):
            raise ValueError(f"invalid order_by column: {order_field!r}")
        if order_direction not in _ORDER_DIRECTIONS:
            raise ValueError(f"invalid order_by direction: {order_direction!r}")
        order_sql = f"ORDER BY `{order_field}` {order_direction}"

        # Build the optional WHERE clause. It is omitted entirely when no
        # filter is given, which avoids the invalid "WHERE ;".
        clauses = []
        params = []
        if doc_id:
            clauses.append("doc_id = %s")
            params.append(doc_id)
        if doc_status is not None:
            # NOTE(review): the other queries in this file store doc-level
            # availability in `status`; confirm a `doc_status` column exists.
            clauses.append("doc_status = %s")
            params.append(doc_status)
        where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else ""

        count_query = (
            f"SELECT COUNT(*) AS total_count FROM content_chunks {where_sql};"
        )
        count_rows = await self.pool.async_fetch(
            query=count_query, params=tuple(params)
        )
        total_count = count_rows[0]["total_count"] if count_rows else 0

        offset = (page_num - 1) * page_size
        query = f"""
            SELECT * FROM content_chunks
            {where_sql}
            {order_sql}
            LIMIT %s OFFSET %s;
        """
        entities = await self.pool.async_fetch(
            query=query, params=(*params, page_size, offset)
        )

        # Ceiling division: a partially filled last page still counts.
        total_pages = (total_count + page_size - 1) // page_size
        return {
            "entities": entities,
            "total_count": total_count,
            "page": page_num,
            "page_size": page_size,
            "total_pages": total_pages,
        }