| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
from typing import Dict, List, Optional

from app.core.database import DatabaseManager

from ._const import DecodeTaskConst
class ArticlesDecodeTaskMapper(DecodeTaskConst):
    """Data-access helpers for the article decode-task queue.

    Each method runs a single SQL statement through ``self.pool``
    (``async_save`` for writes, ``async_fetch`` for reads) and returns the
    pool's result unchanged. Status codes and batch sizes are read from the
    ``DecodeTaskConst`` base (``TaskStatus``, ``ExtractStatus``,
    ``TASK_BATCH``).
    """

    # Queue table holding decode tasks and their results.
    DECODE_TASK_QUEUE = "long_articles_decode_tasks"
    # Per-source task-creation state table; not used by this class itself,
    # referenced by InnerArticlesDecodeTaskMapper.
    INNER_DECODE_CREATE_STATE = "long_articles_inner_decode_create_state"
    # Table receiving the rows written by ``record_extract_detail``.
    DECODE_TASK_DETAIL = "long_articles_decode_task_detail"

    def __init__(self, pool: DatabaseManager):
        # Shared database pool used by every query issued by this mapper.
        self.pool = pool

    async def _insert_decode_task(
        self,
        task_id: str,
        content_id: str,
        task_type: int,
        payload: str,
        remark: Optional[str],
        *,
        ignore_duplicates: bool,
    ) -> int:
        """Insert one row into the decode-task queue.

        Shared implementation of ``record_decode_task`` and
        ``record_decode_task_if_absent``. It deliberately does not go through
        ``self.record_decode_task``, because subclasses override that method
        with a different signature.

        Args:
            task_id: Decode task identifier.
            content_id: Identifier of the content being decoded.
            task_type: Task type code.
            payload: Payload string stored verbatim.
            remark: Optional note stored with the row (NULL when ``None``).
            ignore_duplicates: Use ``INSERT IGNORE`` instead of ``INSERT``.

        Returns:
            Whatever ``pool.async_save`` returns.
        """
        verb = "INSERT IGNORE" if ignore_duplicates else "INSERT"
        query = f"""
            {verb} INTO {self.DECODE_TASK_QUEUE}
                (task_id, content_id, task_type, payload, remark)
            VALUES (%s, %s, %s, %s, %s)
        """
        return await self.pool.async_save(
            query=query,
            params=(task_id, content_id, task_type, payload, remark),
        )

    async def record_decode_task(
        self,
        task_id: str,
        content_id: str,
        task_type: int,
        payload: str,
        remark: Optional[str] = None,
    ) -> int:
        """Store a new decode task with a plain ``INSERT``.

        A unique-key conflict (if the table defines one) is not suppressed.
        Use ``record_decode_task_if_absent`` to skip duplicates instead.

        Returns:
            Whatever ``pool.async_save`` returns (presumably the affected-row
            count — confirm against ``DatabaseManager``).
        """
        return await self._insert_decode_task(
            task_id, content_id, task_type, payload, remark, ignore_duplicates=False
        )

    async def record_decode_task_if_absent(
        self,
        task_id: str,
        content_id: str,
        task_type: int,
        payload: str,
        remark: Optional[str] = None,
    ) -> int:
        """Store a decode task with ``INSERT IGNORE``.

        MySQL skips a row that would violate a unique key instead of raising,
        so re-submitting an existing task is a no-op.

        Returns:
            Whatever ``pool.async_save`` returns (presumably 0 when the row was
            skipped — confirm against ``DatabaseManager``).
        """
        return await self._insert_decode_task(
            task_id, content_id, task_type, payload, remark, ignore_duplicates=True
        )

    async def update_decode_task_status(
        self,
        task_id: str,
        ori_status: int,
        new_status: int,
        remark: Optional[str] = None,
    ) -> int:
        """Move a task from ``ori_status`` to ``new_status`` (compare-and-set).

        Only a row still in ``ori_status`` is updated, so this can act as a
        lock. ``remark`` is always written; omitting it resets the column to
        NULL.
        """
        query = f"""
            UPDATE {self.DECODE_TASK_QUEUE}
            SET status = %s, remark = %s
            WHERE task_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, remark, task_id, ori_status)
        )

    async def set_decode_result(
        self, task_id: str, result: str, remark: Optional[str] = None
    ) -> int:
        """Store ``result`` and move the task from PROCESSING to SUCCESS.

        Tasks not currently in PROCESSING are left untouched.
        """
        query = f"""
            UPDATE {self.DECODE_TASK_QUEUE}
            SET status = %s, remark = %s, result = %s
            WHERE task_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.SUCCESS,
                remark,
                result,
                task_id,
                self.TaskStatus.PROCESSING,
            ),
        )

    async def fetch_decoding_tasks(self) -> List[Dict]:
        """Return up to ``TASK_BATCH`` tasks still waiting for a decode result.

        These are rows in INIT status. Only ``task_id`` is selected.
        """
        query = f"""
            SELECT task_id FROM {self.DECODE_TASK_QUEUE} WHERE status = %s LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
        )

    async def fetch_extract_tasks(self) -> List[Dict]:
        """Return ``id`` and ``result`` of successfully decoded, unextracted tasks.

        Selects rows with status SUCCESS and extract_status INIT. Unlike
        ``fetch_decoding_tasks``, there is no LIMIT: every matching row is
        returned.
        """
        query = f"""
            SELECT id, result FROM {self.DECODE_TASK_QUEUE}
            WHERE extract_status = %s AND status = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS)
        )

    async def update_extract_status(
        self, task_id: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set ``extract_status`` (used for locking and state changes).

        NOTE: ``task_id`` is matched against the ``id`` column (as returned by
        ``fetch_extract_tasks``), not the ``task_id`` column.
        """
        query = f"""
            UPDATE {self.DECODE_TASK_QUEUE}
            SET extract_status = %s WHERE extract_status = %s AND id = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, ori_status, task_id)
        )

    async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
        """Insert one extraction-result row into ``DECODE_TASK_DETAIL``.

        Args:
            decode_task_id: ``id`` of the owning row in ``DECODE_TASK_QUEUE``.
            detail: Mapping with optional keys ``inspiration``, ``purpose``,
                ``key_point`` and ``topic``; missing keys are stored as "".
        """
        query = f"""
            INSERT INTO {self.DECODE_TASK_DETAIL}
                (decode_task_id, inspiration, purpose, key_point, topic)
            VALUES (%s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(
                decode_task_id,
                detail.get("inspiration", ""),
                detail.get("purpose", ""),
                detail.get("key_point", ""),
                detail.get("topic", ""),
            ),
        )

    async def fetch_exist_source_id(self, source_id: str, task_type: int) -> List[Dict]:
        """Return ``id`` rows of queued tasks for this content and task type.

        An empty result means no such task has been queued yet.
        """
        query = f"""
            SELECT id FROM {self.DECODE_TASK_QUEUE}
            WHERE content_id = %s AND task_type = %s;
        """
        return await self.pool.async_fetch(query=query, params=(source_id, task_type))
class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
    """Decode-task mapper for ad-platform articles.

    Adds queries against ``ad_platform_accounts_daily_detail`` (the source
    articles) on top of the shared decode-task queue helpers. The inherited
    ``__init__`` is used as-is.
    """

    async def record_decode_task(
        self, task_id: str, wx_sn: str, remark: Optional[str] = None
    ) -> int:
        """Queue a decode task for one ad-platform article, keyed by ``wx_sn``.

        The task type is fixed to ``TaskType.SOURCE_IMAGES_TEXT`` and the
        payload to ``AdPlatformDecodeTask.EMPTY_PAYLOAD_JSON``.

        NOTE: this override narrows the base-class signature (no
        ``content_id`` / ``task_type`` / ``payload``). It is therefore not
        call-compatible with ``ArticlesDecodeTaskMapper.record_decode_task``.
        """
        return await super().record_decode_task(
            task_id=task_id,
            content_id=wx_sn,
            task_type=self.TaskType.SOURCE_IMAGES_TEXT,
            payload=self.AdPlatformDecodeTask.EMPTY_PAYLOAD_JSON,
            remark=remark,
        )

    async def update_article_decode_status(
        self, id_: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set an article's ``decode_status``.

        Only the row with this ``id`` that is still in ``ori_status`` is
        updated.
        """
        query = """
            UPDATE ad_platform_accounts_daily_detail
            SET decode_status = %s
            WHERE id = %s AND decode_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, id_, ori_status)
        )

    async def fetch_decode_articles(self) -> List[Dict]:
        """Return up to ``TASK_BATCH`` articles that are ready to be decoded.

        An article qualifies when ``fetch_status`` is ``TaskStatus.SUCCESS``
        and ``decode_status`` is ``TaskStatus.INIT``.
        """
        query = """
            SELECT id, account_name, gh_id, article_title, article_cover,
                   article_text, article_images, wx_sn
            FROM ad_platform_accounts_daily_detail
            WHERE fetch_status = %s AND decode_status = %s
            LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
        )
