FastAPI + Long Polling for Streaming

Engineering
Python

A simple pattern for streaming responses using FastAPI and long polling.

Why Long Polling?

When you need to stream data from a backend to a frontend but can’t use WebSockets (e.g., certain deployment constraints, simpler client implementation), long polling is a reliable alternative.

The pattern:

1. Client makes a request
2. Server holds the connection open until data is available
3. Server sends the response; the client immediately reconnects
4. Repeat until streaming is complete

Server Implementation

If a single long-lived connection is acceptable, FastAPI's `StreamingResponse` covers the simplest case:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
from typing import AsyncGenerator

app = FastAPI()

async def generate_stream() -> AsyncGenerator[str, None]:
    """Simulate streaming data (e.g., LLM tokens)."""
    chunks = ["Hello ", "world! ", "This ", "is ", "streaming ", "data."]
    for chunk in chunks:
        await asyncio.sleep(0.3)  # Simulate delay
        yield chunk

@app.get("/stream")
async def stream_endpoint():
    return StreamingResponse(
        generate_stream(),
        media_type="text/plain",
    )
```
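This endpoint can be consumed incrementally from a script. A minimal sketch with httpx (assuming the app above is served at localhost:8000; `consume_stream` is just an illustrative name):

```python
import httpx

async def consume_stream(url: str = "http://localhost:8000/stream") -> None:
    """Print chunks as the server yields them."""
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as response:
            async for chunk in response.aiter_text():
                print(chunk, end="", flush=True)

# Usage: asyncio.run(consume_stream())
```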
Long Polling with Session State
For more complex scenarios where you need to track progress across requests:
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uuid
import asyncio

app = FastAPI()

# In-memory session storage (use Redis in production)
sessions: dict[str, dict] = {}

class StartResponse(BaseModel):
    session_id: str

class PollResponse(BaseModel):
    data: str
    done: bool

@app.post("/start")
async def start_stream() -> StartResponse:
    """Initialize a streaming session."""
    session_id = str(uuid.uuid4())
    sessions[session_id] = {
        "queue": asyncio.Queue(),
        "done": False,
    }
    # Start a background task to produce data, keeping a reference on the
    # session so the task isn't garbage-collected mid-stream
    sessions[session_id]["task"] = asyncio.create_task(produce_data(session_id))
    return StartResponse(session_id=session_id)

async def produce_data(session_id: str):
    """Background task that produces streaming data."""
    chunks = ["Processing ", "your ", "request ", "now..."]
    for chunk in chunks:
        await asyncio.sleep(0.5)
        await sessions[session_id]["queue"].put(chunk)
    sessions[session_id]["done"] = True

@app.get("/poll/{session_id}")
async def poll(session_id: str) -> PollResponse:
    """Long poll for the next chunk of data."""
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    session = sessions[session_id]
    # If the producer has finished and the queue is drained, report done
    # immediately rather than holding the connection for the full timeout
    if session["done"] and session["queue"].empty():
        return PollResponse(data="", done=True)
    try:
        # Wait up to 30s for data
        data = await asyncio.wait_for(
            session["queue"].get(),
            timeout=30.0,
        )
        return PollResponse(data=data, done=False)
    except asyncio.TimeoutError:
        # Return an empty response; the client should retry
        return PollResponse(data="", done=session["done"])
```
Client Implementation (Python)

```python
import httpx

async def stream_with_long_polling(base_url: str = "http://localhost:8000"):
    async with httpx.AsyncClient() as client:
        # Start session
        response = await client.post(f"{base_url}/start")
        session_id = response.json()["session_id"]

        # Poll until done
        result_text = ""
        done = False
        while not done:
            response = await client.get(
                f"{base_url}/poll/{session_id}",
                timeout=35.0,  # Slightly longer than server timeout
            )
            result = response.json()
            if result["data"]:
                print(result["data"], end="", flush=True)
                result_text += result["data"]
            done = result["done"]

        print("\nStream complete!")
        return result_text

# Usage: await stream_with_long_polling()
```
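One refinement before the considerations below: the poll loop above treats any network hiccup as fatal. Here is a sketch of a retry wrapper for the poll request, assuming the same endpoints (`poll_with_retry` is an illustrative name, not part of the server API):

```python
import asyncio
import httpx

async def poll_with_retry(client: httpx.AsyncClient, url: str, max_retries: int = 3) -> dict:
    """GET the poll endpoint, retrying transient network errors with backoff."""
    for attempt in range(max_retries):
        try:
            response = await client.get(url, timeout=35.0)
            response.raise_for_status()
            return response.json()
        except httpx.TransportError:
            # Connection drops, DNS failures, timeouts: back off and retry
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s, ...
```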
Key Considerations

| Aspect | Recommendation |
|---|---|
| Timeout | Set reasonable poll timeout (30s typical) |
| Session cleanup | Add TTL-based cleanup for abandoned sessions |
| Production storage | Use Redis instead of in-memory dict |
| Error handling | Client should retry on network errors |
| Scaling | Session affinity or shared state needed for multiple workers |
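For the session-cleanup row, one sketch of a TTL sweeper over the in-memory `sessions` dict from above. It assumes a hypothetical `created_at` timestamp is stored in each session when `/start` creates it, and that the task is launched once at application startup (e.g., from a lifespan handler):

```python
import asyncio
import time

SESSION_TTL_SECONDS = 300  # Evict sessions older than 5 minutes

async def cleanup_sessions():
    """Periodically drop sessions whose TTL has expired."""
    while True:
        await asyncio.sleep(60)  # Sweep once a minute
        now = time.monotonic()
        stale = [
            sid for sid, session in sessions.items()
            if now - session.get("created_at", now) > SESSION_TTL_SECONDS
        ]
        for sid in stale:
            del sessions[sid]
```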
When to Use What
- SSE (Server-Sent Events): Best for one-way server-to-client streaming
- WebSockets: Bidirectional, real-time communication
- Long Polling: When SSE/WebSockets aren’t available, or for simpler deployments
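For comparison, SSE also falls out of FastAPI's `StreamingResponse`; a minimal sketch (the `data:` framing and `text/event-stream` media type are what make it SSE):

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def sse_events():
    for i in range(5):
        await asyncio.sleep(0.3)
        # Each SSE frame is a "data: ..." line followed by a blank line
        yield f"data: event {i}\n\n"

@app.get("/sse")
async def sse_endpoint():
    return StreamingResponse(sse_events(), media_type="text/event-stream")
```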