123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- import json
- from applications.config import Chunk
- from .base import BaseMySQLClient
class ContentChunks(BaseMySQLClient):
    """Data-access helper for the ``content_chunks`` MySQL table.

    Every statement is executed through ``self.pool`` (``async_save`` for
    writes, ``async_fetch`` for reads). ``self.pool`` and
    ``self.CHUNK_USEFUL_STATUS`` are not defined in this class; they are
    presumably provided by ``BaseMySQLClient`` -- TODO confirm.

    The ``update_*`` methods are compare-and-set style: the row is only
    modified when the relevant status column still equals ``ori_status``.
    """

    async def insert_chunk(self, chunk: Chunk) -> int:
        """Insert one chunk row; rows that hit a unique key are ignored.

        Args:
            chunk: Source of the column values (``chunk_id``, ``doc_id``,
                ``text``, ``tokens``, ``topic_purity``, ``text_type``,
                ``dataset_id``, ``status``).

        Returns:
            Whatever ``self.pool.async_save`` returns (presumably the
            affected-row count -- verify against the pool implementation).
        """
        query = """
            INSERT IGNORE INTO content_chunks
                (chunk_id, doc_id, text, tokens, topic_purity, text_type, dataset_id, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(
                chunk.chunk_id,
                chunk.doc_id,
                chunk.text,
                chunk.tokens,
                chunk.topic_purity,
                chunk.text_type,
                chunk.dataset_id,
                chunk.status,
            ),
        )

    async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
        """Update the chunking status (``chunk_status``) of a single chunk.

        The row is only touched when its ``chunk_status`` equals
        ``ori_status`` and its ``status`` equals
        ``self.CHUNK_USEFUL_STATUS``.
        """
        query = """
            UPDATE content_chunks
            SET chunk_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s and status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(new_status, doc_id, chunk_id, ori_status, self.CHUNK_USEFUL_STATUS),
        )

    async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
        """Update the embedding status of a single chunk.

        The row is only touched when ``embedding_status`` equals
        ``ori_status``.
        """
        query = """
            UPDATE content_chunks
            SET embedding_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
        """Store the chunking result and move ``chunk_status`` to ``new_status``.

        ``concepts``, ``keywords``, ``questions`` and ``entities`` are
        serialized with ``json.dumps`` before being written. The row is only
        touched when ``chunk_status`` equals ``ori_status``.
        """
        query = """
            UPDATE content_chunks
            SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s,
                keywords = %s, questions = %s, chunk_status = %s, entities = %s
            WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                chunk.summary,
                chunk.topic,
                chunk.domain,
                chunk.task_type,
                json.dumps(chunk.concepts),
                json.dumps(chunk.keywords),
                json.dumps(chunk.questions),
                new_status,
                json.dumps(chunk.entities),
                chunk.doc_id,
                chunk.chunk_id,
                ori_status,
            ),
        )

    async def update_es_status(self, doc_id, chunk_id, ori_status, new_status):
        """Update the "added to ES" status (``es_status``) of a single chunk.

        The row is only touched when ``es_status`` equals ``ori_status``.
        """
        query = """
            UPDATE content_chunks SET es_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND es_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def update_doc_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
        """Update the availability status (``status``) of a single chunk.

        The row is only touched when ``status`` equals ``ori_status``.
        """
        query = """
            UPDATE content_chunks set status = %s
            WHERE doc_id = %s AND chunk_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def update_doc_status(self, doc_id, ori_status, new_status):
        """Update the availability status of every chunk of one document.

        Only rows of ``doc_id`` whose ``status`` equals ``ori_status`` are
        touched.
        """
        query = """
            UPDATE content_chunks set status = %s
            WHERE doc_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, ori_status)
        )

    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        """Update the availability status of every chunk of one dataset.

        Only rows of ``dataset_id`` whose ``status`` equals ``ori_status``
        are touched.
        """
        query = """
            UPDATE content_chunks set status = %s
            WHERE dataset_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, dataset_id, ori_status)
        )

    async def update_graph_status(self, doc_id, chunk_id, ori_status, new_status):
        """Update the graph-building status (``graph_status``) of a single chunk.

        The row is only touched when ``graph_status`` equals ``ori_status``.
        """
        query = """
            UPDATE content_chunks SET graph_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND graph_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def select_chunk_content(self, doc_id, chunk_id):
        """Fetch all columns of the chunk identified by ``doc_id``/``chunk_id``.

        Returns:
            Whatever ``self.pool.async_fetch`` returns for the query.
        """
        query = """
            SELECT * FROM content_chunks WHERE doc_id = %s AND chunk_id = %s;
        """
        return await self.pool.async_fetch(query=query, params=(doc_id, chunk_id))

    async def select_chunk_contents(
        self,
        page_num: int,
        page_size: int,
        order_by=None,
        doc_id: str = None,
        doc_status: int = None,
    ):
        """Return one page of chunks together with pagination metadata.

        Args:
            page_num: 1-based page index.
            page_size: Number of rows per page (must be >= 1).
            order_by: Mapping ``{column: "asc" | "desc"}``. Only the first
                item is used. ``None`` or an empty mapping falls back to
                ``{"chunk_id": "asc"}``.
            doc_id: Optional filter on ``doc_id`` (ignored when falsy).
            doc_status: Optional filter on ``doc_status``; applied whenever
                it is not ``None``, so ``0`` is a valid filter value.

        Returns:
            dict with keys ``entities`` (rows of the requested page),
            ``total_count``, ``page``, ``page_size`` and ``total_pages``.

        Raises:
            ValueError: If ``page_num``/``page_size`` is smaller than 1, or
                ``order_by`` holds an invalid column name or direction.
        """
        # Reject bad paging up front: page_size == 0 would otherwise divide
        # by zero below and page_num < 1 would produce a negative OFFSET.
        if page_num < 1 or page_size < 1:
            raise ValueError("page_num and page_size must both be >= 1")

        if not order_by:
            order_by = {"chunk_id": "asc"}
        order_field, order_direction = next(iter(order_by.items()))
        order_field = str(order_field)
        order_direction = str(order_direction).upper()
        # Column name and direction cannot be bound as query parameters, so
        # they are validated before being interpolated into the SQL text.
        if not (order_field.isascii() and order_field.isidentifier()):
            raise ValueError(f"invalid order_by column: {order_field!r}")
        if order_direction not in ("ASC", "DESC"):
            raise ValueError(f"invalid order_by direction: {order_direction!r}")
        order_sql = f"ORDER BY {order_field} {order_direction}"

        # Build the WHERE clause dynamically from the optional filters.
        where_clauses = []
        filter_params = []
        if doc_id:
            where_clauses.append("doc_id = %s")
            filter_params.append(doc_id)
        if doc_status is not None:
            # NOTE(review): the other statements in this class use a column
            # named `status`; confirm that `doc_status` exists in the table.
            where_clauses.append("doc_status = %s")
            filter_params.append(doc_status)
        # Emit no WHERE keyword at all when there is nothing to filter on;
        # a bare "WHERE" is a syntax error.
        where_sql = "WHERE " + " AND ".join(where_clauses) if where_clauses else ""

        # Total number of matching rows.
        count_query = (
            f"SELECT COUNT(*) as total_count FROM content_chunks {where_sql};"
        )
        count_result = await self.pool.async_fetch(
            query=count_query, params=tuple(filter_params)
        )
        total_count = count_result[0]["total_count"] if count_result else 0

        # Rows of the requested page.
        offset = (page_num - 1) * page_size
        query = f"""
            SELECT * FROM content_chunks
            {where_sql}
            {order_sql}
            LIMIT %s OFFSET %s;
        """
        entities = await self.pool.async_fetch(
            query=query, params=(*filter_params, page_size, offset)
        )

        total_pages = (total_count + page_size - 1) // page_size  # ceiling division
        return {
            "entities": entities,
            "total_count": total_count,
            "page": page_num,
            "page_size": page_size,
            "total_pages": total_pages,
        }
|