123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- from .base import BaseMySQLClient
- class Contents(BaseMySQLClient):
- async def insert_content(self, doc_id, text, text_type, title, dataset_id):
- query = """
- INSERT IGNORE INTO contents
- (doc_id, text, text_type, title, dataset_id)
- VALUES (%s, %s, %s, %s, %s);
- """
- return await self.pool.async_save(
- query=query, params=(doc_id, text, text_type, title, dataset_id)
- )
- async def update_content_info(self, doc_id, text, text_type, title, dataset_id):
- query = """
- UPDATE contents
- SET text = %s, text_type = %s, title = %s, dataset_id = %s, status = %s
- WHERE doc_id = %s;
- """
- return await self.pool.async_save(
- query=query,
- params=(text, text_type, title, dataset_id, self.INIT_STATUS, doc_id),
- )
- async def update_content_status(self, doc_id, ori_status, new_status):
- query = """
- UPDATE contents
- SET status = %s
- WHERE doc_id = %s AND status = %s;
- """
- return await self.pool.async_save(
- query=query, params=(new_status, doc_id, ori_status)
- )
- async def update_dataset_status(self, dataset_id, ori_status, new_status):
- query = """
- UPDATE contents
- SET status = %s
- WHERE dataset_id = %s AND status = %s;
- """
- return await self.pool.async_save(
- query=query, params=(new_status, dataset_id, ori_status)
- )
- async def update_doc_status(self, doc_id, ori_status, new_status):
- """
- this function is to change the using status of each document
- :param doc_id:
- :param ori_status:
- :param new_status:
- :return:
- """
- query = """
- UPDATE contents SET doc_status = %s WHERE doc_id = %s AND doc_status = %s;
- """
- return await self.pool.async_save(
- query=query, params=(new_status, doc_id, ori_status)
- )
- async def select_count(self, dataset_id, doc_status=1):
- query = """
- SELECT count(*) AS count FROM contents WHERE dataset_id = %s AND doc_status = %s;
- """
- rows = await self.pool.async_fetch(query=query, params=(dataset_id, doc_status))
- return rows[0]["count"] if rows else 0
- async def select_content_by_doc_id(self, doc_id):
- query = """SELECT * FROM contents WHERE doc_id = %s;"""
- return await self.pool.async_fetch(query=query, params=(doc_id,))
- async def select_contents(
- self,
- page_num: int,
- page_size: int,
- order_by=None,
- dataset_id: int = None,
- doc_status: int = 1,
- ):
- """
- 分页查询 contents 表,并返回分页信息
- :param page_num: 页码,从 1 开始
- :param page_size: 每页数量
- :param order_by: 排序条件,例如 {"id": "desc"} 或 {"created_at": "asc"}
- :param dataset_id: 数据集 ID
- :param doc_status: 文档状态(默认 1)
- :return: dict,包含 entities、total_count、page、page_size、total_pages
- """
- if order_by is None:
- order_by = {"id": "desc"}
- offset = (page_num - 1) * page_size
- # 动态拼接 where 条件
- where_clauses = ["doc_status = %s"]
- params = [doc_status]
- if dataset_id:
- where_clauses.append("dataset_id = %s")
- params.append(dataset_id)
- where_sql = " AND ".join(where_clauses)
- # 动态拼接 order by
- order_field, order_direction = list(order_by.items())[0]
- order_sql = f"ORDER BY {order_field} {order_direction.upper()}"
- # 查询总数
- count_query = f"SELECT COUNT(*) as total_count FROM contents WHERE {where_sql};"
- count_result = await self.pool.async_fetch(
- query=count_query, params=tuple(params)
- )
- total_count = count_result[0]["total_count"] if count_result else 0
- # 查询分页数据
- query = f"""
- SELECT * FROM contents
- WHERE {where_sql}
- {order_sql}
- LIMIT %s OFFSET %s;
- """
- params.extend([page_size, offset])
- entities = await self.pool.async_fetch(query=query, params=tuple(params))
- total_pages = (total_count + page_size - 1) // page_size # 向上取整
- return {
- "entities": entities,
- "total_count": total_count,
- "page": page_num,
- "page_size": page_size,
- "total_pages": total_pages,
- }
|