class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
    """Decode-task mapper for internally produced ("good read") articles.

    Besides the inherited queue helpers, it reads article source data
    (partly from the ``aigc`` database). It also tracks per-source
    task-creation attempts in ``INNER_DECODE_CREATE_STATE``, using
    compare-and-set status updates. The inherited ``__init__`` is used as-is.
    """

    async def fetch_inner_articles(
        self, date_string: Optional[str] = None
    ) -> List[Dict]:
        """Return the good-read articles whose ``dt`` equals ``date_string``.

        Falls back to ``InnerDecodeCreate.DEFAULT_GOOD_READ_DATE`` when no
        date is given. Rows are ordered by ``max_read_rate`` descending and
        are not limited.
        """
        if date_string is None:
            date_string = self.InnerDecodeCreate.DEFAULT_GOOD_READ_DATE
        query = """
            SELECT title, source_id, wx_sn, cover_img_url
            FROM long_articles_good_read_article
            WHERE dt = %s
            ORDER BY max_read_rate DESC;
        """
        return await self.pool.async_fetch(query=query, params=(date_string,))

    async def fetch_inner_articles_produce_detail(self, source_id: str) -> List[Dict]:
        """Return ``produce_module_type``/``output`` rows for one plan execution.

        ``source_id`` is matched against ``plan_exe_id`` in the ``aigc``
        database. Only the cover, image, title, content and summary modules
        are selected.
        """
        mod_types = (
            self.ProduceModuleType.COVER,
            self.ProduceModuleType.IMAGE,
            self.ProduceModuleType.TITLE,
            self.ProduceModuleType.CONTENT,
            self.ProduceModuleType.SUMMARY,
        )
        # One placeholder per module type keeps the IN list parameterized.
        placeholders = ",".join(["%s"] * len(mod_types))
        query = f"""
            SELECT produce_module_type, output
            FROM produce_plan_module_output
            WHERE plan_exe_id = %s
              AND produce_module_type IN ({placeholders});
        """
        return await self.pool.async_fetch(
            query=query, db_name="aigc", params=(source_id, *mod_types)
        )

    async def fetch_article_crawler_source_info(self, source_id: str) -> List[Dict]:
        """Return the crawled source content behind one plan execution.

        Queried in the ``aigc`` database. Because both joins are LEFT JOINs,
        a plan execution without a referenced or crawled item still yields a
        row, with NULL ``channel_content_id`` / ``body_text``.
        """
        query = """
            SELECT
                t2.channel_content_id, t3.body_text
            FROM produce_plan_exe_record t1
            LEFT JOIN produce_plan_exe_refer_content t2 ON t2.plan_exe_id = t1.plan_exe_id
            LEFT JOIN crawler_content_blob t3 ON t3.channel_content_id = t2.channel_content_id
            WHERE
                t1.plan_exe_id = %s;
        """
        return await self.pool.async_fetch(
            query=query, db_name="aigc", params=(source_id,)
        )

    async def init_create_state(self, source_id: str, task_type: int, now_ts: int) -> int:
        """Create the creation-state row in INIT status if it is not there yet.

        Uses ``INSERT IGNORE``, so an existing row is left untouched. This
        assumes a unique key on (source_id, task_type) — TODO confirm against
        the schema.
        """
        query = f"""
            INSERT IGNORE INTO {self.INNER_DECODE_CREATE_STATE}
                (source_id, task_type, status, retry_count, locked_at, created_at, updated_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(
                source_id,
                task_type,
                self.TaskStatus.INIT,
                self.InnerCreateState.INITIAL_RETRY_COUNT,
                self.InnerCreateState.INITIAL_LOCKED_AT,
                now_ts,
                now_ts,
            ),
        )

    async def fetch_create_state(self, source_id: str, task_type: int) -> Optional[Dict]:
        """Return the creation-state row for (source_id, task_type), or ``None``."""
        query = f"""
            SELECT source_id, task_type, status, retry_count, locked_at, remote_task_id, last_error
            FROM {self.INNER_DECODE_CREATE_STATE}
            WHERE source_id = %s AND task_type = %s
            LIMIT %s;
        """
        rows = await self.pool.async_fetch(
            query=query,
            params=(source_id, task_type, self.InnerCreateState.FETCH_STATE_ROW_LIMIT),
        )
        return rows[0] if rows else None

    async def acquire_create_lock(
        self,
        source_id: str,
        task_type: int,
        now_ts: int,
        max_retry_times: int,
        lock_expire_before: int,
    ) -> int:
        """Try to move the state row to PROCESSING and stamp ``locked_at``.

        The row is taken when any of these holds:
          * it is in INIT;
          * it is FAILED with ``retry_count < max_retry_times``;
          * it is PROCESSING with a set lock (``locked_at`` above
            ``LOCKED_AT_CLEARED``) older than ``lock_expire_before``, i.e. a
            stale lock.

        NOTE(review): the INIT and stale-lock paths do not check
        ``retry_count``, and acquiring does not increment it. Confirm that
        callers enforce the retry budget for those paths.

        Returns:
            Whatever ``pool.async_save`` returns (presumably the affected-row
            count, so non-zero means the lock was acquired — confirm).
        """
        query = f"""
            UPDATE {self.INNER_DECODE_CREATE_STATE}
            SET status = %s, locked_at = %s, updated_at = %s, last_error = NULL
            WHERE source_id = %s
              AND task_type = %s
              AND (
                    status = %s
                    OR (status = %s AND retry_count < %s)
                    OR (status = %s AND locked_at > %s AND locked_at < %s)
              );
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.PROCESSING,
                now_ts,
                now_ts,
                source_id,
                task_type,
                self.TaskStatus.INIT,
                self.TaskStatus.FAILED,
                max_retry_times,
                self.TaskStatus.PROCESSING,
                self.InnerCreateState.LOCKED_AT_CLEARED,
                lock_expire_before,
            ),
        )

    async def mark_create_success(
        self,
        source_id: str,
        task_type: int,
        remote_task_id: str,
        now_ts: int,
        remark: Optional[str] = None,
    ) -> int:
        """Move a PROCESSING row to SUCCESS, record ``remote_task_id``, clear the lock.

        ``remark`` is written to the ``last_error`` column, so the default
        ``None`` clears any previous error.
        """
        query = f"""
            UPDATE {self.INNER_DECODE_CREATE_STATE}
            SET status = %s,
                remote_task_id = %s,
                last_error = %s,
                locked_at = %s,
                updated_at = %s
            WHERE source_id = %s AND task_type = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.SUCCESS,
                remote_task_id,
                remark,
                self.InnerCreateState.LOCKED_AT_CLEARED,
                now_ts,
                source_id,
                task_type,
                self.TaskStatus.PROCESSING,
            ),
        )

    async def _release_create_lock_with_error(
        self,
        source_id: str,
        task_type: int,
        now_ts: int,
        error_message: str,
        new_status: int,
    ) -> int:
        """Move a PROCESSING row to ``new_status``, bump ``retry_count``, record
        ``error_message`` and clear the lock.

        Shared implementation of ``mark_create_retry`` and ``mark_create_failed``.
        """
        query = f"""
            UPDATE {self.INNER_DECODE_CREATE_STATE}
            SET status = %s,
                retry_count = retry_count + 1,
                last_error = %s,
                locked_at = %s,
                updated_at = %s
            WHERE source_id = %s AND task_type = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                new_status,
                error_message,
                self.InnerCreateState.LOCKED_AT_CLEARED,
                now_ts,
                source_id,
                task_type,
                self.TaskStatus.PROCESSING,
            ),
        )

    async def mark_create_retry(
        self,
        source_id: str,
        task_type: int,
        now_ts: int,
        error_message: str,
    ) -> int:
        """Return a PROCESSING row to INIT after a failed attempt.

        Increments ``retry_count``. Because ``acquire_create_lock`` accepts
        INIT rows unconditionally, the row becomes immediately acquirable
        again.
        """
        return await self._release_create_lock_with_error(
            source_id, task_type, now_ts, error_message, self.TaskStatus.INIT
        )

    async def mark_create_failed(
        self,
        source_id: str,
        task_type: int,
        now_ts: int,
        error_message: str,
    ) -> int:
        """Move a PROCESSING row to FAILED and increment ``retry_count``.

        ``acquire_create_lock`` only re-takes a FAILED row while
        ``retry_count`` is below its ``max_retry_times``.
        """
        return await self._release_create_lock_with_error(
            source_id, task_type, now_ts, error_message, self.TaskStatus.FAILED
        )
# Public API of this module: the generic decode-task mapper and its
# ad-platform and inner-article specializations.
__all__ = [
    "ArticlesDecodeTaskMapper",
    "AdPlatformArticlesDecodeTaskMapper",
    "InnerArticlesDecodeTaskMapper",
]
|