|
@@ -5,6 +5,7 @@ from datetime import datetime
|
|
|
from applications.api import feishu_robot
|
|
from applications.api import feishu_robot
|
|
|
from applications.utils import task_schedule_response
|
|
from applications.utils import task_schedule_response
|
|
|
|
|
|
|
|
|
|
+from applications.tasks.crawler_tasks import CrawlerToutiao
|
|
|
from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
|
|
from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
|
|
|
from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
|
|
from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
|
|
|
from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
|
|
from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
|
|
@@ -31,9 +32,7 @@ class TaskScheduler(TaskMapper):
|
|
|
query = f"""
|
|
query = f"""
|
|
|
select start_timestamp from {self.table} where task_name = %s and task_status = %s;
|
|
select start_timestamp from {self.table} where task_name = %s and task_status = %s;
|
|
|
"""
|
|
"""
|
|
|
- response, error = await self.db_client.async_fetch(
|
|
|
|
|
- query=query, params=(task_name, 1)
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ response = await self.db_client.async_fetch(query=query, params=(task_name, 1))
|
|
|
if not response:
|
|
if not response:
|
|
|
# no task is processing
|
|
# no task is processing
|
|
|
return False
|
|
return False
|
|
@@ -60,21 +59,35 @@ class TaskScheduler(TaskMapper):
|
|
|
async def lock_task(self, task_name, date_string):
    """Mark a task as processing, provided it is still in the init status.

    The UPDATE's WHERE clause only matches a row whose ``task_status`` is
    ``self.TASK_INIT_STATUS``, so a row that is already in another status
    is left untouched.

    Args:
        task_name: Value matched against the ``task_name`` column.
        date_string: Value matched against the ``date_string`` column.

    Returns:
        Whatever ``self.db_client.async_save`` returns (presumably the
        number of affected rows -- confirm against the db client).
    """
    # Only the table name is interpolated; every value goes through the
    # driver's %s placeholders.
    query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
    return await self.db_client.async_save(
        query=query,
        params=(
            self.TASK_PROCESSING_STATUS,
            task_name,
            date_string,
            self.TASK_INIT_STATUS,
        ),
    )
|
|
|
|
|
|
|
|
async def release_task(self, task_name, date_string, final_status=None):
    """Set a processing task to its final status and stamp the finish time.

    After the task has finished running, its status is set to the
    finished/failed status. The UPDATE only matches a row whose
    ``task_status`` is still ``self.TASK_PROCESSING_STATUS``.

    Args:
        task_name: Value matched against the ``task_name`` column.
        date_string: Value matched against the ``date_string`` column.
        final_status: Status to store. When omitted (``None``) it defaults
            to ``self.TASK_SUCCESS_STATUS``.

    Returns:
        Whatever ``self.db_client.async_save`` returns (presumably the
        number of affected rows -- confirm against the db client).
    """
    # Compare against None explicitly: a truthiness test would silently
    # replace an explicit falsy status code (such as 0) with the success
    # status.
    if final_status is None:
        final_status = self.TASK_SUCCESS_STATUS

    query = f"""
        update {self.table}
        set task_status = %s, finish_timestamp = %s
        where task_name = %s and date_string = %s and task_status = %s;
    """
    return await self.db_client.async_save(
        query=query,
        params=(
            final_status,
            int(time.time()),  # finish time as whole epoch seconds
            task_name,
            date_string,
            self.TASK_PROCESSING_STATUS,
        ),
    )
|
|
|
|
|
|
|
|
async def deal(self):
|
|
async def deal(self):
|
|
@@ -209,7 +222,7 @@ class TaskScheduler(TaskMapper):
|
|
|
sub_task = TitleRewrite(self.db_client, self.log_client)
|
|
sub_task = TitleRewrite(self.db_client, self.log_client)
|
|
|
await sub_task.deal()
|
|
await sub_task.deal()
|
|
|
await self.release_task(
|
|
await self.release_task(
|
|
|
- task_name=task_name, date_string=date_string, final_status=2
|
|
|
|
|
|
|
+ task_name=task_name, date_string=date_string
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
asyncio.create_task(background_title_rewrite())
|
|
asyncio.create_task(background_title_rewrite())
|
|
@@ -228,14 +241,12 @@ class TaskScheduler(TaskMapper):
|
|
|
self.db_client, self.log_client, date_string
|
|
self.db_client, self.log_client, date_string
|
|
|
)
|
|
)
|
|
|
await sub_task.deal()
|
|
await sub_task.deal()
|
|
|
-
|
|
|
|
|
task = CheckDailyPublishArticlesTask(
|
|
task = CheckDailyPublishArticlesTask(
|
|
|
self.db_client, self.log_client, date_string
|
|
self.db_client, self.log_client, date_string
|
|
|
)
|
|
)
|
|
|
await task.deal()
|
|
await task.deal()
|
|
|
-
|
|
|
|
|
await self.release_task(
|
|
await self.release_task(
|
|
|
- task_name=task_name, date_string=date_string, final_status=2
|
|
|
|
|
|
|
+ task_name=task_name, date_string=date_string
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
asyncio.create_task(background_daily_publish_articles_recycle())
|
|
asyncio.create_task(background_daily_publish_articles_recycle())
|
|
@@ -252,7 +263,7 @@ class TaskScheduler(TaskMapper):
|
|
|
)
|
|
)
|
|
|
await sub_task.deal()
|
|
await sub_task.deal()
|
|
|
await self.release_task(
|
|
await self.release_task(
|
|
|
- task_name=task_name, date_string=date_string, final_status=2
|
|
|
|
|
|
|
+ task_name=task_name, date_string=date_string
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
asyncio.create_task(background_update_root_source_id())
|
|
asyncio.create_task(background_update_root_source_id())
|
|
@@ -262,10 +273,13 @@ class TaskScheduler(TaskMapper):
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
case "task_processing_monitor":
|
|
case "task_processing_monitor":
|
|
|
|
|
+
|
|
|
async def background_task_processing_monitor():
|
|
async def background_task_processing_monitor():
|
|
|
sub_task = TaskProcessingMonitor(self.db_client)
|
|
sub_task = TaskProcessingMonitor(self.db_client)
|
|
|
await sub_task.deal()
|
|
await sub_task.deal()
|
|
|
- await self.release_task(task_name=task_name, date_string=date_string, final_status=2)
|
|
|
|
|
|
|
+ await self.release_task(
|
|
|
|
|
+ task_name=task_name, date_string=date_string
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
asyncio.create_task(background_task_processing_monitor())
|
|
asyncio.create_task(background_task_processing_monitor())
|
|
|
return await task_schedule_response.success_response(
|
|
return await task_schedule_response.success_response(
|
|
@@ -273,6 +287,23 @@ class TaskScheduler(TaskMapper):
|
|
|
data={"code": 0, "message": "task started background"},
|
|
data={"code": 0, "message": "task started background"},
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
+ case "crawler_toutiao_articles":
|
|
|
|
|
+
|
|
|
|
|
+ async def background_crawler_toutiao_articles():
|
|
|
|
|
+ sub_task = CrawlerToutiao(self.db_client, self.log_client)
|
|
|
|
|
+ await sub_task.crawler_task(
|
|
|
|
|
+ media_type=self.data.get("media_type", "article")
|
|
|
|
|
+ )
|
|
|
|
|
+ await self.release_task(
|
|
|
|
|
+ task_name=task_name, date_string=date_string
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ asyncio.create_task(background_crawler_toutiao_articles())
|
|
|
|
|
+ return await task_schedule_response.success_response(
|
|
|
|
|
+ task_name=task_name,
|
|
|
|
|
+ data={"code": 0, "message": "task started background"},
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
case _:
|
|
case _:
|
|
|
await self.log_client.log(
|
|
await self.log_client.log(
|
|
|
contents={
|
|
contents={
|
|
@@ -283,7 +314,7 @@ class TaskScheduler(TaskMapper):
|
|
|
"data": self.data,
|
|
"data": self.data,
|
|
|
}
|
|
}
|
|
|
)
|
|
)
|
|
|
- await self.release_task(task_name, date_string, 99)
|
|
|
|
|
|
|
+ await self.release_task(task_name, date_string, self.TASK_FAILED_STATUS)
|
|
|
return await task_schedule_response.fail_response(
|
|
return await task_schedule_response.fail_response(
|
|
|
error_code="4001", error_message="wrong task name input"
|
|
error_code="4001", error_message="wrong task name input"
|
|
|
)
|
|
)
|