Server-Sent Events (SSE) with Python

Python · LLM · FastAPI

Serving and consuming SSE streams for real-time LLM responses

SSE (Server-Sent Events) is how LLM APIs stream responses token-by-token over HTTP: the server keeps the connection open and emits one "data:" event per chunk.

Part 1: Serving SSE with FastAPI

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
import json

app = FastAPI()

class ChatRequest(BaseModel):
    messages: list
    stream: bool = True

async def generate_response(messages: list):
    """Simulate LLM token streaming."""
    response_text = "Hello! This is a streamed response."
    for token in response_text.split():
        # Each SSE event is "data: <json>\n\n"; the blank line terminates the event.
        chunk = {"choices": [{"delta": {"content": token + " "}}]}
        yield f"data: {json.dumps(chunk)}\n\n"
        await asyncio.sleep(0.1)  # simulate per-token generation latency
    yield "data: [DONE]\n\n"

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
    # StreamingResponse flushes each yielded string to the client as it is produced.
    return StreamingResponse(
        generate_response(request.messages),
        media_type="text/event-stream"
    )
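To sanity-check the endpoint and see the raw SSE frames without a separate process, you can stream through FastAPI's TestClient. This is a minimal sketch, assuming a recent FastAPI/Starlette where TestClient is built on httpx, and assuming the server code above is saved in a module named main (a placeholder name).

from fastapi.testclient import TestClient

from main import app  # assumes the server code above lives in main.py

client = TestClient(app)

with client.stream(
    "POST",
    "/v1/chat/completions",
    json={"messages": [{"role": "user", "content": "hi"}]},
) as response:
    for line in response.iter_lines():
        if line:  # skip the blank lines that separate events
            print(line)  # e.g. data: {"choices": [{"delta": {"content": "Hello! "}}]}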
Part 2: Consuming SSE with Requests
import json
import requests

def consume_sse(url, headers=None, payload=None):
    """Basic SSE consumer."""
    # stream=True keeps the connection open so the body can be read incrementally.
    response = requests.post(url, headers=headers, json=payload, stream=True)
    response.raise_for_status()
    for line in response.iter_lines():
        if line:  # iter_lines() yields empty bytes for the blank separator between events
            decoded = line.decode('utf-8')
            if decoded.startswith('data: '):
                data = decoded[6:]
                if data == '[DONE]':
                    break
                yield json.loads(data)

# Usage with an OpenAI-compatible API
def stream_chat(api_url, api_key, messages):
    headers = {"Authorization": f"Bearer {api_key}"}
    payload = {"messages": messages, "stream": True}
    for chunk in consume_sse(api_url, headers, payload):
        content = chunk.get('choices', [{}])[0].get('delta', {}).get('content', '')
        if content:
            print(content, end='', flush=True)
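For completeness, here is one way to drive stream_chat against the local server from Part 1; the URL and key below are placeholders, since the demo server does not check authentication.

if __name__ == "__main__":
    # Placeholder endpoint and key: point these at any OpenAI-compatible API.
    stream_chat(
        api_url="http://localhost:8000/v1/chat/completions",
        api_key="unused-for-local-demo",
        messages=[{"role": "user", "content": "Say hello"}],
    )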
Key Points

- Server: Use StreamingResponse with media_type="text/event-stream"
- Client: Use stream=True and iter_lines()
- Format: data: {json}\n\n (double newline separates events)
- End signal: [DONE] indicates stream completion
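The same points carry over to async code. Below is a sketch (not part of the original examples) of an async consumer built on httpx; it follows the same parsing logic as consume_sse but reads the streamed response with aiter_lines().

import json

import httpx

async def consume_sse_async(url, headers=None, payload=None):
    """Async counterpart to consume_sse, using httpx streaming."""
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, headers=headers, json=payload) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = line[6:]
                    if data == "[DONE]":
                        return
                    yield json.loads(data)

Consume it with "async for chunk in consume_sse_async(...)" inside a coroutine, mirroring how stream_chat uses the synchronous version.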