import re

from .base import BaseMySQLClient

# ORDER BY columns/directions cannot be bound as query parameters, so they are
# whitelisted before being interpolated into SQL text: a column must be a plain
# ASCII identifier and a direction must be ASC or DESC.
_ORDER_FIELD_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
_ORDER_DIRECTIONS = frozenset({"ASC", "DESC"})


class Contents(BaseMySQLClient):
    """Async data-access helpers for the ``contents`` table."""

    async def insert_content(self, doc_id, text, text_type, title, dataset_id):
        """Insert one content row.

        Uses ``INSERT IGNORE``, so a row that collides with an existing
        unique key is skipped by MySQL instead of raising an error.

        :return: whatever ``self.pool.async_save`` returns (presumably the
            affected-row count — confirm against the pool implementation).
        """
        query = """
            INSERT IGNORE INTO contents
                (doc_id, text, text_type, title, dataset_id)
            VALUES (%s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query, params=(doc_id, text, text_type, title, dataset_id)
        )

    async def update_content_info(self, doc_id, text, text_type, title, dataset_id):
        """Overwrite the payload fields of an existing document.

        Besides text/text_type/title/dataset_id, ``status`` is reset to
        ``self.INIT_STATUS`` (expected to be defined on the base client), so
        the updated document re-enters its initial processing state.
        """
        query = """
            UPDATE contents
            SET text = %s, text_type = %s, title = %s, dataset_id = %s, status = %s
            WHERE doc_id = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(text, text_type, title, dataset_id, self.INIT_STATUS, doc_id),
        )

    async def update_content_status(self, doc_id, ori_status, new_status):
        """Compare-and-set the ``status`` of one document.

        The row is only changed if its current ``status`` equals
        ``ori_status``; otherwise no row is updated.
        """
        query = """
            UPDATE contents
            SET status = %s
            WHERE doc_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, ori_status)
        )

    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        """Compare-and-set ``status`` for every document of a dataset.

        Only rows of ``dataset_id`` whose current ``status`` equals
        ``ori_status`` are moved to ``new_status``.
        """
        query = """
            UPDATE contents
            SET status = %s
            WHERE dataset_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, dataset_id, ori_status)
        )

    async def update_doc_status(self, doc_id, ori_status, new_status):
        """Change the usage status (``doc_status`` column) of one document.

        Note this touches ``doc_status``, not ``status``. The row is only
        changed if its current ``doc_status`` equals ``ori_status``.

        :param doc_id: document to update.
        :param ori_status: expected current ``doc_status``.
        :param new_status: ``doc_status`` value to write.
        :return: whatever ``self.pool.async_save`` returns.
        """
        query = """
            UPDATE contents
            SET doc_status = %s
            WHERE doc_id = %s AND doc_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, ori_status)
        )

    async def select_count(self, dataset_id, doc_status=1):
        """Return the number of rows in ``dataset_id`` with the given ``doc_status``.

        :return: the count as returned by the driver, or 0 if no row came back.
        """
        query = """
            SELECT count(*) AS count
            FROM contents
            WHERE dataset_id = %s AND doc_status = %s;
        """
        rows = await self.pool.async_fetch(query=query, params=(dataset_id, doc_status))
        return rows[0]["count"] if rows else 0

    async def select_content_by_doc_id(self, doc_id):
        """Fetch all columns of the row(s) matching ``doc_id``.

        :return: the row list from ``self.pool.async_fetch`` (may be empty).
        """
        query = """SELECT * FROM contents WHERE doc_id = %s;"""
        return await self.pool.async_fetch(query=query, params=(doc_id,))

    @staticmethod
    def _build_order_sql(order_by):
        """Render a validated ``ORDER BY`` clause from ``{column: direction}``.

        :raises ValueError: if a column is not a plain identifier or a
            direction is not ``asc``/``desc`` (case-insensitive).
        """
        parts = []
        for field, direction in order_by.items():
            field = str(field)
            direction = str(direction).upper()
            if not _ORDER_FIELD_RE.fullmatch(field):
                raise ValueError(f"invalid order_by column: {field!r}")
            if direction not in _ORDER_DIRECTIONS:
                raise ValueError(f"invalid order_by direction: {direction!r}")
            # Backtick-quote the already-validated identifier.
            parts.append(f"`{field}` {direction}")
        return "ORDER BY " + ", ".join(parts)

    async def select_contents(
        self,
        page_num: int,
        page_size: int,
        order_by=None,
        dataset_id: int = None,
        doc_status: int = 1,
    ):
        """Query one page of ``contents`` rows together with pagination info.

        :param page_num: 1-based page number (must be >= 1).
        :param page_size: rows per page (must be >= 1).
        :param order_by: mapping of column -> direction, e.g. ``{"id": "desc"}``
            or ``{"created_at": "asc"}``; entries are applied in insertion
            order. ``None`` or an empty mapping means ``{"id": "desc"}``.
        :param dataset_id: when truthy, restrict to this dataset; a falsy
            value (``None`` or ``0``) applies no dataset filter.
        :param doc_status: ``doc_status`` value to match (default 1).
        :return: dict with ``entities``, ``total_count``, ``page``,
            ``page_size`` and ``total_pages``.
        :raises ValueError: on invalid paging arguments or a disallowed
            ``order_by`` column/direction.
        """
        if page_num < 1:
            raise ValueError(f"page_num must be >= 1, got {page_num!r}")
        if page_size < 1:
            raise ValueError(f"page_size must be >= 1, got {page_size!r}")
        if not order_by:
            order_by = {"id": "desc"}
        offset = (page_num - 1) * page_size

        # Build the WHERE clause dynamically; values are always bound params.
        where_clauses = ["doc_status = %s"]
        params = [doc_status]
        if dataset_id:
            where_clauses.append("dataset_id = %s")
            params.append(dataset_id)
        where_sql = " AND ".join(where_clauses)

        # Validated before interpolation (identifiers cannot be bound).
        order_sql = self._build_order_sql(order_by)

        # Total number of matching rows.
        count_query = f"SELECT COUNT(*) as total_count FROM contents WHERE {where_sql};"
        count_result = await self.pool.async_fetch(
            query=count_query, params=tuple(params)
        )
        total_count = count_result[0]["total_count"] if count_result else 0

        # The requested page.
        query = f"""
            SELECT * FROM contents
            WHERE {where_sql}
            {order_sql}
            LIMIT %s OFFSET %s;
        """
        params.extend([page_size, offset])
        entities = await self.pool.async_fetch(query=query, params=tuple(params))

        total_pages = (total_count + page_size - 1) // page_size  # ceiling division
        return {
            "entities": entities,
            "total_count": total_count,
            "page": page_num,
            "page_size": page_size,
            "total_pages": total_pages,
        }