From error-handling to structured concurrency

⭐⭐⭐⭐⭐ 并发 错误处理 结构化并发 asyncio Go

Author: Nelhage | Source: blog.nelhage.com

核心问题

在单线程程序中,错误沿调用栈向上传播直到被捕获。但在并发程序中,没有单一的调用栈,错误应该"发送"到哪里?

两种常见但都不完美的方案:
  • Python/Java方式:打印错误,终止线程,继续运行 → 程序进入未测试状态,可能死锁
  • Go/Rust方式:打印错误,立即终止整个程序 → 过于激进

asyncio的Task模式

asyncio采用第三种方式:Task是一个对象,可以等待。等待Task会阻塞直到它完成;如果Task抛出异常,异常会被重新传递给等待者。

关键问题:如果没人等待Task,异常会被吞掉,直到程序退出时才打印警告。如果有人等待该Task产生输出,程序会永远静默挂起

TaskLauncher模式

规则:如果spawn一个task,你必须wait它。通过context manager强制形成明确的父子关系:

async with TaskLauncher() as tasks:
    tasks.create_task(background_task())
    tasks.create_task(asyncio.sleep(5))
    # 所有task在退出时都会被wait

两个核心挑战

1. 死锁问题

如果父任务等待子任务完成,但子任务遇到错误,可能导致永远等待。例如:

async def do_work(job_id, done_event):
    if job_id == 1:
        raise ValueError("Oops, job 1 failed!")
    done_event.set()

2. 资源泄漏

失败操作关联的资源可能包括多个执行中的task。需要一种机制来请求任意task提前退出。

核心结论

需要cancellation机制

要同时满足:

  • 等待所有子task完成,确保知道任何额外错误
  • 及时响应子task错误,不无限等待

这基本需要一个cancellation机制来请求其他task提前退出。

现有实现对比

历史教训:pthread_cancel、Thread.stop、Thread.terminate等机制都非常危险或不可用。