12345678910 |
- from celery_app import app
@app.task(bind=True, name='test_task')
def sample_task(self):
    """Example task that is bound to its own task instance.

    Calls ``self.update_state`` twice with state ``'PROGRESS'`` -- first with
    a ``status`` of ``'working'``, then ``'finalizing'`` -- and prints a
    message between the two calls.

    Returns:
        The string ``"Task Completed!"``.
    """

    def report(status):
        # Both updates use the same state; only the status text differs.
        self.update_state(state='PROGRESS', meta={'status': status})

    report('working')
    print("Task is running...")
    report('finalizing')
    return "Task Completed!"
|