from celery_app import app @app.task(bind=True, name='test_task') def sample_task(self): """示例任务,带任务实例绑定""" self.update_state(state='PROGRESS', meta={'status': 'working'}) print("Task is running...") self.update_state(state='PROGRESS', meta={'status': 'finalizing'}) return "Task Completed!"