Skip to content

refactor: connect-first stream lifecycle for sse and streamable_http#2292

Open
maxisbey wants to merge 1 commit intomainfrom
marcelo-async-with-followup
Open

refactor: connect-first stream lifecycle for sse and streamable_http#2292
maxisbey wants to merge 1 commit intomainfrom
marcelo-async-with-followup

Conversation

@maxisbey
Copy link
Contributor

Follow-up to #2266 — applies the websocket_client pattern to the other two client transports.

What changed

Instead of creating memory streams up front and cleaning them up in a try/finally, the transports now:

  1. Connect firsthttpx_client_factory / aconnect_sse / AsyncExitStack become the outer scopes
  2. Create streams only after the connection succeeds — if aconnect_sse() raises ConnectError, no streams were ever created, nothing to clean up
  3. Own all four stream ends + task group in one merged async with as the innermost scope — closes everything in reverse order on exit, task group waits for cancelled tasks before any stream end closes

Drops the try/finally wrapper and four explicit aclose() calls from each file. Net −6 lines.

Why the previous attempt was reverted

Commit history in #2266: 1e83583 tried replacing try: with async with streams: at the same nesting position, leaving httpx_client_factory and aconnect_sse nested inside it. On Python 3.14, that triggered phantom branch arcs on the inner async with lines — each context manager's __aexit__ compiles to a POP_JUMP_IF_TRUE ("did it suppress the exception?") that gets misattributed through 3.14's exception table. d297fcf reverted to try/finally.

This PR inverts the nesting instead. Verified empirically across 3.11/3.13/3.14:

Structure 3.11 3.13 3.14
1 outer async with + multi-CM innermost (websocket, streamable_http)
2 outer async with + multi-CM innermost (sse) 1 phantom on the multi-CM line
same + # pragma: no branch on that line

streamable_http has only one outer layer (AsyncExitStack) → clean on all versions, zero pragmas. sse has two unavoidable outer layers (httpx_client_factory feeds client into aconnect_sse — data dependency, can't merge into one line) → one documented pragma on the async with (streams, tg) line, which is the line we own.

Behavior change

sse_client's ConnectError is no longer wrapped in an ExceptionGroup — the task group is never entered when the connection fails, so there's nothing to group. Updated test_sse_client_closes_all_streams_on_connection_error to expect a bare httpx.ConnectError.

AI Disclaimer

Apply the websocket_client pattern from #2266 to the other two
transports: establish the network connection first, create memory
streams only after it succeeds, then own all four stream ends plus
the task group in a single merged async with as the innermost scope.

This eliminates the try/finally + four explicit aclose() calls.
If the connection fails, no streams were ever created — nothing to
clean up. The multi-CM async with unwinds in reverse order on exit,
so tg.__aexit__ waits for cancelled tasks to finish before any
stream end closes.

streamable_http has one outer async with (the AsyncExitStack for the
conditional httpx client), which is clean on all Python versions.

sse has two unavoidable outer layers (httpx_client_factory feeds into
aconnect_sse — data dependency, can't merge). On 3.14, coverage.py's
static analysis sees a phantom branch on the innermost multi-CM line:
each __aexit__ gets a POP_JUMP_IF_TRUE for 'did it suppress the
exception?', which memory streams never do. One targeted pragma on
the line we own, documented inline.

Behavior change: sse_client's ConnectError is no longer wrapped in
an ExceptionGroup, since the task group is never entered when the
connection fails. Updated the regression test to match.
@maxisbey maxisbey marked this pull request as ready for review March 13, 2026 19:42
Comment on lines +64 to +68
async with aconnect_sse(
client,
"GET",
url,
) as event_source:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async with aconnect_sse(
client,
"GET",
url,
) as event_source:
async with aconnect_sse(client, "GET", url) as event_source:

Comment on lines +125 to +126
finally:
await read_stream_writer.aclose()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still neecessary?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants