celery_tasks.py 338 B

12345678910
  1. from celery_app import app
  2. @app.task(bind=True, name='test_task')
  3. def sample_task(self):
  4. """示例任务,带任务实例绑定"""
  5. self.update_state(state='PROGRESS', meta={'status': 'working'})
  6. print("Task is running...")
  7. self.update_state(state='PROGRESS', meta={'status': 'finalizing'})
  8. return "Task Completed!"