import asyncio import time from datetime import datetime from applications.api import feishu_robot from applications.utils import task_schedule_response, generate_task_trace_id from applications.tasks.cold_start_tasks import ArticlePoolColdStart from applications.tasks.crawler_tasks import CrawlerToutiao from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask from applications.tasks.llm_tasks import TitleRewrite from applications.tasks.monitor_tasks import check_kimi_balance from applications.tasks.monitor_tasks import GetOffVideos from applications.tasks.monitor_tasks import CheckVideoAuditStatus from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector from applications.tasks.monitor_tasks import TaskProcessingMonitor from applications.tasks.task_mapper import TaskMapper class TaskScheduler(TaskMapper): def __init__(self, data, log_service, db_client): self.data = data self.log_client = log_service self.db_client = db_client self.table = "long_articles_task_manager" self.trace_id = generate_task_trace_id() async def whether_task_processing(self, task_name: str) -> bool: """whether task is processing""" query = f""" select start_timestamp from {self.table} where task_name = %s and task_status = %s; """ response = await self.db_client.async_fetch(query=query, params=(task_name, 1)) if not response: # no task is processing return False else: start_timestamp = response[0]["start_timestamp"] if int(time.time()) - start_timestamp >= self.get_task_config( task_name ).get("expire_duration", self.DEFAULT_TIMEOUT): await feishu_robot.bot( title=f"{task_name} has been processing over timeout", detail={"timestamp": start_timestamp}, env="long_articles_task", ) return True async def record_task(self, task_name, date_string): """record task""" query = f"""insert into {self.table} (date_string, task_name, start_timestamp, trace_id) values (%s, %s, %s, %s);""" await self.db_client.async_save( query=query, params=(date_string, task_name, int(time.time()), self.trace_id), ) async def lock_task(self, task_name, date_string): query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;""" return await self.db_client.async_save( query=query, params=( self.TASK_PROCESSING_STATUS, task_name, date_string, self.TASK_INIT_STATUS, ), ) async def release_task(self, task_name, date_string, final_status=None): """ 任务执行完成之后,将任务状态设置为完成状态/失败状态 """ if not final_status: final_status = self.TASK_SUCCESS_STATUS query = f""" update {self.table} set task_status = %s, finish_timestamp = %s where task_name = %s and date_string = %s and task_status = %s; """ return await self.db_client.async_save( query=query, params=( final_status, int(time.time()), task_name, date_string, self.TASK_PROCESSING_STATUS, ), ) async def deal(self): task_name = self.data.get("task_name") date_string = self.data.get("date_string") if not task_name: await self.log_client.log( contents={ "task": task_name, "function": "task_scheduler_deal", "message": "not task name in params", "status": "fail", "data": self.data, } ) return await task_schedule_response.fail_response( error_code="4002", error_message="task_name must be input" ) if not date_string: date_string = datetime.today().strftime("%Y-%m-%d") # prepare for task if await self.whether_task_processing(task_name): return await task_schedule_response.fail_response( error_code="5001", error_message="task is processing" ) await self.log_client.log( contents={ "trace_id": self.trace_id, "task": task_name, "message": "start processing", "data": self.data, } ) await self.record_task(task_name=task_name, date_string=date_string) await self.lock_task(task_name, date_string) match task_name: case "check_kimi_balance": response = await check_kimi_balance() await self.log_client.log( contents={ "task": task_name, "function": "task_scheduler_deal", "message": "check_kimi_balance task execute successfully", "status": "success", "data": response, } ) await self.release_task( task_name=task_name, date_string=date_string, final_status=response["code"], ) return await task_schedule_response.success_response( task_name=task_name, data=response ) case "get_off_videos": async def background_get_off_videos(): sub_task = GetOffVideos(self.db_client, self.log_client) await sub_task.get_off_job() task_status = await sub_task.check() await self.release_task( task_name=task_name, date_string=date_string, final_status=task_status, ) asyncio.create_task(background_get_off_videos()) return await task_schedule_response.success_response( task_name=task_name, data={"code": 0, "message": "get off_videos started background"}, ) case "check_publish_video_audit_status": async def background_check_publish_video_audit_status(): sub_task = CheckVideoAuditStatus(self.db_client, self.log_client) task_status = await sub_task.deal() await self.release_task( task_name=task_name, date_string=date_string, final_status=task_status, ) print("finish task status: ", task_status) asyncio.create_task(background_check_publish_video_audit_status()) return await task_schedule_response.success_response( task_name=task_name, data={ "code": 0, "message": "check publish video audit status started", }, ) case "outside_article_monitor": async def background_outside_article_monitor(): collect_task = OutsideGzhArticlesCollector(self.db_client) await collect_task.deal() monitor_task = OutsideGzhArticlesMonitor(self.db_client) final_status = await monitor_task.deal() await self.release_task( task_name, date_string, final_status=final_status ) asyncio.create_task(background_outside_article_monitor()) return await task_schedule_response.success_response( task_name=task_name, data={ "code": 0, "message": "outside_article_monitor started background", }, ) case "inner_article_monitor": async def background_inner_article_monitor(): task = InnerGzhArticlesMonitor(self.db_client) final_status = await task.deal() await self.release_task( task_name, date_string, final_status=final_status ) asyncio.create_task(background_inner_article_monitor()) return await task_schedule_response.success_response( task_name=task_name, data={"code": 0, "message": "task started background"}, ) case "title_rewrite": async def background_title_rewrite(): sub_task = TitleRewrite(self.db_client, self.log_client) await sub_task.deal() await self.release_task( task_name=task_name, date_string=date_string ) asyncio.create_task(background_title_rewrite()) return await task_schedule_response.success_response( task_name=task_name, data={ "code": 0, "message": "inner_article_monitor started background", }, ) case "daily_publish_articles_recycle": async def background_daily_publish_articles_recycle(): sub_task = RecycleDailyPublishArticlesTask( self.db_client, self.log_client, date_string ) await sub_task.deal() task = CheckDailyPublishArticlesTask( self.db_client, self.log_client, date_string ) await task.deal() await self.release_task( task_name=task_name, date_string=date_string ) asyncio.create_task(background_daily_publish_articles_recycle()) return await task_schedule_response.success_response( task_name=task_name, data={"code": 0, "message": "task started background"}, ) case "update_root_source_id": async def background_update_root_source_id(): sub_task = UpdateRootSourceIdAndUpdateTimeTask( self.db_client, self.log_client ) await sub_task.deal() await self.release_task( task_name=task_name, date_string=date_string ) asyncio.create_task(background_update_root_source_id()) return await task_schedule_response.success_response( task_name=task_name, data={"code": 0, "message": "task started background"}, ) case "task_processing_monitor": async def background_task_processing_monitor(): sub_task = TaskProcessingMonitor(self.db_client) await sub_task.deal() await self.release_task( task_name=task_name, date_string=date_string ) asyncio.create_task(background_task_processing_monitor()) return await task_schedule_response.success_response( task_name=task_name, data={"code": 0, "message": "task started background"}, ) case "crawler_toutiao_articles": async def background_crawler_toutiao_articles(): sub_task = CrawlerToutiao( self.db_client, self.log_client, self.trace_id ) media_type = self.data.get("media_type", "article") method = self.data.get("method", "account") category_list = self.data.get("category_list", []) match method: case "account": await sub_task.crawler_task(media_type=media_type) case "recommend": await sub_task.crawl_toutiao_recommend_task( category_list=category_list ) await self.release_task( task_name=task_name, date_string=date_string ) asyncio.create_task(background_crawler_toutiao_articles()) return await task_schedule_response.success_response( task_name=task_name, data={"code": 0, "message": "task started background"}, ) case "article_pool_pool_cold_start": async def background_article_pool_pool_cold_start(): sub_task = ArticlePoolColdStart( self.db_client, self.log_client, self.trace_id ) crawler_methods = self.data.get("crawler_methods", []) platform = self.data.get("platform", "weixin") await sub_task.deal( platform=platform, crawl_methods=crawler_methods ) await self.release_task( task_name=task_name, date_string=date_string ) await self.log_client.log( contents={ "trace_id": self.trace_id, "task": task_name, "message": "finish processed", "data": self.data, } ) asyncio.create_task(background_article_pool_pool_cold_start()) return await task_schedule_response.success_response( task_name=task_name, data={"code": 0, "message": "task started background"}, ) case _: await self.log_client.log( contents={ "task": task_name, "function": "task_scheduler_deal", "message": "wrong task input", "status": "success", "data": self.data, } ) await self.release_task(task_name, date_string, self.TASK_FAILED_STATUS) return await task_schedule_response.fail_response( error_code="4001", error_message="wrong task name input" )