The Lie You’ve Been Living
With special thanks to Georgiy Treyvus, whose idea made this book happen.
You’ve been writing Python web applications for — let’s say — a while. You know how to define routes. You know how to write views. You know that request.method gives you "GET" or "POST", and you know that returning a Response object makes things appear in someone’s browser.
What you may not know is what any of that actually is.
Here is the thing that your framework would prefer you not examine too closely: it’s a function. The entire web application you’ve been building — the routing, the middleware, the template rendering, the session handling, the authentication system, the REST API — all of it ultimately compiles down to a Python callable that takes some arguments and returns something.
That’s it. That’s the whole trick.
Let’s Prove It Right Now
Here is a complete, functional web application that will run in production:
def application(environ, start_response):
    status = "200 OK"
    headers = [("Content-Type", "text/plain")]
    start_response(status, headers)
    return [b"Hello, world"]
Save this as app.py. Install gunicorn (pip install gunicorn). Run:
gunicorn app:application
You now have a production web server serving HTTP requests. No framework. No dependencies beyond gunicorn. No magic.
If you point your browser at http://localhost:8000, you’ll see “Hello, world”.
That function — application — is a WSGI application. Everything Django and Flask have ever done starts from exactly this interface.
The Moment of Recognition
Now look at Django. From django/core/handlers/wsgi.py:
class WSGIHandler(base.BaseHandler):
    request_class = WSGIRequest

    def __call__(self, environ, start_response):
        set_script_prefix(get_script_name(environ))
        signals.request_started.send(sender=self.__class__, environ=environ)
        request = self.request_class(environ)
        response = self.get_response(request)
        # ... headers, status ...
        start_response(status, response_headers)
        # ...
        return response
It’s __call__. Django’s entire web framework is a class with a __call__ method that takes environ and start_response. It is, by definition, a callable that implements the WSGI interface.
Every piece of Django — the ORM, the admin, the URL dispatcher, the template engine — exists to produce that callable. The callable is the product.
Flask? Same thing:
class Flask(App):
    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)
FastAPI runs on top of Starlette, which is ASGI (we’ll get to that). But strip it down and you find the same idea: a callable with a defined interface.
Why This Matters
If you understand that your framework is a callable with a specific signature, several things become clear:
Testing becomes obvious. Your app is a function. Call it with the right arguments and inspect the result. No magic test client needed — though those are useful too.
Middleware makes sense. Middleware is a callable that takes a callable and returns a callable. It’s function composition. It’s wrappers. Once you see this, the middleware stack is just a chain of decorators with extra steps.
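In WSGI terms, that composition is only a few lines. A minimal sketch (the logging middleware and app here are made up for illustration):

```python
# A WSGI middleware is a callable that wraps another WSGI callable.
def logging_middleware(inner_app):
    def wrapped(environ, start_response):
        # Do something before delegating to the wrapped app.
        print(f"-> {environ['REQUEST_METHOD']} {environ['PATH_INFO']}")
        return inner_app(environ, start_response)
    return wrapped

def app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello"]

# Stacking middleware is just nested function application:
application = logging_middleware(app)
```

The stack order is the nesting order: the outermost wrapper sees the request first and the response last.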
The framework is not special. It’s solving real problems — routing, request parsing, response serialization — but it’s doing so with the same Python you write every day. There’s no privileged access, no hidden C extensions doing the real work (well, sometimes there are C extensions, but not for routing). It’s just code.
Debugging gets easier. When something goes wrong at the framework level, you now have a mental model of where to look. The request came in. It hit the WSGI callable. Something happened between environ and start_response. You can trace it.
The Interfaces, Briefly
There are two specs we care about in this book.
WSGI (Web Server Gateway Interface, PEP 3333) is the synchronous interface. It has been around since 2003, originally as PEP 333; PEP 3333 updated it for Python 3. Every line of Python web code written before async became mainstream runs on top of it. The entire spec is essentially:
application(environ, start_response) -> iterable of bytes
ASGI (Asynchronous Server Gateway Interface) is the async successor. It was designed to handle things WSGI can’t — WebSockets, long-polling, HTTP/2 push — by making the entire interface async. The spec is:
application(scope, receive, send) -> None # but async
Both specs define a contract between a web server (Gunicorn, Uvicorn, Hypercorn) and a web application (your code, or Django, or FastAPI). The server handles the TCP connection, parses the HTTP request, and calls your callable. Your callable decides what to return. The server sends it back.
The framework just makes it easier to write that callable. That’s the whole job.
What This Book Will Do
We’re going to start at the bottom.
In Part I, we’ll look at what HTTP actually is (text over a socket), what the frameworks are doing, and why understanding this matters for your day-to-day work.
In Part II, we’ll implement WSGI from first principles: a server, middleware, routing, and request/response abstractions — all from scratch.
In Part III, we’ll do the same for ASGI: the async model, WebSockets, lifespan events, and building an async server.
In Part IV, we’ll look at patterns — testing, middleware composition, and building a small framework — to solidify everything.
By the end, you’ll be able to read the Gunicorn source code and understand what it’s doing. You’ll know what Uvicorn’s main() actually does. You’ll be able to debug framework-level issues because you’ll have written the framework-level code yourself.
More importantly, you’ll look at your next Django application and see it for what it is: a callable. A very sophisticated, well-tested, production-hardened callable — but a callable nonetheless.
The magic was just Python with good variable names.
Let’s start with the protocol.
HTTP Is Just Text
Before we talk about WSGI or ASGI, we need to talk about what they’re abstracting over. And what they’re abstracting over is HTTP. And HTTP is just text.
This is not a simplification. Open a TCP connection to port 80 of any web server in the world, type the right bytes, and you’ll get an HTTP response. You don’t need a library. You need a socket and the knowledge of what to type.
Let’s actually do it.
Talking to a Web Server with a Raw Socket
import socket

def raw_http_request(host: str, path: str = "/") -> str:
    """Make an HTTP/1.1 GET request using nothing but a socket."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, 80))
    # This is a complete, valid HTTP/1.1 request.
    request = (
        f"GET {path} HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    )
    sock.sendall(request.encode("utf-8"))
    # Read the response in chunks
    response = b""
    while chunk := sock.recv(4096):
        response += chunk
    sock.close()
    return response.decode("utf-8", errors="replace")

if __name__ == "__main__":
    response = raw_http_request("example.com")
    print(response[:500])  # Just the beginning
Run this and you’ll see something like:
HTTP/1.1 200 OK
Content-Encoding: gzip
Accept-Ranges: bytes
Age: 123456
Cache-Control: max-age=604800
Content-Type: text/html; charset=UTF-8
Date: Thu, 01 Jan 2026 00:00:00 GMT
...
<!doctype html>
<html>
...
That’s it. That’s HTTP. A text request, a text response. The format is specified in RFC 9110 (and historically RFC 2616, RFC 7230, etc.) but the format itself is not complicated.
The Structure of an HTTP Request
An HTTP request has this shape:
METHOD /path HTTP/version\r\n
Header-Name: header-value\r\n
Another-Header: another-value\r\n
\r\n
[optional body]
A minimal GET request:
GET /index.html HTTP/1.1\r\n
Host: example.com\r\n
\r\n
A POST request with a body:
POST /api/users HTTP/1.1\r\n
Host: api.example.com\r\n
Content-Type: application/json\r\n
Content-Length: 28\r\n
\r\n
{"name": "Alice", "age": 30}
Three parts: the request line, the headers, and the body. Each header is on its own line. Headers are separated from the body by a blank line (\r\n\r\n). The \r\n is a carriage return followed by a newline — HTTP requires both, not just \n.
The Structure of an HTTP Response
An HTTP response:
HTTP/version STATUS_CODE Reason Phrase\r\n
Header-Name: header-value\r\n
Another-Header: another-value\r\n
\r\n
[body]
A minimal response:
HTTP/1.1 200 OK\r\n
Content-Type: text/plain\r\n
Content-Length: 13\r\n
\r\n
Hello, world!
The status line, headers, blank line, body. Same structure, mirrored.
Parsing HTTP Requests
Let’s write a basic HTTP request parser. Not to use in production — for understanding what Gunicorn and Uvicorn do on every single request before they ever touch your application code.
from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class HTTPRequest:
    method: str
    path: str
    query_string: str
    http_version: str
    headers: Dict[str, str]
    body: bytes

    @property
    def content_type(self) -> Optional[str]:
        return self.headers.get("content-type")

    @property
    def content_length(self) -> int:
        return int(self.headers.get("content-length", 0))

def parse_request(raw: bytes) -> HTTPRequest:
    """
    Parse a raw HTTP request into an HTTPRequest object.
    Handles the header/body split and basic header parsing.
    """
    # Split headers from body at the blank line
    header_section, _, body = raw.partition(b"\r\n\r\n")
    # Split header section into individual lines
    lines = header_section.decode("utf-8", errors="replace").split("\r\n")
    # First line is the request line
    request_line = lines[0]
    method, raw_path, http_version = request_line.split(" ", 2)
    # Split path from query string
    if "?" in raw_path:
        path, query_string = raw_path.split("?", 1)
    else:
        path, query_string = raw_path, ""
    # Parse headers (everything after the request line)
    headers = {}
    for line in lines[1:]:
        if ": " in line:
            name, _, value = line.partition(": ")
            headers[name.lower()] = value
    return HTTPRequest(
        method=method,
        path=path,
        query_string=query_string,
        http_version=http_version,
        headers=headers,
        body=body,
    )

# Test it
raw_request = (
    b"POST /api/users?active=true HTTP/1.1\r\n"
    b"Host: localhost\r\n"
    b"Content-Type: application/json\r\n"
    b"Content-Length: 28\r\n"
    b"\r\n"
    b'{"name": "Alice", "age": 30}'
)
req = parse_request(raw_request)
print(f"Method: {req.method}")
print(f"Path: {req.path}")
print(f"Query: {req.query_string}")
print(f"Content-Type: {req.content_type}")
print(f"Body: {req.body}")
Output:
Method: POST
Path: /api/users
Query: active=true
Content-Type: application/json
Body: b'{"name": "Alice", "age": 30}'
This is, roughly, what every web server does before handing control to your application. Gunicorn’s HTTP parser is more robust (it handles edge cases, malformed requests, chunked transfer encoding, etc.), but conceptually it’s doing exactly this.
Building an HTTP Response
The other direction: given what you want to send back, construct valid HTTP bytes.
def build_response(
    status_code: int,
    reason: str,
    headers: Dict[str, str],
    body: bytes,
) -> bytes:
    """Build a raw HTTP/1.1 response."""
    status_line = f"HTTP/1.1 {status_code} {reason}\r\n"
    # Always include Content-Length
    headers["Content-Length"] = str(len(body))
    header_lines = "".join(
        f"{name}: {value}\r\n"
        for name, value in headers.items()
    )
    return (
        status_line.encode("utf-8")
        + header_lines.encode("utf-8")
        + b"\r\n"
        + body
    )

response = build_response(
    200,
    "OK",
    {"Content-Type": "text/plain"},
    b"Hello, world!",
)
print(response.decode("utf-8"))
response = build_response(
200,
"OK",
{"Content-Type": "text/plain"},
b"Hello, world!",
)
print(response.decode("utf-8"))
Output:
HTTP/1.1 200 OK
Content-Type: text/plain
Content-Length: 13
Hello, world!
What WSGI Does With All This
Now think about this: when a request comes in to a Gunicorn worker, the worker:
- Reads bytes from the socket
- Parses them into method, path, headers, body (as above)
- Packs all of that into a dictionary called environ
- Calls your WSGI application with environ and start_response
- Takes whatever your application returns and writes it back to the socket as HTTP response bytes
The environ dictionary is just a structured version of the parsed HTTP request. REQUEST_METHOD is the method. PATH_INFO is the path. CONTENT_TYPE is the Content-Type header (one of the two headers that don't get an HTTP_ prefix). wsgi.input is a file-like object wrapping the body bytes.
When you call start_response("200 OK", [("Content-Type", "text/plain")]) in your WSGI app, you’re providing the status line and headers that the server will write back. When you return [b"Hello, world!"], you’re providing the response body.
The server just… sends it.
[raw bytes in] -> [parse] -> [your callable] -> [serialize] -> [raw bytes out]
That’s the entire pipeline. WSGI is just the contract for the middle part.
Keepalive, Chunked Encoding, and Things We’re Ignoring
Real HTTP has some complexity we’ve glossed over:
Connection: keep-alive — HTTP/1.1 defaults to keeping the connection open for multiple requests. The server needs to know when one request ends and the next begins, which it does via Content-Length or chunked transfer encoding.
Chunked transfer encoding — instead of specifying Content-Length upfront, you can stream the response in chunks, each prefixed with its size in hex. This is how streaming responses work.
HTTP/2 — multiplexed streams over a single connection, binary framing, header compression. Same semantics, very different wire format.
TLS — everything above happens over an encrypted connection. Same protocol, but the bytes going over the wire are ciphertext.
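Of these, chunked framing is simple enough to sketch. A minimal encoder, to make "each prefixed with its size in hex" concrete (the function name is mine):

```python
def encode_chunked(chunks) -> bytes:
    """Frame an iterable of byte chunks using HTTP/1.1 chunked transfer encoding."""
    out = b""
    for chunk in chunks:
        if chunk:  # a zero-length chunk would terminate the stream early
            out += f"{len(chunk):x}".encode() + b"\r\n" + chunk + b"\r\n"
    out += b"0\r\n\r\n"  # zero-sized chunk marks the end of the body
    return out

# encode_chunked([b"Hello, ", b"world!"])
# → b'7\r\nHello, \r\n6\r\nworld!\r\n0\r\n\r\n'
```

The receiver reads a hex length line, reads that many bytes, and repeats until it sees the zero-length terminator; no Content-Length needed up front.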
WSGI abstracts all of this. You don’t handle keep-alive or chunked encoding directly. The server does. You write your callable; the server handles the transport.
ASGI handles more of these edge cases natively — particularly streaming — which is part of why it exists. We’ll get there.
The Thing to Hold Onto
HTTP is a text protocol. Requests are lines of text: a request line, headers, body. Responses are lines of text: a status line, headers, body. The blank line between headers and body is significant. The \r\n line endings are required.
Everything your framework does is ultimately:
- Parse the incoming text into a convenient Python object
- Call your handler function
- Serialize the result back into text
That’s the whole transaction. Hold that mental model as we build the WSGI layer on top of it.
What Django and FastAPI Are Actually Doing
We’ve established that HTTP is text and that WSGI is a callable interface. Now let’s look at what Django and FastAPI actually do with that interface — because once you see it, the framework becomes a much less mysterious box.
We’ll trace a request through each one, following the actual code path (simplified to keep it readable). The goal is not to understand every detail of Django’s internals — the Django team has written excellent documentation for that. The goal is to see the skeleton: the WSGI entry point, the routing, and the response serialization.
Django’s Request Path
A Django project has a WSGI entrypoint file, generated by startproject:
# myproject/wsgi.py
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
application = get_wsgi_application()
application here is what Gunicorn will call. Let’s follow get_wsgi_application():
# django/core/wsgi.py
def get_wsgi_application():
    django.setup()
    return WSGIHandler()
It runs Django setup (loads settings, connects signals, initializes apps) and returns a WSGIHandler. Let’s look at WSGIHandler:
# django/core/handlers/wsgi.py (simplified)
class WSGIHandler(base.BaseHandler):
    request_class = WSGIRequest

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.load_middleware()  # Build the middleware stack

    def __call__(self, environ, start_response):
        # Convert environ to a Django request object
        request = self.request_class(environ)
        # Run the full middleware/view pipeline
        response = self.get_response(request)
        # Django response -> HTTP status string and headers list
        status = '%d %s' % (response.status_code, response.reason_phrase)
        response_headers = list(response.items())
        for c in response.cookies.values():
            response_headers.append(('Set-Cookie', c.output(header='')))
        # Tell the WSGI server what status and headers to use
        start_response(status, response_headers)
        # Return the response body as an iterable
        if request.method == 'HEAD':
            return [b'']
        return response
The __call__ method is the WSGI application. It takes environ and start_response, does Django things, and returns a response iterable.
The Middleware Stack
self.load_middleware() builds a chain of callables. If your MIDDLEWARE setting looks like:
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'myapp.middleware.CustomMiddleware',
]
Then load_middleware() constructs something conceptually like:
def _get_response_none(request):
    # The actual view dispatcher
    return view_function(request)

handler = _get_response_none
for middleware_path in reversed(MIDDLEWARE):
    middleware_class = import_string(middleware_path)
    handler = middleware_class(handler)
self._middleware_chain = handler
Each middleware wraps the next one. When you call self.get_response(request), you’re calling self._middleware_chain(request), which unwinds through each middleware layer until it hits the view. This is exactly the turtles-all-the-way-down middleware pattern we’ll implement ourselves in the WSGI section.
The URL Dispatcher
Inside get_response, Django eventually calls:
# django/core/handlers/base.py (simplified)
def _get_response(self, request):
    callback, callback_args, callback_kwargs = self.resolve_request(request)
    response = callback(request, *callback_args, **callback_kwargs)
    return response
resolve_request does URL routing: it takes request.path_info and walks through the urlpatterns list, matching regex patterns or path converters until it finds a match. The match returns the view function and any captured URL parameters.
That view function is what you write. Django calls it. You return an HttpResponse. Django serializes it. Done.
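The routing idea is easy to sketch without Django's machinery. A minimal regex dispatcher (the route table and view functions here are made up for illustration; Django's real resolver adds namespaces, converters, and much better error handling):

```python
import re

# Hypothetical route table: compiled pattern -> view callable.
ROUTES = [
    (re.compile(r"^/users/(?P<user_id>\d+)$"),
     lambda request, user_id: f"user {user_id}"),
    (re.compile(r"^/$"),
     lambda request: "home"),
]

def resolve(path):
    """Return (view, captured_kwargs) for the first matching pattern, else None."""
    for pattern, view in ROUTES:
        match = pattern.match(path)
        if match:
            return view, match.groupdict()
    return None
```

Usage: `resolve("/users/42")` returns the first lambda plus `{"user_id": "42"}`; the dispatcher then calls `view(request, **kwargs)`. That is the whole shape of URL routing.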
FastAPI’s Request Path
FastAPI is an ASGI framework (we’ll cover ASGI in Part III), but the same “it’s just a callable” principle applies.
from fastapi import FastAPI

app = FastAPI()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    return {"user_id": user_id}
app here is a FastAPI instance. FastAPI inherits from Starlette, which implements the ASGI interface. When Uvicorn calls your application:
# uvicorn calls this
await app(scope, receive, send)
Starlette’s __call__ (simplified):
class Starlette:
    async def __call__(self, scope, receive, send):
        scope["app"] = self
        if self.middleware_stack is None:
            self.middleware_stack = self.build_middleware_stack()
        await self.middleware_stack(scope, receive, send)
Same pattern: a middleware stack, built once, called on every request.
At the bottom of the stack is the router. FastAPI’s router matches the path and HTTP method against registered routes, extracts path parameters, and calls your endpoint function.
The Clever Part: Dependency Injection and Type Hints
The thing FastAPI adds is automatic parsing of function parameters using type hints. When you write:
@app.get("/items/{item_id}")
async def read_item(item_id: int, q: Optional[str] = None):
    return {"item_id": item_id, "q": q}
FastAPI uses inspect.signature to introspect the function, reads the type annotations, and automatically:
- Extracts item_id from the path (because it's in the {item_id} path template)
- Extracts q from the query string (because it's not in the path)
- Converts item_id to int and validates it
- Returns a 422 if conversion fails
This is done at startup (when the route is registered) using Pydantic and Python’s inspect module. There’s no magic — it’s reflection and type coercion applied systematically.
# What FastAPI is doing under the hood (very simplified)
import inspect
from typing import get_type_hints

def build_endpoint_handler(func):
    sig = inspect.signature(func)
    hints = get_type_hints(func)

    async def handler(scope, receive, send):
        # Extract path params, query params from scope
        path_params = scope.get("path_params", {})
        query_string = scope.get("query_string", b"").decode()
        # Build kwargs for the function
        kwargs = {}
        for name, param in sig.parameters.items():
            if name in path_params:
                kwargs[name] = hints[name](path_params[name])  # type coercion
            # ... query param extraction, body parsing, etc.
        result = await func(**kwargs)
        # Serialize result to JSON response
        # ...

    return handler
That’s the core of FastAPI’s “magic”. It’s Python’s inspect module and type coercion, applied at startup to build efficient request handlers.
Flask’s Request Path
Flask is simpler than Django but uses the same WSGI interface. The Flask class has a __call__ method:
class Flask:
    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)

    def wsgi_app(self, environ, start_response):
        ctx = self.request_context(environ)
        ctx.push()
        try:
            response = self.full_dispatch_request()
            return response(environ, start_response)
        finally:
            ctx.pop()
Flask’s “request context” and “application context” are thread-local (or greenlet-local) storage — that’s how flask.request works without being passed as a parameter. When you access request.method in a Flask view, Flask looks up the current request from a thread-local stack that was pushed when ctx.push() was called.
This is convenient, but it’s not magic. It’s an implicit parameter passing mechanism. WSGI is synchronous and single-threaded-per-request, so thread-local storage works. This is also why Flask’s approach breaks down with async — thread-locals don’t survive across await points, which is one reason Flask’s async support required careful workarounds.
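The mechanism itself fits in a few lines. A hedged, minimal sketch of a thread-local request stack (the names are illustrative, not Flask's actual internals):

```python
import threading

# One stack per thread; each request handler pushes on entry, pops on exit.
_request_state = threading.local()

def push_request(req):
    """Make `req` the current request for this thread."""
    stack = getattr(_request_state, "stack", None)
    if stack is None:
        stack = _request_state.stack = []
    stack.append(req)

def pop_request():
    """Discard the current request (called when the handler finishes)."""
    _request_state.stack.pop()

def current_request():
    """What an import like `flask.request` conceptually resolves to."""
    return _request_state.stack[-1]
```

Because each WSGI request runs start-to-finish on one thread, `current_request()` always sees the right request without it ever being passed as a parameter.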
What They All Have in Common
Every Python web framework, at its core, does this:
environ/scope
│
▼
┌─────────────────────────────────────────────────────┐
│ Middleware stack │
│ ├── Security / CORS / compression / auth │
│ ├── Session management │
│ └── (your middleware here) │
│ │ │
│ ▼ │
│ URL Router │
│ └── match path → find handler function │
│ │ │
│ ▼ │
│ Handler / View │
│ └── your code runs here │
│ │ │
│ ▼ │
│ Response serialization │
│ └── status + headers + body → bytes │
└─────────────────────────────────────────────────────┘
│
▼
start_response(status, headers) + return [body_bytes]
The framework is providing:
- A way to compose middleware (the stack builder)
- URL routing (pattern matching on PATH_INFO)
- Request parsing (wrapping environ in a convenient object)
- Response serialization (turning your return value into WSGI-compatible bytes)
None of these are hard to understand. Some are hard to implement well — Django’s URL dispatcher handles edge cases you’d never think of, and FastAPI’s type coercion is quite sophisticated. But conceptually, they’re all doing the same four things.
Building It Yourself
The rest of Part II is devoted to building each of these pieces from scratch. By the time we’re done, you’ll have:
- A working WSGI server
- A middleware stack
- A URL router
- Request and Response classes
None of it will be production-ready in the sense that Django is production-ready. But all of it will be correct, and building it will give you a ground-level understanding that reading the Django source code alone doesn’t provide.
The question isn’t “how does Django do routing?” The question is “what problem does routing solve, and what’s the simplest possible correct implementation?” Once you’ve answered the second question yourself, the first becomes easy to read.
Let’s start with the spec.
The WSGI Spec (It Fits on a Napkin)
PEP 3333 is the WSGI specification. It is short: the actual interface it defines is expressible in fewer than ten lines of Python. This is either a sign of elegant design or a sign that we’ve been dramatically over-complicating web development for twenty years. Possibly both.
Let’s read the spec together — not the full PEP, but the essential contract it defines.
The Interface in Full
The complete WSGI interface, distilled:
def application(environ: dict, start_response: callable) -> Iterable[bytes]:
    """
    A WSGI application is any callable that:

    1. Accepts environ (dict) and start_response (callable)
    2. Calls start_response(status, response_headers) exactly once before returning
    3. Returns an iterable of byte strings (the response body)
    """
    status = "200 OK"
    response_headers = [("Content-Type", "text/plain; charset=utf-8")]
    start_response(status, response_headers)
    return [b"Hello, world!\n"]
That’s it. That’s WSGI. Everything else is detail.
The environ Dictionary
environ is a Python dictionary containing CGI-style environment variables plus some WSGI-specific additions. When Gunicorn receives an HTTP request, it parses it and packs the results into this dict.
Here are the keys your application will actually use:
# Request method
environ['REQUEST_METHOD'] # "GET", "POST", "PUT", "DELETE", etc.
# URL components
environ['PATH_INFO'] # "/users/42" — the URL path
environ['QUERY_STRING'] # "active=true&page=2" — without the "?"
environ['SERVER_NAME'] # "example.com"
environ['SERVER_PORT'] # "80" (note: string, not int)
# HTTP headers (prefixed with HTTP_, hyphens become underscores, uppercased)
environ['HTTP_HOST'] # "example.com"
environ['HTTP_ACCEPT'] # "text/html,application/xhtml+xml,..."
environ['HTTP_AUTHORIZATION'] # "Bearer abc123"
# Content-Type and Content-Length are special: no HTTP_ prefix
environ['CONTENT_TYPE']    # "application/json"
environ['CONTENT_LENGTH']  # "42" (string, or empty string if unknown)
# Request body
environ['wsgi.input'] # file-like object, read() to get the body bytes
# WSGI metadata
environ['wsgi.version'] # (1, 0)
environ['wsgi.url_scheme'] # "http" or "https"
environ['wsgi.multithread'] # True if server may run multiple threads
environ['wsgi.multiprocess'] # True if server may fork multiple processes
environ['wsgi.run_once'] # True if application will only be invoked once
environ['wsgi.errors'] # file-like object for error output (stderr)
Let’s write a small app that dumps the environ so you can see it for yourself:
import json

def environ_dumper(environ, start_response):
    """Dump the environ dict as JSON for debugging."""
    # Some values aren't JSON-serializable; convert them
    safe_environ = {}
    for key, value in sorted(environ.items()):
        if isinstance(value, (str, int, float, bool, type(None))):
            safe_environ[key] = value
        else:
            safe_environ[key] = f"<{type(value).__name__}>"
    body = json.dumps(safe_environ, indent=2).encode("utf-8")
    start_response("200 OK", [
        ("Content-Type", "application/json"),
        ("Content-Length", str(len(body))),
    ])
    return [body]

if __name__ == "__main__":
    from wsgiref.simple_server import make_server
    server = make_server("127.0.0.1", 8000, environ_dumper)
    print("Serving on http://127.0.0.1:8000")
    server.serve_forever()
Run this, hit http://127.0.0.1:8000/some/path?foo=bar in your browser, and you’ll see everything Gunicorn (or wsgiref) passes to your application. It demystifies a lot.
The start_response Callable
start_response is a callable provided by the server. Your application calls it to set the response status and headers. Its signature:
def start_response(
    status: str,             # e.g. "200 OK", "404 Not Found"
    response_headers: list,  # list of (name, value) tuples
    exc_info=None,           # for error handling, discussed below
) -> write_callable:         # legacy write callable, don't use this
The status string must be a valid HTTP status: three digits, a space, and a reason phrase. The reason phrase can be anything — the spec doesn’t require it to be the canonical one — but convention is to use the standard phrases.
The response_headers is a list of (name, value) tuples. Names are case-insensitive in HTTP; convention is Title-Case. Values must be strings.
# Valid calls to start_response
start_response("200 OK", [
    ("Content-Type", "text/html; charset=utf-8"),
    ("X-Custom-Header", "my-value"),
])

start_response("404 Not Found", [
    ("Content-Type", "text/plain"),
])

start_response("302 Found", [
    ("Location", "https://example.com/new-url"),
    ("Content-Type", "text/plain"),
])
The write callable that start_response returns is a legacy escape hatch for applications that need to write response data before returning from the callable. Don’t use it in new code. The spec includes it for backward compatibility with pre-WSGI CGI-style code.
The Return Value
Your application must return an iterable of byte strings. Each item in the iterable is a chunk of the response body. The server will concatenate and send them.
# All of these are valid return values:
return [b"Hello, world!"]              # Single chunk
return [b"Hello, ", b"world!"]         # Multiple chunks
return iter([b"chunk 1", b"chunk 2"])  # Iterator
return (b"x" for x in range(3))        # Generator

# For streaming responses, a generator is useful:
def streaming_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])

    def generate():
        for i in range(100):
            yield f"Line {i}\n".encode("utf-8")

    return generate()
One important constraint: you must call start_response before (or while) the server is consuming your return iterable. In practice, call it before you return. The server will call next() on your iterable to get chunks, and by that point it needs to know the status and headers.
The close() Method
If your return iterable has a close() method, the server will call it when it’s done — even if an exception occurred during iteration. This is how you ensure cleanup (open file handles, database connections, etc.) happens even when the response is only partially sent.
class FileResponse:
    def __init__(self, filepath):
        self.f = open(filepath, "rb")

    def __iter__(self):
        while chunk := self.f.read(8192):
            yield chunk

    def close(self):
        self.f.close()  # Server will call this

def file_serving_app(environ, start_response):
    path = environ['PATH_INFO'].lstrip('/')
    response = FileResponse(path)
    start_response("200 OK", [("Content-Type", "application/octet-stream")])
    return response
Error Handling with exc_info
If an error occurs after start_response has been called (and headers may have been sent), you can call start_response again with exc_info set. This is how middleware propagates exceptions:
import sys

def application(environ, start_response):
    try:
        # ... do work ...
        start_response("200 OK", [("Content-Type", "text/plain")])
        return [b"OK"]
    except Exception:
        start_response("500 Internal Server Error",
                       [("Content-Type", "text/plain")],
                       sys.exc_info())  # Pass exception info
        return [b"Internal Server Error"]
If headers haven’t been sent yet, the server will use the new status/headers. If headers have already been sent (which can happen with streaming responses), the server will re-raise the exception — there’s nothing else it can do at that point.
What the Server Side Looks Like
To fully understand the contract, it helps to see what the server-side caller looks like. Here’s a minimal version:
def call_wsgi_app(app, environ):
    """
    Call a WSGI app and collect the response.
    Returns (status, headers, body_bytes).
    """
    response_started = []

    def start_response(status, headers, exc_info=None):
        if exc_info:
            try:
                if response_started:
                    raise exc_info[1].with_traceback(exc_info[2])
            finally:
                exc_info = None
        response_started.append((status, headers))

    result = app(environ, start_response)
    try:
        body = b"".join(result)
    finally:
        if hasattr(result, "close"):
            result.close()
    # Use the most recent call: a permitted second call (with exc_info,
    # before headers are sent) replaces the original status and headers.
    status, headers = response_started[-1]
    return status, headers, body
This is what Gunicorn’s worker essentially does — before sending it back over the socket as HTTP bytes. Note how start_response just stores the status and headers; the actual sending happens after app() returns.
The Complete Spec, Annotated
Here’s the one-page summary of everything WSGI requires:
APPLICATION:
- Must be callable
- Takes two arguments: environ (dict), start_response (callable)
- Must call start_response(status, headers) exactly once
(or on error, may call it again with exc_info)
- Must return an iterable of byte strings
- The iterable may have a close() method; if so, server must call it
ENVIRON:
- Must contain CGI/1.1 variables (REQUEST_METHOD, PATH_INFO, etc.)
- Must contain wsgi.input (readable file-like object for body)
- Must contain wsgi.errors (writable file-like for errors)
- Must contain wsgi.version (1, 0)
- Must contain wsgi.url_scheme ("http" or "https")
- Must contain wsgi.multithread, wsgi.multiprocess, wsgi.run_once (bools)
- HTTP headers: prefixed with HTTP_, hyphens→underscores, uppercased
Exception: Content-Type and Content-Length have no HTTP_ prefix
START_RESPONSE:
- Takes status (str), response_headers (list of 2-tuples), exc_info (optional)
- Status format: "NNN Reason Phrase"
- Headers: list of (name, value) tuples, strings only
- Returns a write() callable (legacy; don't use)
- May be called again only if exc_info is provided
SERVER:
- Must call app(environ, start_response) to get response
- Must send status and headers before body
- Must call result.close() if it exists, even on error
- Must handle chunked responses (iterate over return value)
That’s the contract. Two functions talking to each other with a well-defined interface. The server provides environ and start_response; your app provides the response.
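To make the naming rules concrete, here is the environ a server might build for a hypothetical POST /tasks?sort=asc request. Every value below is illustrative, filled in by hand rather than by a real server:

```python
import io
import sys

def header_to_environ_key(name: str) -> str:
    """The mechanical header-name conversion WSGI requires."""
    return "HTTP_" + name.upper().replace("-", "_")

body = b'{"title": "Learn WSGI"}'
environ = {
    "REQUEST_METHOD": "POST",
    "PATH_INFO": "/tasks",
    "QUERY_STRING": "sort=asc",
    # Content-Type / Content-Length keep their CGI names: no HTTP_ prefix
    "CONTENT_TYPE": "application/json",
    "CONTENT_LENGTH": str(len(body)),
    # Every other header gets the HTTP_ treatment ("X-Request-ID" here)
    header_to_environ_key("X-Request-ID"): "abc-123",
    "wsgi.version": (1, 0),
    "wsgi.url_scheme": "http",
    "wsgi.input": io.BytesIO(body),
    "wsgi.errors": sys.stderr,
    "wsgi.multithread": False,
    "wsgi.multiprocess": False,
    "wsgi.run_once": False,
}
```

Note that the only keys with an HTTP_ prefix are the "other" headers; the two content headers and the wsgi.* keys follow their own rules.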
In the next chapter, we’ll write a real WSGI application — no wsgiref, no framework, just the spec and a server call.
Your First WSGI App (No Training Wheels)
Let’s build something real. Not “hello world” (we did that in the introduction) — a genuinely useful WSGI application with multiple routes, request parsing, and proper response handling. All without a framework.
By the end of this chapter you’ll have a working JSON API that you can run with Gunicorn. It won’t be pretty. That’s the point.
The Problem
We’re building a simple in-memory task management API. It will support:
- GET /tasks — list all tasks
- POST /tasks — create a task
- GET /tasks/{id} — get a specific task
- DELETE /tasks/{id} — delete a task
That’s enough to demonstrate routing, request body parsing, path parameter extraction, and proper HTTP response semantics without drowning in incidental complexity.
Reading the Request Body
The first thing most tutorials skip over: how do you read the request body in WSGI?
def read_body(environ) -> bytes:
    """Read the request body from environ['wsgi.input']."""
    try:
        content_length = int(environ.get('CONTENT_LENGTH', 0) or 0)
    except (ValueError, TypeError):
        content_length = 0
    if content_length > 0:
        return environ['wsgi.input'].read(content_length)
    return b''
Why or 0? Because CONTENT_LENGTH might be an empty string (""), which int() can’t convert. The or 0 handles that case. Why the try/except? Because you can’t fully trust incoming headers.
Why read(content_length) rather than just read()? The spec says wsgi.input may be a socket-backed stream. Calling read() without a limit might block indefinitely waiting for a connection that never closes. Always read exactly Content-Length bytes.
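The CONTENT_LENGTH edge cases are easy to demonstrate directly:

```python
# The values read_body has to survive, spelled out:
assert int("42") == 42        # the normal case
assert int("" or 0) == 0      # empty string: the `or 0` fallback rescues int()

try:
    int("")                   # without the fallback, this raises
except ValueError:
    pass

try:
    int("banana" or 0)        # garbage survives `or`, hence the try/except
except ValueError:
    pass
```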
Parsing JSON Bodies
import json
from typing import Any, Optional

def parse_json_body(environ) -> Optional[Any]:
    """Parse a JSON request body, returning None if absent or invalid."""
    content_type = environ.get('CONTENT_TYPE', '')
    if 'application/json' not in content_type:
        return None
    body = read_body(environ)
    if not body:
        return None
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        return None
Building Responses
Rather than calling start_response directly everywhere, let’s build a small helper:
def json_response(start_response, data: Any, status: int = 200) -> list[bytes]:
    """Send a JSON response."""
    STATUS_PHRASES = {
        200: "OK",
        201: "Created",
        400: "Bad Request",
        404: "Not Found",
        405: "Method Not Allowed",
        500: "Internal Server Error",
    }
    body = json.dumps(data, indent=2).encode("utf-8")
    phrase = STATUS_PHRASES.get(status, "Unknown")
    start_response(
        f"{status} {phrase}",
        [
            ("Content-Type", "application/json"),
            ("Content-Length", str(len(body))),
        ]
    )
    return [body]
The Application
Now the actual application. Notice the structure: it’s just a function that dispatches based on method and path.
import json
import re
import uuid
from typing import Any, Optional

# In-memory store
tasks: dict[str, dict] = {}

def read_body(environ) -> bytes:
    try:
        content_length = int(environ.get('CONTENT_LENGTH', 0) or 0)
    except (ValueError, TypeError):
        content_length = 0
    if content_length > 0:
        return environ['wsgi.input'].read(content_length)
    return b''

def parse_json_body(environ) -> Optional[Any]:
    content_type = environ.get('CONTENT_TYPE', '')
    if 'application/json' not in content_type:
        return None
    body = read_body(environ)
    if not body:
        return None
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        return None

STATUS_PHRASES = {
    200: "OK", 201: "Created", 400: "Bad Request",
    404: "Not Found", 405: "Method Not Allowed",
}

def json_response(start_response, data: Any, status: int = 200) -> list[bytes]:
    body = json.dumps(data, indent=2).encode("utf-8")
    phrase = STATUS_PHRASES.get(status, "Unknown")
    start_response(
        f"{status} {phrase}",
        [("Content-Type", "application/json"),
         ("Content-Length", str(len(body)))]
    )
    return [body]

def application(environ, start_response):
    method = environ['REQUEST_METHOD']
    path = environ['PATH_INFO']

    # Route: GET /tasks
    if path == '/tasks' and method == 'GET':
        return json_response(start_response, list(tasks.values()))

    # Route: POST /tasks
    if path == '/tasks' and method == 'POST':
        data = parse_json_body(environ)
        if not data or 'title' not in data:
            return json_response(start_response,
                                 {"error": "title is required"}, 400)
        task = {
            "id": str(uuid.uuid4()),
            "title": data['title'],
            "done": False,
        }
        tasks[task['id']] = task
        return json_response(start_response, task, 201)

    # Route: /tasks/{id}
    match = re.fullmatch(r'/tasks/([^/]+)', path)
    if match:
        task_id = match.group(1)
        if method == 'GET':
            task = tasks.get(task_id)
            if task is None:
                return json_response(start_response,
                                     {"error": "not found"}, 404)
            return json_response(start_response, task)
        if method == 'DELETE':
            if task_id not in tasks:
                return json_response(start_response,
                                     {"error": "not found"}, 404)
            deleted = tasks.pop(task_id)
            return json_response(start_response, deleted)
        return json_response(start_response,
                             {"error": "method not allowed"}, 405)

    # 404 for everything else
    return json_response(start_response, {"error": "not found"}, 404)

if __name__ == '__main__':
    from wsgiref.simple_server import make_server
    print("Serving on http://127.0.0.1:8000")
    with make_server('127.0.0.1', 8000, application) as server:
        server.serve_forever()
Save this as tasks_app.py and run it:
python tasks_app.py
Or with Gunicorn:
pip install gunicorn
gunicorn tasks_app:application
Trying It Out
# Create a task
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Learn WSGI"}'
# {"id": "abc-123", "title": "Learn WSGI", "done": false}
# List tasks
curl http://localhost:8000/tasks
# [{"id": "abc-123", "title": "Learn WSGI", "done": false}]
# Get a specific task
curl http://localhost:8000/tasks/abc-123
# {"id": "abc-123", "title": "Learn WSGI", "done": false}
# Delete a task
curl -X DELETE http://localhost:8000/tasks/abc-123
# {"id": "abc-123", "title": "Learn WSGI", "done": false}
# Missing title
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"description": "oops"}'
# {"error": "title is required"}
# Not found
curl http://localhost:8000/tasks/nonexistent
# {"error": "not found"}
It works. No framework involved. The routing is regex matching on PATH_INFO. The request parsing is reading from wsgi.input. The responses are byte strings with proper headers.
What This Reveals About Frameworks
Look at our application function and notice what’s getting tedious:
- Routing: if path == '/tasks' and method == 'GET' — this will get unreadable fast
- Response construction: json_response(start_response, data, status) — we’re passing start_response everywhere
- Request parsing: parse_json_body(environ) — repeated on every endpoint that accepts a body
- Error handling: every route independently returns error responses
A framework solves these problems. Flask gives you @app.route('/tasks', methods=['GET']). Django gives you URL patterns and view functions. FastAPI gives you type annotations and automatic parsing.
But now you know what they’re solving. The @app.route decorator is just adding your function to a routing table and wrapping it so it conforms to the WSGI interface. The request object (flask.request, django.request) is just a wrapper around environ. The response class is a wrapper around start_response and the return value.
It’s convenience all the way down.
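To make “just a wrapper around environ” concrete, here is a hypothetical Request class in the spirit of the framework objects, but not copied from any of them. Every attribute is a dict lookup underneath:

```python
import io
import json

class Request:
    """A hypothetical request wrapper; every property is an environ lookup."""

    def __init__(self, environ):
        self.environ = environ

    @property
    def method(self):
        return self.environ["REQUEST_METHOD"]

    @property
    def path(self):
        return self.environ.get("PATH_INFO", "/")

    def get_json(self):
        try:
            length = int(self.environ.get("CONTENT_LENGTH") or 0)
        except ValueError:
            length = 0
        body = self.environ["wsgi.input"].read(length) if length else b""
        return json.loads(body) if body else None

# Usage: wrap an environ (faked here with BytesIO) and read it naturally
env = {
    "REQUEST_METHOD": "POST",
    "PATH_INFO": "/tasks",
    "CONTENT_LENGTH": "15",
    "wsgi.input": io.BytesIO(b'{"title": "hi"}'),
}
req = Request(env)
```

Frameworks add caching, cookies, and file uploads on top, but the shape is the same: environ in, convenience out.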
The wsgiref Module
Python’s standard library includes wsgiref, a reference implementation of a WSGI server:
from wsgiref.simple_server import make_server
from wsgiref.validate import validator

# Wrap your app with the validator to catch WSGI spec violations
validated_app = validator(application)

with make_server('127.0.0.1', 8000, validated_app) as server:
    server.serve_forever()
wsgiref.validate.validator wraps your application and checks that it correctly implements the WSGI spec — proper return types, calling start_response at the right time, etc. Use it during development; remove it for production.
wsgiref.simple_server is not production-ready (it’s single-threaded and handles one request at a time), but it’s useful for local development when you want zero dependencies.
On State and Concurrency
The tasks dictionary in our app is module-level state. This works fine for a single-process server, but Gunicorn’s default is to use multiple worker processes. Each worker has its own copy of the module, its own tasks dict. Changes in worker 1 are invisible to worker 2.
For production, you’d use a database or shared cache (Redis) instead of in-memory state. This isn’t a WSGI limitation — it’s just how multi-process architectures work. But it’s worth being explicit about: WSGI does not share state between requests. Each call to application() is independent.
This is actually a feature. It makes WSGI applications easy to reason about and easy to scale horizontally. Stateless by default.
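If you run a single process with threaded workers (Gunicorn’s gthread), there is a related wrinkle: two threads can mutate the dict at once. Here is a minimal sketch of the usual in-memory fix, a lock around mutations. This does nothing for the multi-process case, which still needs an external store:

```python
import threading
import uuid
from typing import Optional

# Module-level state, now guarded by a lock for threaded workers
tasks: dict = {}
tasks_lock = threading.Lock()

def create_task(title: str) -> dict:
    task = {"id": str(uuid.uuid4()), "title": title, "done": False}
    with tasks_lock:  # only one thread mutates the dict at a time
        tasks[task["id"]] = task
    return task

def delete_task(task_id: str) -> Optional[dict]:
    with tasks_lock:
        return tasks.pop(task_id, None)
```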
Exercises
Before moving on, try these modifications to the app:
- Add a PATCH /tasks/{id} endpoint that updates the done field
- Add proper Content-Type validation — return 415 if it’s not application/json on POST
- Add a request logger: print method, path, and status code for every request
Exercise 3 is a preview of the next chapter. The logging belongs in middleware.
Build a WSGI Server from Scratch
We’ve been running our WSGI applications with wsgiref.simple_server. It works, but it’s a black box. In this chapter, we’ll build our own WSGI server using Python’s socket module — the same primitives that Gunicorn uses, just without fifteen years of production hardening.
By the end of this chapter, you’ll understand exactly what Gunicorn’s workers are doing on every request.
What a WSGI Server Must Do
A WSGI server has one job: accept TCP connections, parse HTTP requests, call WSGI applications, and send HTTP responses. In order:
- Bind to a port and listen for connections
- Accept a connection
- Read bytes from the socket
- Parse the bytes into an HTTP request
- Build the environ dictionary
- Define a start_response callable
- Call the WSGI application
- Serialize the response (status + headers + body)
- Write the response bytes to the socket
- Close the connection (or keep it alive for HTTP/1.1)
We’ll implement all of these, handling one request at a time (no threading — that’s a refinement for later).
The HTTP Parser
First, let’s build a proper request parser:
from dataclasses import dataclass
from typing import Dict, Optional
import io

@dataclass
class ParsedRequest:
    method: str
    path: str
    query_string: str
    http_version: str
    headers: Dict[str, str]
    body: bytes

def parse_request(raw: bytes) -> Optional[ParsedRequest]:
    """
    Parse raw HTTP request bytes into a ParsedRequest.
    Returns None if the request is malformed.
    """
    # Split at the blank line separating headers from body
    header_end = raw.find(b"\r\n\r\n")
    if header_end == -1:
        return None  # Incomplete request
    header_section = raw[:header_end].decode("latin-1")
    body = raw[header_end + 4:]
    lines = header_section.split("\r\n")
    # Parse the request line
    try:
        method, raw_path, http_version = lines[0].split(" ", 2)
    except ValueError:
        return None
    # Split path from query string
    if "?" in raw_path:
        path, query_string = raw_path.split("?", 1)
    else:
        path, query_string = raw_path, ""
    # Parse headers
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ": " in line:
            name, _, value = line.partition(": ")
            headers[name.lower()] = value
    # Trim body to Content-Length if present
    try:
        content_length = int(headers.get("content-length", len(body)))
    except ValueError:
        return None  # Malformed Content-Length
    body = body[:content_length]
    return ParsedRequest(
        method=method,
        path=path,
        query_string=query_string,
        http_version=http_version,
        headers=headers,
        body=body,
    )
Building the environ Dictionary
The spec defines exactly what keys environ must contain. Here’s the conversion from our parsed request:
import os
import sys

def build_environ(request: ParsedRequest, server_name: str, server_port: int) -> dict:
    """
    Build a WSGI environ dict from a parsed HTTP request.
    See PEP 3333 for the complete specification.
    """
    environ = {
        # CGI variables
        "REQUEST_METHOD": request.method,
        "SCRIPT_NAME": "",
        "PATH_INFO": request.path,
        "QUERY_STRING": request.query_string,
        "SERVER_NAME": server_name,
        "SERVER_PORT": str(server_port),
        "SERVER_PROTOCOL": request.http_version,
        "GATEWAY_INTERFACE": "CGI/1.1",
        # WSGI variables
        "wsgi.version": (1, 0),
        "wsgi.url_scheme": "http",
        "wsgi.input": io.BytesIO(request.body),
        "wsgi.errors": sys.stderr,
        "wsgi.multithread": False,
        "wsgi.multiprocess": False,
        "wsgi.run_once": False,
    }
    # Content-Type and Content-Length: no HTTP_ prefix (CGI convention)
    if "content-type" in request.headers:
        environ["CONTENT_TYPE"] = request.headers["content-type"]
    else:
        environ["CONTENT_TYPE"] = ""
    if "content-length" in request.headers:
        environ["CONTENT_LENGTH"] = request.headers["content-length"]
    else:
        environ["CONTENT_LENGTH"] = ""
    # All other headers: HTTP_ prefix, uppercased, hyphens → underscores
    for name, value in request.headers.items():
        if name in ("content-type", "content-length"):
            continue
        key = "HTTP_" + name.upper().replace("-", "_")
        environ[key] = value
    return environ
The Response Builder
def build_response(status: str, headers: List[Tuple[str, str]], body: bytes) -> bytes:
    """Serialize a WSGI response to HTTP bytes."""
    lines = [f"HTTP/1.1 {status}\r\n"]
    for name, value in headers:
        lines.append(f"{name}: {value}\r\n")
    lines.append("\r\n")
    return "".join(lines).encode("latin-1") + body
The Server
Now we put it all together:
import socket
import sys
from typing import Callable

def serve(app: Callable, host: str = "127.0.0.1", port: int = 8000) -> None:
    """
    A minimal single-threaded WSGI server.
    Handles one request at a time. Not suitable for production,
    but suitable for understanding what production servers do.
    """
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow reusing the address immediately after restart
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"Serving on http://{host}:{port}", file=sys.stderr)
    while True:
        try:
            conn, addr = server_socket.accept()
            handle_connection(conn, addr, app, host, port)
        except KeyboardInterrupt:
            print("\nShutting down.", file=sys.stderr)
            break
    server_socket.close()

def handle_connection(
    conn: socket.socket,
    addr: tuple,
    app: Callable,
    server_name: str,
    server_port: int,
) -> None:
    """Handle a single HTTP connection."""
    try:
        # Read the full request
        raw = receive_request(conn)
        if not raw:
            return
        # Parse it
        request = parse_request(raw)
        if request is None:
            conn.sendall(b"HTTP/1.1 400 Bad Request\r\n\r\n")
            return
        # Build environ and call the app
        environ = build_environ(request, server_name, server_port)
        status, headers, body = call_app(app, environ)
        # Send the response
        response = build_response(status, headers, body)
        conn.sendall(response)
    except Exception as e:
        print(f"Error handling request: {e}", file=sys.stderr)
        try:
            conn.sendall(b"HTTP/1.1 500 Internal Server Error\r\n\r\n")
        except Exception:
            pass
    finally:
        conn.close()
def receive_request(conn: socket.socket) -> bytes:
    """
    Read bytes from a socket until we have a complete HTTP request.
    Handles the header/body split correctly.
    """
    data = b""
    conn.settimeout(5.0)
    try:
        # Read until we have the full headers
        while b"\r\n\r\n" not in data:
            chunk = conn.recv(4096)
            if not chunk:
                return data
            data += chunk
        # Find the end of headers
        header_end = data.find(b"\r\n\r\n") + 4
        # Determine Content-Length from headers
        header_section = data[:header_end].decode("latin-1")
        content_length = 0
        for line in header_section.split("\r\n"):
            if line.lower().startswith("content-length:"):
                try:
                    content_length = int(line.split(":", 1)[1].strip())
                except ValueError:
                    pass
                break
        # Read the body if needed
        body_received = len(data) - header_end
        while body_received < content_length:
            chunk = conn.recv(4096)
            if not chunk:
                break
            data += chunk
            body_received += len(chunk)
    except socket.timeout:
        pass  # Return whatever we have
    return data
def call_app(app: Callable, environ: dict) -> tuple:
    """
    Call a WSGI application and collect the response.
    Returns (status, headers, body_bytes).
    """
    response_args = []

    def start_response(status, headers, exc_info=None):
        if exc_info:
            try:
                if response_args:
                    raise exc_info[1].with_traceback(exc_info[2])
            finally:
                exc_info = None
            response_args.clear()
        response_args.append((status, headers))

    result = app(environ, start_response)
    try:
        body = b"".join(result)
    finally:
        if hasattr(result, "close"):
            result.close()
    if not response_args:
        raise RuntimeError("WSGI app did not call start_response")
    status, headers = response_args[0]
    return status, headers, body
Putting It All Together
# wsgi_server.py — the complete server in one file
import io
import socket
import sys
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple

@dataclass
class ParsedRequest:
    method: str
    path: str
    query_string: str
    http_version: str
    headers: Dict[str, str]
    body: bytes

def parse_request(raw: bytes) -> Optional[ParsedRequest]:
    header_end = raw.find(b"\r\n\r\n")
    if header_end == -1:
        return None
    header_section = raw[:header_end].decode("latin-1")
    body = raw[header_end + 4:]
    lines = header_section.split("\r\n")
    try:
        method, raw_path, http_version = lines[0].split(" ", 2)
    except ValueError:
        return None
    path, query_string = (raw_path.split("?", 1) if "?" in raw_path
                          else (raw_path, ""))
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ": " in line:
            name, _, value = line.partition(": ")
            headers[name.lower()] = value
    try:
        content_length = int(headers.get("content-length", len(body)))
    except ValueError:
        return None
    return ParsedRequest(method, path, query_string, http_version,
                         headers, body[:content_length])

def build_environ(req: ParsedRequest, host: str, port: int) -> dict:
    env = {
        "REQUEST_METHOD": req.method,
        "SCRIPT_NAME": "",
        "PATH_INFO": req.path,
        "QUERY_STRING": req.query_string,
        "SERVER_NAME": host,
        "SERVER_PORT": str(port),
        "SERVER_PROTOCOL": req.http_version,
        "GATEWAY_INTERFACE": "CGI/1.1",
        "wsgi.version": (1, 0),
        "wsgi.url_scheme": "http",
        "wsgi.input": io.BytesIO(req.body),
        "wsgi.errors": sys.stderr,
        "wsgi.multithread": False,
        "wsgi.multiprocess": False,
        "wsgi.run_once": False,
        "CONTENT_TYPE": req.headers.get("content-type", ""),
        "CONTENT_LENGTH": req.headers.get("content-length", ""),
    }
    for name, value in req.headers.items():
        if name not in ("content-type", "content-length"):
            env["HTTP_" + name.upper().replace("-", "_")] = value
    return env

def build_response(status: str, headers: List[Tuple[str, str]],
                   body: bytes) -> bytes:
    lines = [f"HTTP/1.1 {status}\r\n"]
    for name, value in headers:
        lines.append(f"{name}: {value}\r\n")
    lines.append("\r\n")
    return "".join(lines).encode("latin-1") + body

def receive_request(conn: socket.socket) -> bytes:
    data = b""
    conn.settimeout(5.0)
    try:
        while b"\r\n\r\n" not in data:
            chunk = conn.recv(4096)
            if not chunk:
                return data
            data += chunk
        header_end = data.find(b"\r\n\r\n") + 4
        content_length = 0
        for line in data[:header_end].decode("latin-1").split("\r\n"):
            if line.lower().startswith("content-length:"):
                try:
                    content_length = int(line.split(":", 1)[1].strip())
                except ValueError:
                    pass
                break
        while len(data) - header_end < content_length:
            chunk = conn.recv(4096)
            if not chunk:
                break
            data += chunk
    except socket.timeout:
        pass
    return data

def call_app(app: Callable, environ: dict) -> tuple:
    response_args = []

    def start_response(status, headers, exc_info=None):
        if exc_info:
            try:
                if response_args:
                    raise exc_info[1].with_traceback(exc_info[2])
            finally:
                exc_info = None
            response_args.clear()
        response_args.append((status, headers))

    result = app(environ, start_response)
    try:
        body = b"".join(result)
    finally:
        if hasattr(result, "close"):
            result.close()
    if not response_args:
        raise RuntimeError("App did not call start_response")
    status, headers = response_args[0]
    return status, headers, body

def serve(app: Callable, host: str = "127.0.0.1", port: int = 8000) -> None:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen(5)
    print(f"Serving on http://{host}:{port}", file=sys.stderr)
    try:
        while True:
            conn, addr = sock.accept()
            try:
                raw = receive_request(conn)
                if not raw:
                    continue
                req = parse_request(raw)
                if req is None:
                    conn.sendall(b"HTTP/1.1 400 Bad Request\r\n\r\n")
                    continue
                environ = build_environ(req, host, port)
                status, headers, body = call_app(app, environ)
                conn.sendall(build_response(status, headers, body))
            except Exception as e:
                print(f"Error: {e}", file=sys.stderr)
                try:
                    conn.sendall(b"HTTP/1.1 500 Internal Server Error\r\n\r\n")
                except Exception:
                    pass
            finally:
                conn.close()
    except KeyboardInterrupt:
        print("\nShutting down.", file=sys.stderr)
    finally:
        sock.close()

# --- Test application ---
def hello_app(environ, start_response):
    name = environ.get("QUERY_STRING", "").split("=")[-1] or "world"
    body = f"Hello, {name}!\n".encode("utf-8")
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(body))),
    ])
    return [body]

if __name__ == "__main__":
    serve(hello_app)
Save as wsgi_server.py and test it:
python wsgi_server.py &
curl "http://127.0.0.1:8000/?name=WSGI"
# Hello, WSGI!
Now point it at the tasks app from the previous chapter:
# at the bottom of wsgi_server.py, replace the __main__ block:
if __name__ == "__main__":
    from tasks_app import application
    serve(application)
curl -X POST http://127.0.0.1:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Works on my server"}'
What Gunicorn Does That We Don’t
Our server handles one request at a time. Gunicorn has workers — multiple processes or threads that each run the request loop above simultaneously. The sync worker is essentially what we’ve built, replicated across N processes. The gthread worker uses threads within each process. The gevent worker uses greenlets.
Beyond concurrency, Gunicorn adds:
- HTTP/1.1 keep-alive: reusing connections for multiple requests
- Chunked transfer encoding: streaming responses without a known Content-Length
- Request size limits: protecting against large payloads
- Graceful worker restarts: new workers start before old ones die
- Worker timeouts: killing workers that hang
But the core loop — accept, parse, environ, call, serialize, send — is exactly what we’ve built.
The Latin-1 Encoding Detail
You may have noticed we decode HTTP headers as latin-1 rather than utf-8. This is intentional.
Historically, HTTP defined header field values as ISO-8859-1 (latin-1) by default; the modern spec (RFC 7230) narrows this, advising that fields be ASCII and that non-ASCII octets be treated as opaque data. Headers containing raw non-ASCII characters are technically not valid in HTTP/1.1 (they should be encoded, e.g., RFC 5987 encoding for Content-Disposition filenames).
In practice, Python’s WSGI servers use latin-1 for headers to preserve byte-for-byte fidelity. If you round-trip a header through encode('latin-1').decode('latin-1'), you get the original bytes back — which matters for the WSGI spec’s requirement that environ values be native strings.
This is one of those details that doesn’t matter until it does, at which point you’ll spend a day debugging mysterious encoding errors.
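The round-trip property is easy to verify for yourself:

```python
# latin-1 maps every byte value 0-255 to exactly one code point, so the
# decode/encode round trip is lossless for arbitrary header bytes.
raw = bytes(range(256))
assert raw.decode("latin-1").encode("latin-1") == raw

# utf-8 offers no such guarantee: plenty of byte sequences are invalid.
try:
    b"\xff\xfe\xfd".decode("utf-8")
    survived = True
except UnicodeDecodeError:
    survived = False
assert not survived
```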
The SO_REUSEADDR Detail
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Without this, if you restart the server quickly after stopping it, you’ll get [Errno 98] Address already in use. This happens because the OS keeps the socket in TIME_WAIT state for a while after close, waiting for any stray packets that might still be in transit. SO_REUSEADDR tells the OS to let us reuse the address anyway.
Gunicorn sets this too. So does every other production server. It’s the first thing you add after “it works once” breaks down.
Middleware: Turtles All the Way Down
Middleware is the part of Python web development that sounds complicated until you understand it, at which point it becomes almost disappointingly simple.
A WSGI middleware is a callable that:
- Takes a WSGI application as its argument
- Returns a WSGI application
That’s the whole definition. It’s a function that wraps a function to produce a function. Python has a name for this: a decorator.
The Simplest Possible Middleware
def do_nothing_middleware(app):
    """Middleware that does absolutely nothing."""
    def wrapper(environ, start_response):
        return app(environ, start_response)
    return wrapper
Use it:
def my_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello"]

wrapped = do_nothing_middleware(my_app)
# wrapped is a WSGI app. Gunicorn can't tell the difference.
wrapped is indistinguishable from my_app from the server’s perspective. It takes environ and start_response, calls start_response, and returns a body iterable. It’s a valid WSGI app.
Now add some behavior:
Request Logging Middleware
import sys
import time

def logging_middleware(app):
    """Log method, path, status, and timing for every request."""
    def wrapper(environ, start_response):
        method = environ.get("REQUEST_METHOD", "?")
        path = environ.get("PATH_INFO", "/")
        started = time.monotonic()

        # Intercept start_response to capture the status code
        status_holder = []
        def capturing_start_response(status, headers, exc_info=None):
            status_holder.append(status)
            return start_response(status, headers, exc_info)

        result = app(environ, capturing_start_response)
        elapsed = (time.monotonic() - started) * 1000
        status = status_holder[0] if status_holder else "???"
        print(f"{method} {path} {status} ({elapsed:.1f}ms)", file=sys.stderr)
        return result
    return wrapper
The key insight: middleware can intercept start_response to inspect or modify the status and headers before passing them to the real start_response. This is how authentication middleware rejects requests with 401 Unauthorized, how compression middleware adds Content-Encoding: gzip, and how CORS middleware adds Access-Control-Allow-Origin headers.
Authentication Middleware
import base64

def basic_auth_middleware(app, username: str, password: str):
    """
    HTTP Basic Authentication middleware.
    Rejects requests without valid credentials with 401.
    """
    # Pre-compute the expected auth header value
    credentials = f"{username}:{password}".encode("utf-8")
    expected = "Basic " + base64.b64encode(credentials).decode("ascii")

    def wrapper(environ, start_response):
        auth = environ.get("HTTP_AUTHORIZATION", "")
        if auth != expected:
            body = b"Unauthorized"
            start_response("401 Unauthorized", [
                ("Content-Type", "text/plain"),
                ("Content-Length", str(len(body))),
                ("WWW-Authenticate", 'Basic realm="Protected"'),
            ])
            return [body]
        # Credentials valid — call the real app
        return app(environ, start_response)
    return wrapper

# Usage:
protected_app = basic_auth_middleware(my_app, "admin", "secret")
Notice what happened: basic_auth_middleware takes the app and the credentials. It returns a closure. The closure has access to both app and expected via Python’s closure mechanism.
Stacking Middleware
Here’s where the “turtles all the way down” name comes from. You can stack middleware:
app = my_app
app = basic_auth_middleware(app, "admin", "secret")
app = logging_middleware(app)
When a request comes in, app is now the logging middleware. It calls basic_auth_middleware’s wrapper. That calls the real my_app. The stack unwinds on the way back.
The call stack during a request looks like:
logging wrapper
  → basic_auth wrapper
    → my_app
    ← response
  ← response
← response (status logged on the way out)
This is exactly what Django’s MIDDLEWARE setting builds. Each class in the list wraps the next.
A Composable Middleware Builder
Rather than manually nesting, build a pipeline function:
from typing import Callable, List

def build_middleware_stack(
    app: Callable,
    middleware: List[Callable],
) -> Callable:
    """
    Apply middleware so the first item in the list is outermost.
    middleware = [A, B, C] means: A wraps B wraps C wraps app
    Execution order: A → B → C → app → C → B → A
    """
    for mw in reversed(middleware):
        app = mw(app)
    return app

# Usage
stack = build_middleware_stack(my_app, [
    logging_middleware,
    basic_auth_middleware,  # This would need partial application
])
For middleware that takes configuration, use functools.partial or a factory function:
import functools

stack = build_middleware_stack(my_app, [
    logging_middleware,
    functools.partial(basic_auth_middleware, username="admin", password="secret"),
])
CORS Middleware
A real-world example: Cross-Origin Resource Sharing middleware adds the headers that browsers need for cross-origin requests.
from typing import Sequence

def cors_middleware(
    app,
    allow_origins: Sequence[str] = ("*",),
    allow_methods: Sequence[str] = ("GET", "POST", "PUT", "DELETE", "OPTIONS"),
    allow_headers: Sequence[str] = ("Content-Type", "Authorization"),
    max_age: int = 86400,
):
    """
    CORS middleware. Adds Access-Control-* headers to all responses
    and handles OPTIONS preflight requests.
    (Defaults are tuples, not lists: mutable default arguments are a
    classic Python pitfall.)
    """
    origin_header = ", ".join(allow_origins)
    methods_header = ", ".join(allow_methods)
    headers_header = ", ".join(allow_headers)
    cors_headers = [
        ("Access-Control-Allow-Origin", origin_header),
        ("Access-Control-Allow-Methods", methods_header),
        ("Access-Control-Allow-Headers", headers_header),
        ("Access-Control-Max-Age", str(max_age)),
    ]

    def wrapper(environ, start_response):
        # Handle CORS preflight
        if environ["REQUEST_METHOD"] == "OPTIONS":
            start_response("204 No Content", cors_headers + [
                ("Content-Length", "0"),
            ])
            return [b""]

        # Inject CORS headers into every response
        def cors_start_response(status, headers, exc_info=None):
            return start_response(status, headers + cors_headers, exc_info)

        return app(environ, cors_start_response)
    return wrapper
Request ID Middleware
Add a unique ID to every request — useful for correlating log lines across multiple services:
import uuid

def request_id_middleware(app, header_name: str = "X-Request-ID"):
    """
    Assign a unique request ID to every incoming request.
    Uses the incoming header if present, generates one otherwise.
    Injects the ID into environ and adds it to the response headers.
    """
    environ_key = "HTTP_" + header_name.upper().replace("-", "_")

    def wrapper(environ, start_response):
        # Use existing ID or generate a new one
        request_id = environ.get(environ_key) or str(uuid.uuid4())
        environ[environ_key] = request_id
        environ["request_id"] = request_id  # Convenience key

        def id_start_response(status, headers, exc_info=None):
            return start_response(
                status,
                headers + [(header_name, request_id)],
                exc_info,
            )

        return app(environ, id_start_response)
    return wrapper
Response Timing Header
import time

def timing_middleware(app):
    """Add X-Response-Time header (milliseconds) to every response."""
    def wrapper(environ, start_response):
        started = time.monotonic()

        def timing_start_response(status, headers, exc_info=None):
            elapsed_ms = (time.monotonic() - started) * 1000
            return start_response(
                status,
                headers + [("X-Response-Time", f"{elapsed_ms:.2f}ms")],
                exc_info,
            )

        return app(environ, timing_start_response)
    return wrapper
Putting the Stack Together
# app.py
import functools
from tasks_app import application as tasks_app
app = build_middleware_stack(tasks_app, [
timing_middleware,
logging_middleware,
request_id_middleware,
functools.partial(cors_middleware, allow_origins=["https://myapp.com"]),
functools.partial(basic_auth_middleware, username="admin", password="hunter2"),
])
# Run with: gunicorn app:app
The call order is: timing → logging → request_id → cors → basic_auth → tasks_app.
Each middleware layer handles one concern. None of them know about each other. They’re composable because they all speak the same interface: WSGI.
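The build_middleware_stack helper itself reduces to a single fold. A minimal sketch, assuming (as above) that each middleware is a function taking an app and returning a wrapped app:

```python
from functools import reduce

def build_middleware_stack(app, middlewares):
    # Fold right-to-left so the first middleware in the list ends up
    # outermost, i.e. it runs first on the way in
    return reduce(lambda inner, mw: mw(inner), reversed(middlewares), app)

# Two toy middlewares that record the order they run in
calls = []

def make_middleware(name):
    def middleware(app):
        def wrapper(environ, start_response):
            calls.append(name)
            return app(environ, start_response)
        return wrapper
    return middleware

def hello(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hi"]

app = build_middleware_stack(hello, [make_middleware("outer"), make_middleware("inner")])
body = app({}, lambda status, headers, exc_info=None: None)
```

After the call, calls is ["outer", "inner"]: the first middleware in the list saw the request first, matching the call order described above.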
The Class-Based Pattern
You can also write middleware as classes, which is how Django does it:
class LoggingMiddleware:
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
method = environ.get("REQUEST_METHOD", "?")
path = environ.get("PATH_INFO", "/")
print(f"→ {method} {path}", file=sys.stderr)
return self.app(environ, start_response)
The class is callable (via __call__), so LoggingMiddleware(my_app) returns a WSGI app. The behavior is identical to the function-based approach — Django’s preference for classes is a stylistic choice, not a technical requirement.
What Frameworks Add
When Flask does:
from flask_cors import CORS
CORS(app)
It’s hooking into Flask’s response cycle to inject CORS headers into every response. Not magic — the same wrapping pattern we just built, expressed through Flask’s extension hooks rather than by wrapping the raw WSGI callable.
When Django processes your MIDDLEWARE list, it builds a chain of callables where each one wraps the next. The “middleware interface” Django defines (with process_request, process_response, process_view) is just a more structured way to write the same wrapping pattern.
The underlying mechanism is always: callables all the way down.
Routing Without a Framework (It’s Just String Matching)
URL routing sounds like one of those problems that requires a framework. It doesn’t. Routing is pattern matching on a string. The string is PATH_INFO. The patterns are either exact matches, prefix matches, or regular expressions. Everything else is sugar.
Let’s build a router that would be genuinely usable in a small project.
What a Router Does
A router maps incoming requests (method + path) to handler functions. Given:
GET /users/42/posts
It should find the handler registered for GET /users/{user_id}/posts and call it with user_id="42".
The three parts of routing:
- Registration: associate a pattern with a handler
- Matching: find the pattern that matches the incoming path
- Extraction: pull path parameters out of the matched URL
Naive Routing: Just if Statements
We already saw this in the tasks app. It scales to about 5 routes before it becomes unpleasant:
def application(environ, start_response):
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
if path == '/' and method == 'GET':
return index(environ, start_response)
if path == '/users' and method == 'GET':
return list_users(environ, start_response)
# ... this gets old fast
Let’s do better.
A Dict-Based Exact Router
For applications that only need exact path matching (no parameters), a dict is fast and readable:
from typing import Callable, Dict, Optional, Tuple
# Type: maps (method, path) to handler
RouteTable = Dict[Tuple[str, str], Callable]
def make_exact_router(routes: RouteTable) -> Callable:
"""
Build a WSGI app from a dict of (method, path) → handler mappings.
Returns 404 for unmatched paths, 405 for wrong method on matched path.
"""
# Build a set of known paths for 404 vs 405 distinction
known_paths = {path for (_, path) in routes}
def router(environ, start_response):
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
key = (method, path)
if key in routes:
return routes[key](environ, start_response)
if path in known_paths:
# Path exists but method is wrong
allowed = [m for (m, p) in routes if p == path]
start_response("405 Method Not Allowed", [
("Allow", ", ".join(allowed)),
("Content-Type", "text/plain"),
])
return [b"Method Not Allowed"]
start_response("404 Not Found", [("Content-Type", "text/plain")])
return [b"Not Found"]
return router
# Usage
def index(environ, start_response):
start_response("200 OK", [("Content-Type", "text/plain")])
return [b"Home"]
def about(environ, start_response):
start_response("200 OK", [("Content-Type", "text/plain")])
return [b"About"]
app = make_exact_router({
("GET", "/"): index,
("GET", "/about"): about,
})
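Because the router is just a WSGI callable, you can exercise it without a server: build an environ by hand and pass a recording start_response. A minimal sketch with a standalone handler (the same trick works on any app in this chapter):

```python
def index(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Home"]

captured = {}

def fake_start_response(status, headers, exc_info=None):
    # Record what the app reported instead of writing to a socket
    captured["status"] = status
    captured["headers"] = dict(headers)

environ = {"REQUEST_METHOD": "GET", "PATH_INFO": "/"}
body = b"".join(index(environ, fake_start_response))
```

Here body is b"Home" and captured["status"] is "200 OK". We'll use the same hand-built-environ technique for real tests later in the book.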
A Pattern Router with Path Parameters
For path parameters like /users/{user_id}, we need pattern matching. We’ll convert path templates to regular expressions:
import re
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple
@dataclass
class Route:
method: str
pattern: re.Pattern
handler: Callable
param_names: List[str]
def compile_path(path_template: str) -> Tuple[re.Pattern, List[str]]:
"""
Convert a path template like '/users/{user_id}/posts/{post_id}'
to a compiled regex and a list of parameter names.
Supported converters:
{name} → matches any non-slash characters
{name:int} → matches one or more digits
{name:slug} → matches URL-safe characters (letters, digits, hyphens)
"""
converters = {
"str": r"[^/]+",
"int": r"[0-9]+",
"slug": r"[a-zA-Z0-9-]+",
}
    # Convert each {name} or {name:converter} placeholder into a named
    # regex group; literal path segments pass through unchanged.
    # (Literal segments are not re.escape()d, which is fine for the
    # ordinary paths we're targeting.)
    param_names: List[str] = []

    def replace_param(m):
        name = m.group(1)
        converter = m.group(2) or "str"
        param_names.append(name)
        return f"(?P<{name}>{converters.get(converter, converters['str'])})"

    regex_str = re.sub(r"\{(\w+)(?::(\w+))?\}", replace_param, path_template)
    return re.compile("^" + regex_str + "$"), param_names
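To make the output concrete: a template like /users/{user_id:int}/posts compiles to roughly the regex below, and matching extracts the named groups. Note that captured values are always strings; the :int converter constrains what matches, it doesn't convert the type.

```python
import re

# What compile_path produces for "/users/{user_id:int}/posts"
pattern = re.compile(r"^/users/(?P<user_id>[0-9]+)/posts$")

match = pattern.match("/users/42/posts")
assert match is not None
assert match.groupdict() == {"user_id": "42"}  # strings, not ints

# The [0-9]+ group rejects non-numeric segments entirely
assert pattern.match("/users/abc/posts") is None
```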
class Router:
"""
A WSGI router with path parameter support.
Usage:
router = Router()
@router.route("GET", "/users/{user_id:int}")
def get_user(environ, start_response):
user_id = int(environ['route.params']['user_id'])
...
"""
def __init__(self):
self.routes: List[Route] = []
def route(self, method: str, path: str):
"""Decorator to register a route handler."""
def decorator(func: Callable) -> Callable:
pattern, param_names = compile_path(path)
self.routes.append(Route(
method=method.upper(),
pattern=pattern,
handler=func,
param_names=param_names,
))
return func
return decorator
# Convenience methods
def get(self, path: str):
return self.route("GET", path)
def post(self, path: str):
return self.route("POST", path)
def put(self, path: str):
return self.route("PUT", path)
def delete(self, path: str):
return self.route("DELETE", path)
def __call__(self, environ, start_response):
"""The router itself is a WSGI app."""
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
matched_routes = []
for route in self.routes:
match = route.pattern.match(path)
if match:
matched_routes.append((route, match))
if not matched_routes:
start_response("404 Not Found", [("Content-Type", "text/plain")])
return [b"Not Found"]
# Check if any match the method
for route, match in matched_routes:
if route.method == method:
# Inject path params into environ
environ['route.params'] = match.groupdict()
return route.handler(environ, start_response)
# Path matched but method didn't
allowed = [r.method for r, _ in matched_routes]
start_response("405 Method Not Allowed", [
("Allow", ", ".join(sorted(set(allowed)))),
("Content-Type", "text/plain"),
])
return [b"Method Not Allowed"]
Using the Router
import json
import uuid
router = Router()
tasks = {} # In-memory store
def json_response(start_response, data, status=200):
phrases = {200: "OK", 201: "Created", 400: "Bad Request",
404: "Not Found", 405: "Method Not Allowed"}
body = json.dumps(data).encode("utf-8")
start_response(f"{status} {phrases.get(status, 'Unknown')}", [
("Content-Type", "application/json"),
("Content-Length", str(len(body))),
])
return [body]
def read_json_body(environ):
try:
length = int(environ.get("CONTENT_LENGTH") or 0)
except ValueError:
return None
if not length:
return None
try:
return json.loads(environ["wsgi.input"].read(length))
except (json.JSONDecodeError, KeyError):
return None
@router.get("/tasks")
def list_tasks(environ, start_response):
return json_response(start_response, list(tasks.values()))
@router.post("/tasks")
def create_task(environ, start_response):
data = read_json_body(environ)
if not data or "title" not in data:
return json_response(start_response, {"error": "title required"}, 400)
task = {"id": str(uuid.uuid4()), "title": data["title"], "done": False}
tasks[task["id"]] = task
return json_response(start_response, task, 201)
@router.get("/tasks/{task_id}")
def get_task(environ, start_response):
task_id = environ["route.params"]["task_id"]
task = tasks.get(task_id)
if task is None:
return json_response(start_response, {"error": "not found"}, 404)
return json_response(start_response, task)
@router.delete("/tasks/{task_id}")
def delete_task(environ, start_response):
task_id = environ["route.params"]["task_id"]
if task_id not in tasks:
return json_response(start_response, {"error": "not found"}, 404)
return json_response(start_response, tasks.pop(task_id))
@router.get("/users/{user_id:int}/tasks")
def user_tasks(environ, start_response):
user_id = environ["route.params"]["user_id"] # Already matched as digits
# In a real app, filter by user_id
return json_response(start_response, {
"user_id": int(user_id),
"tasks": list(tasks.values()),
})
if __name__ == "__main__":
from wsgiref.simple_server import make_server
with make_server("127.0.0.1", 8000, router) as server:
print("http://127.0.0.1:8000")
server.serve_forever()
Test it:
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "understand routing"}'
curl http://localhost:8000/users/42/tasks
Sub-Applications and Mounting
A more advanced pattern: route by path prefix and delegate to sub-applications.
class Mount:
"""
Mount WSGI sub-applications at path prefixes.
app = Mount({
"/api": api_app,
"/admin": admin_app,
"/": fallback_app,
})
"""
def __init__(self, mounts: Dict[str, Callable]):
# Sort by prefix length descending so more specific paths match first
self.mounts = sorted(
mounts.items(),
key=lambda item: len(item[0]),
reverse=True,
)
def __call__(self, environ, start_response):
path = environ.get("PATH_INFO", "/")
        for prefix, app in self.mounts:
            if prefix == "/":
                # Root fallback: delegate without rewriting paths
                return app(environ, start_response)
            # Match at a path-segment boundary so "/api" doesn't
            # accidentally match "/apifoo"
            if path == prefix or path.startswith(prefix + "/"):
                # Shift the prefix from PATH_INFO into SCRIPT_NAME
                environ = dict(environ)
                environ["SCRIPT_NAME"] = environ.get("SCRIPT_NAME", "") + prefix
                environ["PATH_INFO"] = path[len(prefix):] or "/"
                return app(environ, start_response)
start_response("404 Not Found", [("Content-Type", "text/plain")])
return [b"Not Found"]
# Example: mount API and docs under different prefixes
from wsgiref.simple_server import demo_app
app = Mount({
"/api": router, # Our task API
"/": demo_app, # Fallback
})
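The SCRIPT_NAME/PATH_INFO bookkeeping follows the WSGI convention: SCRIPT_NAME holds the portion of the URL that routing has already consumed, PATH_INFO holds what remains, and concatenating them reconstructs the original request path. A quick illustration:

```python
# A request for /api/tasks hitting a Mount with "/api" registered:
original_path = "/api/tasks"
prefix = "/api"

script_name = "" + prefix                        # consumed by the mount
path_info = original_path[len(prefix):] or "/"   # what the sub-app sees

assert script_name == "/api"
assert path_info == "/tasks"
# The invariant sub-apps rely on when generating URLs:
assert script_name + path_info == original_path
```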
What Flask’s Router Actually Does
Flask uses Werkzeug’s routing system, which is more sophisticated than ours — it handles URL encoding, trailing slash redirects, weighted route matching (so /users/me takes precedence over /users/{id}), and URL generation (reversing a route name to a URL).
But the core mechanism is the same: convert URL templates to regular expressions, match PATH_INFO against them, extract named groups as parameters. Werkzeug just handles more edge cases and has a more refined API for it.
FastAPI’s router is Starlette’s, which is built on the same principle. The @app.get("/items/{item_id}") decorator registers a pattern and a handler. When a request comes in, Starlette walks the route table looking for a match.
The Trailing Slash Question
Should /users/ and /users match the same route? Opinions vary. Flask (via Werkzeug) treats them as distinct by default, redirecting a slashless request to the trailing-slash route when one is defined. Django's path() matches literally, though its CommonMiddleware can redirect via the APPEND_SLASH setting.
Our router also matches literally: the compiled pattern must match the full path, with no trailing-slash normalization. If you want both to work, register both:
@router.get("/tasks")
@router.get("/tasks/")
def list_tasks(environ, start_response):
...
Or add a middleware that strips trailing slashes:
def strip_trailing_slash(app):
def wrapper(environ, start_response):
path = environ.get("PATH_INFO", "/")
if path != "/" and path.endswith("/"):
environ = dict(environ)
environ["PATH_INFO"] = path.rstrip("/")
return app(environ, start_response)
return wrapper
Routing is opinionated. Now you understand the opinions well enough to choose your own.
Request and Response Objects (DIY Edition)
Working directly with environ and start_response gets old. Not because they’re wrong — they’re a fine low-level interface — but because nobody wants to write environ.get('HTTP_AUTHORIZATION', '').split(' ')[-1] every time they need a Bearer token.
Request and Response objects are wrappers. They take the raw WSGI interface and present it through a more convenient API. In this chapter, we’ll build both.
The Request Object
What do we actually want from a request object? Looking at how we’ve been using environ:
# Things we access constantly:
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
query_string = environ['QUERY_STRING']
content_type = environ.get('CONTENT_TYPE', '')
body = environ['wsgi.input'].read(int(environ.get('CONTENT_LENGTH') or 0))
host = environ.get('HTTP_HOST', '')
auth = environ.get('HTTP_AUTHORIZATION', '')
# Things we compute repeatedly:
params = urllib.parse.parse_qs(query_string)
json_body = json.loads(body)
Let’s wrap all of this:
import json
import urllib.parse
from typing import Any, Dict, List, Optional
class Request:
"""
A wrapper around a WSGI environ dict.
Provides convenient access to request data.
"""
def __init__(self, environ: dict):
self._environ = environ
self._body: Optional[bytes] = None # Cached, body can only be read once
# ── Basic properties ──────────────────────────────────────────────────
@property
def method(self) -> str:
return self._environ['REQUEST_METHOD'].upper()
@property
def path(self) -> str:
return self._environ.get('PATH_INFO', '/')
@property
def query_string(self) -> str:
return self._environ.get('QUERY_STRING', '')
@property
def scheme(self) -> str:
return self._environ.get('wsgi.url_scheme', 'http')
@property
def host(self) -> str:
return self._environ.get('HTTP_HOST', self._environ.get('SERVER_NAME', ''))
@property
def url(self) -> str:
"""The full request URL."""
url = f"{self.scheme}://{self.host}{self.path}"
if self.query_string:
url += f"?{self.query_string}"
return url
# ── Headers ───────────────────────────────────────────────────────────
def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
"""
Get a request header by name (case-insensitive).
Handles the HTTP_ prefix and Content-Type/Content-Length exceptions.
"""
name_lower = name.lower()
if name_lower in ('content-type', 'content-length'):
key = name_lower.replace('-', '_').upper()
else:
key = 'HTTP_' + name_lower.replace('-', '_').upper()
return self._environ.get(key, default)
@property
def content_type(self) -> str:
return self._environ.get('CONTENT_TYPE', '')
@property
def content_length(self) -> int:
try:
return int(self._environ.get('CONTENT_LENGTH') or 0)
except ValueError:
return 0
@property
def authorization(self) -> Optional[str]:
return self.get_header('Authorization')
@property
def bearer_token(self) -> Optional[str]:
"""Extract Bearer token from Authorization header."""
auth = self.authorization
if auth and auth.startswith('Bearer '):
return auth[7:]
return None
# ── Query parameters ──────────────────────────────────────────────────
@property
def query_params(self) -> Dict[str, List[str]]:
"""Parsed query string as dict of name → list of values."""
return urllib.parse.parse_qs(self.query_string, keep_blank_values=True)
def query(self, name: str, default: Optional[str] = None) -> Optional[str]:
"""Get the first value of a query parameter."""
values = self.query_params.get(name)
return values[0] if values else default
def query_list(self, name: str) -> List[str]:
"""Get all values of a query parameter."""
return self.query_params.get(name, [])
# ── Body ──────────────────────────────────────────────────────────────
@property
def body(self) -> bytes:
"""Read and cache the request body."""
if self._body is None:
if self.content_length > 0:
self._body = self._environ['wsgi.input'].read(self.content_length)
else:
self._body = b''
return self._body
@property
def text(self) -> str:
"""Request body decoded as UTF-8 text."""
return self.body.decode('utf-8')
def json(self) -> Any:
"""Parse request body as JSON. Raises ValueError on failure."""
return json.loads(self.body)
def form(self) -> Dict[str, List[str]]:
"""Parse application/x-www-form-urlencoded body."""
return urllib.parse.parse_qs(self.body.decode('utf-8'), keep_blank_values=True)
# ── Path parameters (set by router) ───────────────────────────────────
@property
def path_params(self) -> Dict[str, str]:
"""Path parameters extracted by the router."""
return self._environ.get('route.params', {})
def path_param(self, name: str, default: Optional[str] = None) -> Optional[str]:
return self.path_params.get(name, default)
# ── Convenience ───────────────────────────────────────────────────────
@property
def is_json(self) -> bool:
return 'application/json' in self.content_type
@property
def is_form(self) -> bool:
return 'application/x-www-form-urlencoded' in self.content_type
def __repr__(self) -> str:
return f"<Request {self.method} {self.path}>"
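Two details of the class are worth seeing in isolation. get_header relies on WSGI's CGI-style header encoding, and query_params returns lists because parse_qs does, which is why there are separate query() and query_list() helpers:

```python
import urllib.parse

# WSGI delivers "X-Request-ID: abc" as environ["HTTP_X_REQUEST_ID"];
# get_header reverses that mapping for most headers
name = "X-Request-ID"
environ_key = "HTTP_" + name.lower().replace("-", "_").upper()
assert environ_key == "HTTP_X_REQUEST_ID"

# parse_qs returns a list per parameter, even for single or blank values
params = urllib.parse.parse_qs("tag=a&tag=b&q=", keep_blank_values=True)
assert params == {"tag": ["a", "b"], "q": [""]}
```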
The Response Object
The response side is about building the thing we return from WSGI handlers. The raw interface requires:
- Calling start_response(status, headers)
- Returning an iterable of bytes
A Response object collects status, headers, and body, then handles the WSGI mechanics:
import json
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
class Response:
"""
A WSGI response builder.
Usage:
def handler(environ, start_response):
response = Response("Hello, world!", content_type="text/plain")
return response(environ, start_response)
"""
def __init__(
self,
body: Union[str, bytes, None] = None,
status: int = 200,
content_type: str = "text/plain; charset=utf-8",
headers: Optional[Dict[str, str]] = None,
):
self.status_code = status
self.headers: Dict[str, str] = {"Content-Type": content_type}
if headers:
self.headers.update(headers)
if body is None:
self._body = b""
elif isinstance(body, str):
self._body = body.encode("utf-8")
else:
self._body = body
@property
def status_line(self) -> str:
phrases = {
200: "OK", 201: "Created", 204: "No Content",
301: "Moved Permanently", 302: "Found", 304: "Not Modified",
400: "Bad Request", 401: "Unauthorized", 403: "Forbidden",
404: "Not Found", 405: "Method Not Allowed",
409: "Conflict", 415: "Unsupported Media Type",
422: "Unprocessable Entity",
500: "Internal Server Error", 503: "Service Unavailable",
}
phrase = phrases.get(self.status_code, "Unknown")
return f"{self.status_code} {phrase}"
def set_header(self, name: str, value: str) -> "Response":
self.headers[name] = value
return self
def set_cookie(
self,
name: str,
value: str,
max_age: Optional[int] = None,
path: str = "/",
http_only: bool = True,
secure: bool = False,
same_site: str = "Lax",
) -> "Response":
cookie = f"{name}={value}; Path={path}; SameSite={same_site}"
if max_age is not None:
cookie += f"; Max-Age={max_age}"
if http_only:
cookie += "; HttpOnly"
if secure:
cookie += "; Secure"
        # Multiple Set-Cookie headers can't share a single dict key,
        # so collect them separately; __call__ emits one header each
        self._cookies = getattr(self, "_cookies", [])
        self._cookies.append(cookie)
return self
def __call__(self, environ: dict, start_response: Callable) -> List[bytes]:
"""Make Response a WSGI-compatible callable."""
body = self._body
headers = list(self.headers.items())
# Add Content-Length if not present
if "Content-Length" not in self.headers:
headers.append(("Content-Length", str(len(body))))
# Handle multiple Set-Cookie headers
for cookie in getattr(self, "_cookies", []):
headers.append(("Set-Cookie", cookie))
start_response(self.status_line, headers)
return [body] if body else []
# ── Factory functions ─────────────────────────────────────────────────────────
class JSONResponse(Response):
def __init__(self, data: Any, status: int = 200, **kwargs):
body = json.dumps(data, default=str).encode("utf-8")
super().__init__(
body=body,
status=status,
content_type="application/json",
**kwargs,
)
class HTMLResponse(Response):
def __init__(self, html: str, status: int = 200, **kwargs):
super().__init__(
body=html,
status=status,
content_type="text/html; charset=utf-8",
**kwargs,
)
class RedirectResponse(Response):
def __init__(self, location: str, permanent: bool = False):
super().__init__(
status=301 if permanent else 302,
headers={"Location": location},
)
class EmptyResponse(Response):
def __init__(self, status: int = 204):
super().__init__(status=status)
Adapting the Router to Use Request/Response
Now let’s update the router from the previous chapter to work with our new objects. We’ll add a thin adapter that converts between the WSGI interface and Request/Response objects:
from typing import Callable
HandlerFunc = Callable[[Request], Response]
import functools

def wsgi_handler(func: HandlerFunc) -> Callable:
    """
    Decorator: wraps a Request→Response function into a WSGI handler.
    Injects a Request object and calls the Response.
    """
    @functools.wraps(func)  # exposes the original via __wrapped__
    def wrapper(environ: dict, start_response: Callable) -> list:
        request = Request(environ)
        response = func(request)
        return response(environ, start_response)
    return wrapper
# Now our route handlers look like:
router = Router()
@router.get("/tasks/{task_id}")
@wsgi_handler
def get_task(request: Request) -> Response:
task_id = request.path_param("task_id")
task = tasks.get(task_id)
if task is None:
return JSONResponse({"error": "not found"}, status=404)
return JSONResponse(task)
@router.post("/tasks")
@wsgi_handler
def create_task(request: Request) -> Response:
if not request.is_json:
return JSONResponse({"error": "content-type must be application/json"}, 415)
try:
data = request.json()
except ValueError:
return JSONResponse({"error": "invalid JSON"}, 400)
if "title" not in data:
return JSONResponse({"error": "title is required"}, 400)
task = {"id": str(uuid.uuid4()), "title": data["title"], "done": False}
tasks[task["id"]] = task
return JSONResponse(task, status=201)
Look at how readable this is compared to the raw WSGI version. Same underlying mechanism, much cleaner interface.
Testing With Request and Response Objects
One of the biggest benefits of wrapping the WSGI interface: testability. We can construct Request objects with arbitrary environ dicts and inspect Response objects directly.
# test_handlers.py
import io
import json
def make_request(
method: str = "GET",
path: str = "/",
body: bytes = b"",
content_type: str = "",
headers: dict = None,
) -> Request:
"""Build a Request object for testing."""
environ = {
"REQUEST_METHOD": method,
"PATH_INFO": path,
"QUERY_STRING": "",
"CONTENT_TYPE": content_type,
"CONTENT_LENGTH": str(len(body)),
"wsgi.input": io.BytesIO(body),
"wsgi.errors": io.StringIO(),
"wsgi.url_scheme": "http",
"wsgi.version": (1, 0),
"wsgi.multithread": False,
"wsgi.multiprocess": False,
"wsgi.run_once": False,
"SERVER_NAME": "testserver",
"SERVER_PORT": "80",
"HTTP_HOST": "testserver",
}
if headers:
for name, value in headers.items():
key = "HTTP_" + name.upper().replace("-", "_")
environ[key] = value
return Request(environ)
def call_handler(handler, request: Request) -> Response:
"""Call a WSGI handler and return a Response-like object."""
responses = []
def start_response(status, headers, exc_info=None):
responses.append((status, headers))
body_chunks = handler(request._environ, start_response)
body = b"".join(body_chunks)
status_str, headers_list = responses[0]
status_code = int(status_str.split(" ")[0])
response = Response(body=body, status=status_code)
for name, value in headers_list:
response.set_header(name, value)
return response
# Tests
def test_create_task():
payload = json.dumps({"title": "test task"}).encode()
request = make_request("POST", "/tasks", body=payload,
content_type="application/json")
request._environ["route.params"] = {}
response = call_handler(create_task.__wrapped__, request)
# Note: __wrapped__ gets past our @wsgi_handler decorator
assert response.status_code == 201
data = json.loads(response._body)
assert data["title"] == "test task"
assert "id" in data
assert data["done"] is False
def test_get_nonexistent_task():
request = make_request("GET", "/tasks/nonexistent")
request._environ["route.params"] = {"task_id": "nonexistent"}
response = call_handler(get_task.__wrapped__, request)
assert response.status_code == 404
if __name__ == "__main__":
test_create_task()
test_get_nonexistent_task()
print("All tests passed.")
What Frameworks Add on Top
Django’s HttpRequest object does exactly what we’ve done, plus:
- Multipart form parsing (file uploads)
- Cookie parsing
- Session integration (request.session)
- User authentication (request.user)
- Per-request caching
Flask’s Request (from Werkzeug) adds:
- request.files for uploads
- request.cookies
- request.args (query params)
- request.form with multi-dict behavior
- JSON parsing with error handling
FastAPI’s request handling goes further: it automatically converts JSON bodies to Pydantic models based on your function’s type annotations.
But all of these are the same thing we built, with more edge cases handled and more convenience methods added. The foundation is identical: wrap environ in a nicer API, provide a Response class that speaks WSGI.
Closing the Loop
We now have all the pieces of a working web framework:
- Server (previous chapter): accepts connections, calls WSGI apps
- Router (previous chapter): dispatches to handlers based on method + path
- Middleware (two chapters ago): composable wrappers for cross-cutting concerns
- Request/Response (this chapter): a clean interface for handlers
In the patterns section, we’ll assemble these into a small but complete framework. First, we need to understand why WSGI has limits — and what ASGI does about them.
Why WSGI Can’t Have Nice Things
WSGI has been reliable for twenty years. It’s simple, well-understood, and supported by every Python web server ever written. So why does ASGI exist?
The short answer is: WebSockets. The longer answer involves a fundamental mismatch between the WSGI request/response model and how modern web applications actually behave.
The WSGI Model
WSGI assumes a specific shape for web communication:
Client sends request → Server parses it → Your app runs → App returns response → Done
This is the HTTP request/response cycle. One request in, one response out. The entire interaction fits in a single function call:
response_iterable = application(environ, start_response)
When application returns, the transaction is complete. The connection can be closed. The worker is free to handle the next request.
This model works perfectly for 95% of HTTP traffic. GET a page, POST a form, fetch some JSON. One request, one response, move on.
The remaining 5% is where things fall apart.
WebSockets: The Problem
WebSockets are a protocol for bidirectional communication over a persistent connection. Once a WebSocket handshake completes, both client and server can send messages at any time, for as long as the connection stays open. There’s no concept of “one request, one response.”
How would you model this in WSGI? You can’t. Let’s try anyway to see why:
def websocket_app(environ, start_response):
# At this point we want to:
# 1. Complete the WebSocket upgrade handshake
# 2. Read messages from the client
# 3. Send messages to the client
# 4. Keep the connection open indefinitely
# 5. React to messages as they arrive
#
# But start_response must be called synchronously.
# And we must return an iterable.
# And when we return, the connection closes.
#
# There is no way to model "keep connection open, react to events"
# within this interface.
start_response("101 Switching Protocols", [...])
return ??? # What do we return? How do we keep reading?
The WSGI model is inherently synchronous and single-turn. WebSockets are inherently asynchronous and multi-turn.
Some WSGI servers tried to hack around this with extensions, but none of them were portable. Getting WebSockets into a Flask app before ASGI existed meant either gevent (with its monkey-patching) or a separate WebSocket server running alongside the WSGI app, with a reverse proxy in front. It was not elegant.
Long Polling and Server-Sent Events
The same problem, milder:
Long polling: client sends request, server holds it open for up to 30 seconds until there’s data to send, then responds. Works in WSGI (barely), but ties up a worker thread for the entire wait time.
Server-Sent Events (SSE): server sends a stream of events to the client over a single HTTP connection. WSGI can technically do this by returning a generator, but the worker is tied up until the stream ends.
Both of these require holding a connection open while doing work asynchronously. WSGI’s synchronous model handles this with blocking — tie up a thread per connection. This works but doesn’t scale: 1000 concurrent SSE connections = 1000 blocked threads.
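Here is what that generator approach looks like; the sleep is a stand-in for waiting on a real event source. It works, but the worker serving it can do nothing else until the loop ends:

```python
import time

def sse_app(environ, start_response):
    start_response("200 OK", [
        ("Content-Type", "text/event-stream"),
        ("Cache-Control", "no-cache"),
    ])

    def stream():
        for i in range(3):  # a real app would loop until disconnect
            yield f"data: tick {i}\n\n".encode("utf-8")
            time.sleep(0.1)  # stand-in for waiting on new data

    return stream()
```

The server iterates the generator and flushes each chunk to the client. The blocking wait between chunks (a sleep here; a queue read or database poll in practice) pins the worker for the stream's entire lifetime.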
The Async Problem
Python 3.4 introduced asyncio. By 2016, async/await was mainstream Python. Web frameworks wanted to write async handlers:
async def get_user(request):
user = await db.fetch_one("SELECT * FROM users WHERE id = ?", user_id)
return JSONResponse(user)
This is genuinely better. The worker isn’t blocked while waiting for the database — it can handle other requests. But you can’t call an async function from a synchronous context without asyncio.run() or equivalent. And WSGI servers call your app synchronously:
# This is what Gunicorn does. It's synchronous.
result = application(environ, start_response)
You can’t stick await in there. A WSGI server cannot efficiently run async applications because the interface itself is synchronous.
Some solutions emerged:
- Run each WSGI request in a thread pool and bridge to asyncio
- Use gevent to make blocking calls look async
- Accept that you can't use async/await in WSGI handlers
None of these are satisfying. The synchronous interface was a genuine constraint.
The HTTP/2 Problem
HTTP/2 multiplexes multiple requests over a single TCP connection. The server can push resources to the client before they’re requested. Requests can be prioritized. All of this happens over a long-lived connection with multiple concurrent streams.
WSGI models each request as a separate function call with its own environ. It has no concept of a “connection” that persists across requests, no way to push data to the client, no way to handle multiple concurrent streams over a single connection.
HTTP/2 server push never really went anywhere (major browsers ended up disabling it), but the point stands: WSGI’s model of “one function call = one request” is a fundamental constraint that HTTP/2’s connection-level features can’t fit into.
What ASGI Changes
ASGI (Asynchronous Server Gateway Interface) solves these problems with a different model:
Instead of:
# WSGI: one call, one response
response = app(environ, start_response)
ASGI uses:
# ASGI: async, event-based, connection-aware
await app(scope, receive, send)
The differences:
- async: the application is an async callable, so Python’s event loop can interleave multiple concurrent requests on a single thread
- scope: connection metadata (similar to environ), but it includes the type of connection — HTTP, WebSocket, or lifespan
- receive: an async callable that your app calls to receive events (incoming messages, request body chunks, WebSocket messages)
- send: an async callable that your app calls to send events (response start, body chunks, WebSocket messages)
The event-based model means:
- WebSockets work natively: receive a message event, send a message event, repeat
- Server-sent events work without blocking: send body chunk events as data arrives
- HTTP/2 streams could work: each stream is a separate scope
- Lifespan events work: startup and shutdown events around the app’s lifecycle
ASGI is more complex than WSGI. That’s not an accident — it’s handling more complex scenarios. But it’s still just a callable. A more sophisticated callable, with a more sophisticated interface, but a callable.
The Cost
Nothing is free. ASGI’s complexity has real costs:
Debugging is harder. Async tracebacks are longer and less obvious. An exception in an async generator might surface somewhere unexpected.
The mental model is different. WSGI is easy to reason about: function in, function out. ASGI requires understanding coroutines, event loops, and the receive/send event model.
Not everything is async. Most database drivers, file I/O, and third-party libraries were written for synchronous use. In ASGI, calling a blocking function in a coroutine blocks the entire event loop. You need asyncio.run_in_executor or async libraries.
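A minimal sketch of what that looks like in practice (the function names here are illustrative, not from any real driver): the blocking call is handed to the default thread pool, so the event loop stays free to serve other connections while the thread waits.

```python
import asyncio
import time

def blocking_lookup(key: str) -> str:
    # Stands in for a synchronous database driver or file read
    time.sleep(0.1)
    return f"value-for-{key}"

async def handler(key: str) -> str:
    # Hand the blocking call to the default thread pool; awaiting the
    # future lets the event loop run other tasks in the meantime
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_lookup, key)

async def main():
    # Five lookups overlap in the pool instead of serializing on the loop
    start = time.monotonic()
    results = await asyncio.gather(*(handler(str(i)) for i in range(5)))
    print(results, f"{time.monotonic() - start:.2f}s")

asyncio.run(main())
```

Calling blocking_lookup directly inside handler would freeze every other connection for the duration of the sleep; run_in_executor is the standard escape hatch until you can switch to an async library.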
You don’t need it for standard HTTP. If your application is 100% request/response with no WebSockets, no SSE, and no HTTP/2 push, WSGI is fine. Gunicorn on a few WSGI workers will serve you well.
But if you do need WebSockets, or you want to write genuinely async handlers that don’t block on I/O, or you’re building something that needs to hold many connections open simultaneously — ASGI is the right tool.
Let’s look at the spec.
The ASGI Spec (scope, receive, send — that’s literally it)
The ASGI specification lives at asgi.readthedocs.io. Like WSGI, the actual interface is simpler than the documentation makes it sound. Unlike WSGI, there are three connection types to understand: HTTP, WebSocket, and Lifespan.
Let’s read the spec.
The Interface
An ASGI application is an async callable with this signature:
async def application(scope: dict, receive: callable, send: callable) -> None:
...
Three arguments:
- scope: a dict describing the connection (like environ, but for the connection type)
- receive: an async callable — call it to receive the next event from the client
- send: an async callable — call it to send an event to the client
No return value. All communication happens through receive and send.
The scope Dictionary
scope contains connection metadata. The most important key is type, which tells you what kind of connection you’re handling.
HTTP scope
scope = {
"type": "http",
"asgi": {"version": "3.0"},
"http_version": "1.1", # or "2"
"method": "GET", # uppercase
"path": "/users/42",
"raw_path": b"/users/42",
"query_string": b"active=true",
"root_path": "",
"scheme": "http", # or "https"
"headers": [ # list of (name, value) byte-string tuples
(b"host", b"example.com"),
(b"accept", b"application/json"),
(b"content-type", b"application/json"),
(b"content-length", b"42"),
],
"server": ("127.0.0.1", 8000), # (host, port) tuple
"client": ("127.0.0.1", 54321), # client address
}
Notice: headers are bytes, not strings. ASGI works closer to the wire than WSGI — the header names and values are byte strings, and frameworks convert them to strings when they wrap the scope.
WebSocket scope
scope = {
"type": "websocket",
"asgi": {"version": "3.0"},
"path": "/ws/chat",
"query_string": b"room=general",
"headers": [...], # same format as HTTP
"server": ("127.0.0.1", 8000),
"client": ("127.0.0.1", 54322),
"subprotocols": [], # requested WebSocket subprotocols
}
Lifespan scope
scope = {
"type": "lifespan",
"asgi": {"version": "3.0"},
}
We’ll cover lifespan in detail in its own chapter.
The Events
receive and send deal in events — dicts with a type key.
HTTP events
Received from client:
# http.request — the body of the HTTP request
{
"type": "http.request",
"body": b"...", # bytes (possibly empty)
"more_body": False, # True if more chunks are coming
}
# http.disconnect — client disconnected
{
"type": "http.disconnect",
}
Sent to client:
# http.response.start — sends status and headers
# Must be sent before http.response.body
{
"type": "http.response.start",
"status": 200, # integer, not string
"headers": [
(b"content-type", b"text/plain"),
(b"content-length", b"13"),
],
}
# http.response.body — sends body data
{
"type": "http.response.body",
"body": b"Hello, world!",
"more_body": False, # True = more chunks coming; False = done
}
WebSocket events
Received:
# websocket.connect — client initiated WebSocket handshake
{"type": "websocket.connect"}
# websocket.receive — client sent a message
{
"type": "websocket.receive",
"bytes": None, # bytes message (or None)
"text": "hello", # text message (or None)
}
# websocket.disconnect — client disconnected
{
"type": "websocket.disconnect",
"code": 1000, # WebSocket close code
}
Sent:
# websocket.accept — accept the WebSocket handshake
{
"type": "websocket.accept",
"subprotocol": None, # optional agreed subprotocol
"headers": [], # extra headers in the handshake response
}
# websocket.send — send a message to the client
{
"type": "websocket.send",
"bytes": None, # bytes message (or None)
"text": "hello", # text message (or None)
}
# websocket.close — close the connection
{
"type": "websocket.close",
"code": 1000, # WebSocket close code
}
The Simplest Possible ASGI App
async def application(scope, receive, send):
"""A complete, working ASGI application."""
if scope["type"] != "http":
return # Ignore non-HTTP connections for now
# Read the request (we don't use it, but we should consume it)
event = await receive()
assert event["type"] == "http.request"
# Send the response
await send({
"type": "http.response.start",
"status": 200,
"headers": [
(b"content-type", b"text/plain; charset=utf-8"),
(b"content-length", b"13"),
],
})
await send({
"type": "http.response.body",
"body": b"Hello, world!",
"more_body": False,
})
Run it with Uvicorn:
pip install uvicorn
uvicorn app:application
That’s a fully functional ASGI application. No framework, no dependencies beyond uvicorn.
Reading the Full Request Body
HTTP bodies can arrive in chunks. The more_body flag tells you if more is coming:
async def read_body(receive: callable) -> bytes:
"""Read the complete request body, handling chunks."""
body = b""
while True:
event = await receive()
if event["type"] == "http.request":
body += event.get("body", b"")
if not event.get("more_body", False):
break
elif event["type"] == "http.disconnect":
break # Client disconnected before sending full body
return body
For small bodies, the server typically sends everything in one http.request event with more_body=False. For large bodies or streaming uploads, you’ll see multiple events.
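Chunked delivery is what makes streaming consumption possible. As a sketch, here is a handler-style helper that hashes an upload incrementally instead of buffering it; the simulated receive in demo stands in for what a real server would provide.

```python
import asyncio
import hashlib

async def hash_body(receive) -> str:
    """Incrementally hash an ASGI request body without holding it in memory."""
    digest = hashlib.sha256()
    while True:
        event = await receive()
        if event["type"] == "http.request":
            digest.update(event.get("body", b""))
            if not event.get("more_body", False):
                break
        elif event["type"] == "http.disconnect":
            break
    return digest.hexdigest()

async def demo():
    # Simulate a server delivering the body in three chunks
    events = iter([
        {"type": "http.request", "body": b"chunk-1", "more_body": True},
        {"type": "http.request", "body": b"chunk-2", "more_body": True},
        {"type": "http.request", "body": b"", "more_body": False},
    ])

    async def receive():
        return next(events)

    return await hash_body(receive)

print(asyncio.run(demo()))
```

The same loop shape works for writing chunks to disk or forwarding them upstream: consume one event, process its body, stop when more_body is False.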
A More Complete Example
import json
async def application(scope, receive, send):
if scope["type"] == "lifespan":
await handle_lifespan(scope, receive, send)
elif scope["type"] == "http":
await handle_http(scope, receive, send)
async def handle_lifespan(scope, receive, send):
while True:
event = await receive()
if event["type"] == "lifespan.startup":
await send({"type": "lifespan.startup.complete"})
elif event["type"] == "lifespan.shutdown":
await send({"type": "lifespan.shutdown.complete"})
break
async def handle_http(scope, receive, send):
method = scope["method"]
path = scope["path"]
# Read the full body
body = await read_body(receive)
# Route
if path == "/" and method == "GET":
response_body = b"Hello from ASGI!"
status = 200
elif path == "/echo" and method == "POST":
# Echo the request body back
data = json.loads(body) if body else {}
response_body = json.dumps({"echo": data}).encode()
status = 200
else:
response_body = b"Not Found"
status = 404
await send({
"type": "http.response.start",
"status": status,
"headers": [
(b"content-type", b"application/json"),
(b"content-length", str(len(response_body)).encode()),
],
})
await send({
"type": "http.response.body",
"body": response_body,
"more_body": False,
})
async def read_body(receive) -> bytes:
body = b""
while True:
event = await receive()
if event["type"] == "http.request":
body += event.get("body", b"")
if not event.get("more_body", False):
break
elif event["type"] == "http.disconnect":
break
return body
Comparing WSGI and ASGI Side by Side
# WSGI
def wsgi_app(environ, start_response):
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
body = environ['wsgi.input'].read(int(environ.get('CONTENT_LENGTH') or 0))
start_response("200 OK", [("Content-Type", "text/plain")])
return [b"Hello"]
# ASGI equivalent
async def asgi_app(scope, receive, send):
method = scope['method']
path = scope['path']
body = await read_body(receive)
await send({
"type": "http.response.start",
"status": 200,
"headers": [(b"content-type", b"text/plain")],
})
await send({
"type": "http.response.body",
"body": b"Hello",
"more_body": False,
})
The ASGI version is more verbose for simple HTTP. That’s the cost of the generality — the same interface handles HTTP, WebSockets, and lifespan, so HTTP alone requires a bit more ceremony.
Headers: Bytes or Strings?
ASGI headers are byte-string tuples: (b"content-type", b"text/plain"). This is closer to the wire format — HTTP headers are bytes.
WSGI headers are string tuples: ("Content-Type", "text/plain"). WSGI is at a higher abstraction level, hiding the byte/string conversion.
Frameworks handle the bytes-to-strings conversion for you. When you access request.headers["content-type"] in Starlette or FastAPI, the framework has already decoded the byte strings from scope["headers"]. But at the raw ASGI level, it’s bytes.
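A sketch of the kind of decoding a framework does for you, turning the byte tuples in scope["headers"] into a string dict. This is a simplification: repeated headers collapse to the last value, which real frameworks handle more carefully.

```python
def decode_headers(raw_headers):
    """Decode ASGI byte-tuple headers into a lowercase string dict.
    Simplified: a repeated header name keeps only its last value."""
    return {
        name.decode("latin-1").lower(): value.decode("latin-1")
        for name, value in raw_headers
    }

headers = decode_headers([
    (b"host", b"example.com"),
    (b"content-type", b"application/json"),
])
print(headers["content-type"])  # application/json
```

latin-1 is the safe choice here because HTTP header bytes map one-to-one onto latin-1 code points.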
The asgi Key
Every scope dict includes an "asgi" key with version information:
scope["asgi"] == {"version": "3.0"}
ASGI 3.0 is the current version (the one we’re describing). Earlier versions had a different interface; you’ll rarely encounter them today. If you’re writing ASGI apps in 2024+, you’re on 3.0.
What Uvicorn Does
Uvicorn is to ASGI what Gunicorn is to WSGI — it handles the transport and calls your application.
When Uvicorn receives an HTTP request:
- Reads bytes from the socket
- Parses the HTTP request
- Builds the scope dict
- Creates receive and send callables backed by the socket
- Calls await app(scope, receive, send)
When your app calls await receive(), Uvicorn either returns buffered data or reads more from the socket (awaiting the I/O without blocking other connections).
When your app calls await send(event), Uvicorn serializes the event to HTTP bytes and writes them to the socket.
The event loop is asyncio. Multiple connections share a single thread via cooperative multitasking — each await point is an opportunity for the event loop to switch to another connection.
This is the key difference from WSGI: one thread can handle many concurrent connections, because await yields control voluntarily rather than blocking.
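A toy demonstration of that model: three simulated "connections" whose waits overlap on a single thread. The connection coroutine is a stand-in for real socket I/O.

```python
import asyncio
import threading
import time

log = []

async def fake_connection(i: int) -> None:
    # The await yields to the event loop, so the other
    # "connections" run while this one waits on simulated I/O
    await asyncio.sleep(0.1)
    log.append((i, threading.current_thread().name))

async def main():
    start = time.monotonic()
    await asyncio.gather(*(fake_connection(i) for i in range(3)))
    elapsed = time.monotonic() - start
    # The three 0.1s waits overlap: total is ~0.1s, not 0.3s
    print(f"{len(log)} connections in {elapsed:.2f}s on one thread")

asyncio.run(main())
```

Every entry in log carries the same thread name: concurrency here comes from the event loop interleaving coroutines, not from threads.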
Your First ASGI App
We built a tasks API in the WSGI section. Let’s rebuild it as ASGI — same functionality, but async, with better structure. By the end you’ll have a working JSON API that demonstrates the full ASGI request/response cycle.
The Foundation
First, let’s build the utilities we’ll need. In ASGI, headers are bytes and parsing happens more explicitly:
import json
import uuid
from typing import Any, Dict, List, Optional, Tuple
# Type aliases for clarity
Headers = List[Tuple[bytes, bytes]]
def get_header(scope_headers: Headers, name: str) -> Optional[str]:
"""Get a header value from ASGI scope headers (case-insensitive)."""
name_bytes = name.lower().encode("latin-1")
for key, value in scope_headers:
if key.lower() == name_bytes:
return value.decode("latin-1")
return None
def make_headers(*pairs: Tuple[str, str]) -> Headers:
"""Build ASGI headers from string tuples."""
return [
(name.lower().encode("latin-1"), value.encode("latin-1"))
for name, value in pairs
]
async def read_body(receive) -> bytes:
"""Read the full request body, handling chunked delivery."""
body = b""
while True:
event = await receive()
if event["type"] == "http.request":
body += event.get("body", b"")
if not event.get("more_body", False):
break
elif event["type"] == "http.disconnect":
break
return body
async def send_response(send, status: int, body: bytes, headers: Headers) -> None:
"""Send a complete HTTP response."""
# Always include Content-Length
all_headers = list(headers) + [
(b"content-length", str(len(body)).encode())
]
await send({
"type": "http.response.start",
"status": status,
"headers": all_headers,
})
await send({
"type": "http.response.body",
"body": body,
"more_body": False,
})
async def send_json(send, data: Any, status: int = 200) -> None:
"""Send a JSON response."""
body = json.dumps(data, indent=2, default=str).encode("utf-8")
await send_response(
send,
status,
body,
make_headers(("content-type", "application/json")),
)
The Application
# In-memory store
tasks: Dict[str, Dict] = {}
async def application(scope, receive, send):
"""Main ASGI application."""
if scope["type"] == "lifespan":
await handle_lifespan(receive, send)
return
if scope["type"] != "http":
return
await handle_http(scope, receive, send)
async def handle_lifespan(receive, send):
while True:
event = await receive()
if event["type"] == "lifespan.startup":
# Initialize resources here
print("Application starting up")
await send({"type": "lifespan.startup.complete"})
elif event["type"] == "lifespan.shutdown":
# Clean up resources here
print("Application shutting down")
await send({"type": "lifespan.shutdown.complete"})
break
async def handle_http(scope, receive, send):
method = scope["method"]
path = scope["path"]
# Route dispatch
if path == "/tasks":
if method == "GET":
await list_tasks(scope, receive, send)
elif method == "POST":
await create_task(scope, receive, send)
else:
await send_json(send, {"error": "method not allowed"}, 405)
elif path.startswith("/tasks/"):
task_id = path[len("/tasks/"):]
if not task_id:
await send_json(send, {"error": "not found"}, 404)
return
if method == "GET":
await get_task(task_id, scope, receive, send)
elif method == "DELETE":
await delete_task(task_id, scope, receive, send)
elif method == "PATCH":
await update_task(task_id, scope, receive, send)
else:
await send_json(send, {"error": "method not allowed"}, 405)
else:
await send_json(send, {"error": "not found"}, 404)
async def list_tasks(scope, receive, send):
# Consume the body even if we don't use it (good practice)
await read_body(receive)
await send_json(send, list(tasks.values()))
async def create_task(scope, receive, send):
content_type = get_header(scope["headers"], "content-type") or ""
if "application/json" not in content_type:
await send_json(send, {"error": "Content-Type must be application/json"}, 415)
return
body = await read_body(receive)
try:
data = json.loads(body)
except (json.JSONDecodeError, UnicodeDecodeError):
await send_json(send, {"error": "invalid JSON"}, 400)
return
if "title" not in data:
await send_json(send, {"error": "title is required"}, 400)
return
task = {
"id": str(uuid.uuid4()),
"title": str(data["title"]),
"done": bool(data.get("done", False)),
}
tasks[task["id"]] = task
await send_json(send, task, 201)
async def get_task(task_id: str, scope, receive, send):
await read_body(receive)
task = tasks.get(task_id)
if task is None:
await send_json(send, {"error": "not found"}, 404)
return
await send_json(send, task)
async def delete_task(task_id: str, scope, receive, send):
await read_body(receive)
if task_id not in tasks:
await send_json(send, {"error": "not found"}, 404)
return
deleted = tasks.pop(task_id)
await send_json(send, deleted)
async def update_task(task_id: str, scope, receive, send):
task = tasks.get(task_id)
if task is None:
await send_json(send, {"error": "not found"}, 404)
return
body = await read_body(receive)
try:
data = json.loads(body)
except (json.JSONDecodeError, UnicodeDecodeError):
await send_json(send, {"error": "invalid JSON"}, 400)
return
if "done" in data:
task["done"] = bool(data["done"])
if "title" in data:
task["title"] = str(data["title"])
await send_json(send, task)
Save as asgi_tasks.py, run with uvicorn:
pip install uvicorn
uvicorn asgi_tasks:application --reload
Test it:
# Create a task
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Learn ASGI"}'
# List tasks
curl http://localhost:8000/tasks
# Update a task (replace the ID with one from your previous response)
curl -X PATCH http://localhost:8000/tasks/YOUR-ID \
-H "Content-Type: application/json" \
-d '{"done": true}'
# Delete a task
curl -X DELETE http://localhost:8000/tasks/YOUR-ID
Streaming Responses
One thing ASGI handles well that WSGI struggles with: streaming large responses. Instead of buffering everything in memory, send body chunks as they’re produced:
import asyncio
async def streaming_app(scope, receive, send):
"""Send a large response in chunks."""
if scope["type"] != "http":
return
await read_body(receive) # Consume request body
# Start the response — no Content-Length for streaming
await send({
"type": "http.response.start",
"status": 200,
"headers": [
(b"content-type", b"text/plain"),
(b"transfer-encoding", b"chunked"),
],
})
# Send data in chunks
for i in range(10):
await asyncio.sleep(0.1) # Simulate work
chunk = f"Line {i}: some data\n".encode()
await send({
"type": "http.response.body",
"body": chunk,
"more_body": True, # More is coming
})
# Final empty body closes the stream
await send({
"type": "http.response.body",
"body": b"",
"more_body": False,
})
curl -N http://localhost:8000/ # -N = no buffering, shows chunks as they arrive
You’ll see lines appear one at a time, 100ms apart. In WSGI, you’d have to return a generator and cross your fingers that Gunicorn didn’t buffer it. In ASGI, the streaming model is first-class.
Detecting Client Disconnects
ASGI lets you detect when a client disconnects mid-request. Useful for canceling expensive work:
import asyncio
async def long_running_app(scope, receive, send):
if scope["type"] != "http":
return
await read_body(receive)
# Run work and listen for disconnect concurrently
disconnect_task = asyncio.ensure_future(wait_for_disconnect(receive))
work_task = asyncio.ensure_future(do_expensive_work())
done, pending = await asyncio.wait(
[disconnect_task, work_task],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
if disconnect_task in done:
# Client disconnected — don't bother sending a response
return
result = work_task.result()
await send_json(send, result)
async def wait_for_disconnect(receive) -> None:
"""Await the http.disconnect event."""
while True:
event = await receive()
if event["type"] == "http.disconnect":
return
async def do_expensive_work() -> dict:
await asyncio.sleep(5) # Simulate 5 seconds of work
return {"result": "done"}
In WSGI, you can’t do this — you have no way to detect a client disconnect mid-processing. Either you do the work and try to send, or you set a timeout and hope. ASGI gives you a proper event for it.
Query Parameters
ASGI passes query_string as bytes. Parse it:
import urllib.parse
def parse_query(scope) -> Dict[str, List[str]]:
"""Parse query string from ASGI scope."""
qs = scope.get("query_string", b"").decode("latin-1")
return urllib.parse.parse_qs(qs, keep_blank_values=True)
# In a handler:
async def search_tasks(scope, receive, send):
await read_body(receive)
params = parse_query(scope)
query = params.get("q", [""])[0]
done_filter = params.get("done", [None])[0]
results = list(tasks.values())
if query:
results = [t for t in results if query.lower() in t["title"].lower()]
if done_filter is not None:
done = done_filter.lower() == "true"
results = [t for t in results if t["done"] == done]
await send_json(send, results)
The __main__ Runner
For development, uvicorn can be run programmatically:
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"asgi_tasks:application",
host="127.0.0.1",
port=8000,
reload=True, # Auto-reload on file changes
log_level="info",
)
Or as a module:
python -m uvicorn asgi_tasks:application --reload
What This Reveals
Looking at the application we just built, notice what’s verbose:
- The routing is still a manual if/elif chain
- Reading the body requires the read_body helper
- Header access requires get_header
- Every handler needs to await read_body even if it doesn’t use the body
This is exactly what Starlette (and by extension FastAPI) solves. Starlette’s Request object wraps the ASGI scope with request.method, request.url, request.headers, await request.body(), await request.json(). Starlette’s routing matches paths and dispatches to async view functions. Starlette’s Response classes build the correct http.response.start and http.response.body events.
FastAPI adds type-annotated dependency injection on top of Starlette.
But now you know what they’re building on. Every Starlette route handler ultimately calls await send({"type": "http.response.start", ...}). Every FastAPI request processes an ASGI scope dict. The wrappers are real, the underlying reality is what we’ve been working with.
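To make the wrapping concrete, here is a toy Request class over the raw scope. This is a sketch of the idea, not Starlette’s actual implementation.

```python
import asyncio

class Request:
    """A toy Starlette-style wrapper over an ASGI HTTP scope."""

    def __init__(self, scope, receive):
        self.scope = scope
        self._receive = receive

    @property
    def method(self):
        return self.scope["method"]

    @property
    def headers(self):
        return {
            name.decode("latin-1"): value.decode("latin-1")
            for name, value in self.scope.get("headers", [])
        }

    async def body(self) -> bytes:
        # Drain http.request events until more_body is False
        chunks = b""
        while True:
            event = await self._receive()
            if event["type"] != "http.request":
                break
            chunks += event.get("body", b"")
            if not event.get("more_body", False):
                break
        return chunks

async def demo():
    scope = {"type": "http", "method": "POST",
             "headers": [(b"content-type", b"text/plain")]}

    async def receive():
        return {"type": "http.request", "body": b"hi", "more_body": False}

    req = Request(scope, receive)
    return req.method, req.headers["content-type"], await req.body()

print(asyncio.run(demo()))  # ('POST', 'text/plain', b'hi')
```

All the wrapper does is translate between the raw protocol (byte tuples, event dicts) and a friendlier surface; the scope underneath is unchanged.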
Build an ASGI Server from Scratch
Building a WSGI server required sockets and HTTP parsing. Building an ASGI server requires all of that plus asyncio. The concepts are the same; the mechanics shift from blocking I/O to coroutines.
This chapter builds a working async HTTP server that calls ASGI applications correctly. Not production-ready — we’ll leave chunked encoding and HTTP/2 for Uvicorn — but correct enough to understand what Uvicorn is actually doing.
The Architecture
An asyncio-based server has a different shape than a threaded one:
asyncio event loop
↓
asyncio.start_server()
↓
handle_connection() — called as a coroutine for each connection
├── reads from asyncio.StreamReader
├── parses HTTP request
├── builds scope dict
├── creates receive/send coroutines
├── awaits app(scope, receive, send)
└── writes to asyncio.StreamWriter
Instead of spawning a thread per connection, we create a coroutine per connection. The event loop switches between coroutines at await points, so hundreds of concurrent connections can be handled by a single thread.
The HTTP Parser
Same logic as the WSGI server, adapted for async reading:
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
@dataclass
class ParsedRequest:
method: str
path: str
query_string: bytes
http_version: str
headers: List[Tuple[bytes, bytes]]
body: bytes
async def read_http_request(reader: asyncio.StreamReader) -> Optional[bytes]:
"""
Read a complete HTTP request from an async stream.
Returns raw bytes or None if the connection closed.
"""
data = b""
# Read until we have the complete headers
try:
while b"\r\n\r\n" not in data:
chunk = await asyncio.wait_for(reader.read(4096), timeout=5.0)
if not chunk:
return None
data += chunk
except asyncio.TimeoutError:
return None
# Parse Content-Length to read the body
header_end = data.find(b"\r\n\r\n") + 4
content_length = 0
for line in data[:header_end].decode("latin-1").split("\r\n"):
if line.lower().startswith("content-length:"):
try:
content_length = int(line.split(":", 1)[1].strip())
except ValueError:
pass
break
# Read body if needed
body_received = len(data) - header_end
while body_received < content_length:
try:
chunk = await asyncio.wait_for(reader.read(4096), timeout=5.0)
except asyncio.TimeoutError:
break
if not chunk:
break
data += chunk
body_received += len(chunk)
return data
def parse_request(raw: bytes) -> Optional[ParsedRequest]:
"""Parse raw HTTP bytes into a ParsedRequest."""
header_end = raw.find(b"\r\n\r\n")
if header_end == -1:
return None
header_section = raw[:header_end].decode("latin-1")
body = raw[header_end + 4:]
lines = header_section.split("\r\n")
try:
method, raw_path, http_version = lines[0].split(" ", 2)
except ValueError:
return None
# Split path and query string
raw_path_bytes = raw_path.encode("latin-1")
if b"?" in raw_path_bytes:
path_bytes, query_string = raw_path_bytes.split(b"?", 1)
else:
path_bytes = raw_path_bytes
query_string = b""
# Parse headers as byte tuples (ASGI format)
headers: List[Tuple[bytes, bytes]] = []
for line in lines[1:]:
if ": " in line:
name, _, value = line.partition(": ")
headers.append((name.lower().encode("latin-1"),
value.encode("latin-1")))
# Trim body
content_length = 0
for name, value in headers:
if name == b"content-length":
try:
content_length = int(value)
except ValueError:
pass
break
return ParsedRequest(
method=method.upper(),
path=path_bytes.decode("latin-1"),
query_string=query_string,
http_version=http_version,
headers=headers,
body=body[:content_length],
)
The ASGI Bridge
The key piece: create receive and send coroutines that bridge between the ASGI protocol and the TCP connection.
async def make_receive_send(
request: ParsedRequest,
writer: asyncio.StreamWriter,
) -> tuple:
"""
Create the receive and send callables for an ASGI HTTP connection.
Returns (receive, send).
"""
# Track whether we've sent the body yet
body_sent = False
request_consumed = False
response_started = False
disconnect_event = asyncio.Event()
async def receive():
nonlocal request_consumed
if not request_consumed:
request_consumed = True
return {
"type": "http.request",
"body": request.body,
"more_body": False,
}
# Wait for disconnect (in a real server, we'd detect this from the socket)
await disconnect_event.wait()
return {"type": "http.disconnect"}
response_headers = []
response_status = None
async def send(event):
nonlocal response_started, body_sent, response_status
if event["type"] == "http.response.start":
response_status = event["status"]
response_headers.extend(event.get("headers", []))
response_started = True
elif event["type"] == "http.response.body":
if not response_started:
raise RuntimeError("Must send http.response.start before body")
body = event.get("body", b"")
more_body = event.get("more_body", False)
if not body_sent:
# Write the HTTP response headers first
status_line = f"HTTP/1.1 {response_status} {get_reason(response_status)}\r\n"
writer.write(status_line.encode("latin-1"))
for name, value in response_headers:
if isinstance(name, bytes):
name = name.decode("latin-1")
if isinstance(value, bytes):
value = value.decode("latin-1")
writer.write(f"{name}: {value}\r\n".encode("latin-1"))
writer.write(b"\r\n")
body_sent = True
writer.write(body)
if not more_body:
await writer.drain()
disconnect_event.set()
return receive, send
def get_reason(status_code: int) -> str:
reasons = {
200: "OK", 201: "Created", 204: "No Content",
301: "Moved Permanently", 302: "Found", 304: "Not Modified",
400: "Bad Request", 401: "Unauthorized", 403: "Forbidden",
404: "Not Found", 405: "Method Not Allowed",
422: "Unprocessable Entity",
500: "Internal Server Error",
}
return reasons.get(status_code, "Unknown")
The Server Loop
import sys
from typing import Callable
async def handle_connection(
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
app: Callable,
server_host: str,
server_port: int,
) -> None:
"""Handle one HTTP connection."""
try:
# Read the request
raw = await read_http_request(reader)
if not raw:
return
# Parse it
request = parse_request(raw)
if request is None:
writer.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
await writer.drain()
return
# Build the ASGI scope
scope = {
"type": "http",
"asgi": {"version": "3.0"},
"http_version": request.http_version.replace("HTTP/", ""),
"method": request.method,
"path": request.path,
"raw_path": request.path.encode("latin-1"),
"query_string": request.query_string,
"root_path": "",
"scheme": "http",
"headers": request.headers,
"server": (server_host, server_port),
}
# Get client address
peername = writer.get_extra_info("peername")
if peername:
scope["client"] = peername
# Create receive/send
receive, send = await make_receive_send(request, writer)
# Call the ASGI app
await app(scope, receive, send)
except Exception as e:
print(f"Error handling connection: {e}", file=sys.stderr)
try:
writer.write(b"HTTP/1.1 500 Internal Server Error\r\n\r\n")
await writer.drain()
except Exception:
pass
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
async def serve(
app: Callable,
host: str = "127.0.0.1",
port: int = 8000,
) -> None:
"""Start the ASGI server."""
# Send the lifespan startup event
await send_lifespan_startup(app)
server = await asyncio.start_server(
lambda r, w: handle_connection(r, w, app, host, port),
host,
port,
)
async with server:
addr = server.sockets[0].getsockname()
print(f"Serving on http://{addr[0]}:{addr[1]}", file=sys.stderr)
try:
await server.serve_forever()
except (KeyboardInterrupt, asyncio.CancelledError):
pass
await send_lifespan_shutdown(app)
async def send_lifespan_startup(app: Callable) -> None:
"""Send the lifespan.startup event if the app handles it."""
scope = {"type": "lifespan", "asgi": {"version": "3.0"}}
startup_complete = asyncio.Event()
events = asyncio.Queue()
await events.put({"type": "lifespan.startup"})
async def receive():
return await events.get()
async def send(event):
if event["type"] == "lifespan.startup.complete":
startup_complete.set()
elif event["type"] == "lifespan.startup.failed":
raise RuntimeError(f"Startup failed: {event.get('message', '')}")
try:
task = asyncio.create_task(app(scope, receive, send))
await asyncio.wait_for(startup_complete.wait(), timeout=10.0)
# Don't await the task — it runs for the app's lifetime
except asyncio.TimeoutError:
print("Warning: lifespan startup timed out", file=sys.stderr)
except Exception as e:
print(f"Warning: lifespan startup failed: {e}", file=sys.stderr)
async def send_lifespan_shutdown(app: Callable) -> None:
"""Send the lifespan.shutdown event."""
# In a full implementation, we'd track the lifespan task
# and send shutdown. Simplified here.
pass
Running the Server
# asgi_server.py
# [paste all the code above, then:]
if __name__ == "__main__":
from asgi_tasks import application # The app from the previous chapter
asyncio.run(serve(application))
python asgi_server.py &
curl -X POST http://127.0.0.1:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Works on my ASGI server"}'
Testing Concurrency
The real test: can our server handle multiple requests concurrently? Since we’re using asyncio, the answer should be yes — as long as the handlers don’t block.
# concurrent_test.py
import asyncio
import time
async def make_request(session_num: int) -> float:
"""Make an HTTP request and return how long it took."""
start = time.monotonic()
reader, writer = await asyncio.open_connection("127.0.0.1", 8000)
request = (
f"GET /tasks HTTP/1.1\r\n"
f"Host: localhost\r\n"
f"Connection: close\r\n"
f"\r\n"
)
writer.write(request.encode())
await writer.drain()
response = b""
while chunk := await reader.read(4096):
response += chunk
writer.close()
elapsed = time.monotonic() - start
print(f"Request {session_num}: {elapsed*1000:.1f}ms")
return elapsed
async def main():
# Make 10 concurrent requests
tasks = [make_request(i) for i in range(10)]
times = await asyncio.gather(*tasks)
print(f"Total elapsed: {max(times)*1000:.1f}ms")
print(f"Average: {sum(times)/len(times)*1000:.1f}ms")
asyncio.run(main())
With a synchronous server (our WSGI server from earlier), 10 sequential requests take 10x as long as one. With our async server, 10 concurrent requests should take roughly as long as one — because the event loop handles them all simultaneously.
What Uvicorn Does Differently
Our server is correct but incomplete. Uvicorn adds:
HTTP/1.1 keep-alive: reuse the connection for multiple requests. We close after each one. This matters for performance — TLS handshakes and TCP connection setup are expensive.
HTTP/2: multiplexed streams over a single connection. This is substantially more complex — h2 (the Python HTTP/2 library) handles the framing; Uvicorn maps the streams to ASGI scopes.
Proper streaming: chunked transfer encoding for responses without Content-Length, streaming request body parsing.
SSL/TLS: pass --ssl-keyfile and --ssl-certfile to Uvicorn and it handles TLS. Our server is plain HTTP.
Worker processes: uvicorn --workers 4 forks four worker processes, each running the asyncio event loop. More workers = more CPU cores utilized.
Graceful shutdown: when you send SIGTERM, Uvicorn stops accepting new connections, finishes in-flight requests, then exits. Our KeyboardInterrupt handling is abrupt.
But the core logic — accept connection, read request, build scope, call await app(scope, receive, send), write response — is exactly what we’ve implemented.
The asyncio Mental Model
One thing worth making explicit: asyncio is cooperative multitasking. Tasks run until they explicitly yield control with await. When a task awaits network I/O (reader.read(), writer.drain()), the event loop can run other tasks.
This means:
- CPU-bound work blocks everything: if a handler does heavy computation without awaiting, no other connection can run. Offload it with loop.run_in_executor() (or asyncio.to_thread() for blocking calls).
- Blocking I/O blocks everything: time.sleep(1) in a handler blocks the entire server for 1 second. Use await asyncio.sleep(1) instead.
- One thread, many connections: unlike threaded servers, there's no race condition between connections (within one process). The event loop is single-threaded.
This model is why async Python is fast for I/O-bound workloads (web servers, API proxies) but not for CPU-bound ones (image processing, ML inference). Know which one you have.
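To make the first point concrete, here is a minimal sketch of offloading CPU-bound work so the loop stays responsive. The `expensive` function and the numbers are invented for illustration; the point is that `loop.run_in_executor` moves the work to a thread pool while the event loop keeps serving other coroutines:

```python
import asyncio
import hashlib
import time

def expensive(data: bytes) -> str:
    # CPU-bound: would starve the event loop if run inline
    for _ in range(50_000):
        data = hashlib.sha256(data).digest()
    return data.hex()

async def main():
    loop = asyncio.get_running_loop()
    start = time.monotonic()
    # Hand the work to the default thread-pool executor; the sleep
    # stands in for other connections still being served meanwhile
    digest, _ = await asyncio.gather(
        loop.run_in_executor(None, expensive, b"payload"),
        asyncio.sleep(0.05),
    )
    print(f"digest {digest[:16]}... in {time.monotonic() - start:.2f}s")

asyncio.run(main())
```

For plain blocking calls (a synchronous HTTP client, file I/O), `asyncio.to_thread()` is the shorter spelling of the same idea.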
Lifespan Events (Startup, Shutdown, and Existential Dread)
Every long-running application has things it needs to do before it starts serving requests, and things it needs to do before it stops. Connect to a database. Load a model into memory. Start a background task. Flush a cache. Close connection pools gracefully rather than dropping them mid-operation.
WSGI has no solution for this. You can initialize things at module import time (which works, but offers no clean shutdown moment) or use Gunicorn's worker hooks (which are server-specific). Neither is portable.
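For contrast, here is a minimal sketch of the import-time pattern. The `db_pool` dict is a stand-in for a real connection object; it springs into existence the moment any server imports the module, and there is no portable moment to close it:

```python
# WSGI-era initialization: runs at import time, no shutdown hook
db_pool = {"status": "connected"}  # imagine real pool setup here

def application(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [f"db: {db_pool['status']}".encode()]
```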
ASGI has first-class support for it: the lifespan protocol.
The Lifespan Scope
When an ASGI server starts your application, before processing any HTTP or WebSocket connections, it sends a lifespan scope:
scope = {
"type": "lifespan",
"asgi": {"version": "3.0"},
}
Your application is called with this scope and a receive/send pair. The lifespan coroutine then runs for the entire application lifetime:
Server starts
→ calls app(lifespan_scope, receive, send)
→ app receives "lifespan.startup"
→ app does startup work
→ app sends "lifespan.startup.complete"
→ server starts accepting HTTP/WebSocket connections
...
[Server receives SIGTERM]
→ server stops accepting new connections
→ finishes in-flight requests
→ sends "lifespan.shutdown" to the lifespan coroutine
→ app does cleanup
→ app sends "lifespan.shutdown.complete"
→ server exits
Basic Lifespan Handler
async def application(scope, receive, send):
if scope["type"] == "lifespan":
await handle_lifespan(receive, send)
elif scope["type"] == "http":
await handle_http(scope, receive, send)
async def handle_lifespan(receive, send):
while True:
event = await receive()
if event["type"] == "lifespan.startup":
try:
await startup()
await send({"type": "lifespan.startup.complete"})
except Exception as e:
await send({
"type": "lifespan.startup.failed",
"message": str(e),
})
return
elif event["type"] == "lifespan.shutdown":
try:
await shutdown()
await send({"type": "lifespan.shutdown.complete"})
except Exception as e:
await send({
"type": "lifespan.shutdown.failed",
"message": str(e),
})
return
async def startup():
print("Application starting: connecting to database, loading caches...")
# await db.connect()
# await cache.warmup()
async def shutdown():
print("Application shutting down: closing connections...")
# await db.disconnect()
# await cache.flush()
Sharing State Between Lifespan and Handlers
Here’s the practical problem: you initialize a database connection pool in startup(), but your HTTP handlers need to use it. How do you get it to them?
The common pattern: use a module-level state container.
# state.py
from dataclasses import dataclass, field
from typing import Any, Optional
@dataclass
class AppState:
db: Optional[Any] = None
cache: Optional[Any] = None
config: dict = field(default_factory=dict)
state = AppState()
# app.py
import asyncio
import json
from state import state
async def startup():
# In a real app, these would be actual async connections
state.db = await create_db_pool()
state.cache = await create_redis_client()
state.config = await load_config()
print(f"Started. DB: {state.db}, Cache: {state.cache}")
async def shutdown():
if state.db:
await state.db.close()
if state.cache:
await state.cache.close()
print("Clean shutdown complete.")
async def create_db_pool():
"""Simulate creating a database connection pool."""
await asyncio.sleep(0.1) # Simulate async connection
return {"pool": "connected", "size": 10}
async def create_redis_client():
await asyncio.sleep(0.05)
return {"redis": "connected"}
async def load_config():
return {"env": "production", "debug": False}
async def application(scope, receive, send):
if scope["type"] == "lifespan":
await handle_lifespan(receive, send)
elif scope["type"] == "http":
await handle_http(scope, receive, send)
async def handle_lifespan(receive, send):
while True:
event = await receive()
if event["type"] == "lifespan.startup":
await startup()
await send({"type": "lifespan.startup.complete"})
elif event["type"] == "lifespan.shutdown":
await shutdown()
await send({"type": "lifespan.shutdown.complete"})
return
async def handle_http(scope, receive, send):
# Now we can use state.db, state.cache, etc.
body = json.dumps({
"db": str(state.db),
"cache": str(state.cache),
"config": state.config,
}).encode()
await send({
"type": "http.response.start",
"status": 200,
"headers": [
(b"content-type", b"application/json"),
(b"content-length", str(len(body)).encode()),
],
})
await send({
"type": "http.response.body",
"body": body,
"more_body": False,
})
Scope-Based State (The Starlette Pattern)
A cleaner pattern: store state in the ASGI scope itself. Starlette does this:
async def application(scope, receive, send):
if scope["type"] == "lifespan":
await handle_lifespan(scope, receive, send)
elif scope["type"] in ("http", "websocket"):
# State from lifespan is available in scope["state"]
await handle_http(scope, receive, send)
async def handle_lifespan(scope, receive, send):
# Servers that support lifespan state provide scope["state"];
# create it ourselves only as a fallback
scope.setdefault("state", {})
while True:
event = await receive()
if event["type"] == "lifespan.startup":
scope["state"]["db"] = await create_db_pool()
scope["state"]["started_at"] = time.time()
await send({"type": "lifespan.startup.complete"})
elif event["type"] == "lifespan.shutdown":
await scope["state"]["db"].close()
await send({"type": "lifespan.shutdown.complete"})
return
async def handle_http(scope, receive, send):
# Access state from scope
db = scope.get("state", {}).get("db")
# ...
Note: this relies on the server supporting lifespan state (Uvicorn does). The server includes a state dict in the lifespan scope and passes a shallow copy of it into each subsequent HTTP/WebSocket scope: top-level keys set during startup are visible everywhere, the objects they point to are shared, and per-request writes to the copy stay local to that connection.
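The shallow-copy semantics are worth pinning down. This toy snippet (plain dicts, no ASGI) mimics what the server does with the state namespace:

```python
# The lifespan-scope state, as filled in during startup
state = {"db": {"pool": "connected"}}

# What the server hands each new connection: a shallow copy
per_request = dict(state)

per_request["request_id"] = "abc123"  # top-level write: stays local
per_request["db"]["hits"] = 1         # nested mutation: shared object, visible everywhere

print("request_id" in state)  # → False
print(state["db"])            # → {'pool': 'connected', 'hits': 1}
```

So a connection pool stored at startup is safely shared, but anything a handler writes at the top level of its copy does not leak into other requests.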
Background Tasks
Lifespan is also where you start and stop background tasks — things that run continuously alongside request handling:
import asyncio
background_tasks = set()
async def startup():
# Start a periodic cleanup task
task = asyncio.create_task(periodic_cleanup())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
# Start a health check task
task = asyncio.create_task(report_health())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
async def shutdown():
# Cancel all background tasks
for task in list(background_tasks):
task.cancel()
# Wait for them to finish
if background_tasks:
await asyncio.gather(*background_tasks, return_exceptions=True)
async def periodic_cleanup():
while True:
try:
await asyncio.sleep(300) # Every 5 minutes
await cleanup_expired_sessions()
except asyncio.CancelledError:
break # Graceful shutdown
except Exception as e:
print(f"Cleanup error: {e}")
async def report_health():
while True:
try:
await asyncio.sleep(60) # Every minute
# Report to monitoring service
except asyncio.CancelledError:
break
except Exception as e:
print(f"Health report error: {e}")
async def cleanup_expired_sessions():
pass # Actual implementation would hit the database
The asyncio.CancelledError handling in the background tasks is important: when you call task.cancel() during shutdown, a CancelledError is raised inside the task at its current await point. If you don't catch it, the task ends in the cancelled state (which our gather(..., return_exceptions=True) tolerates). If you catch it and don't re-raise, the task exits cleanly.
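A standalone demonstration of that distinction (the names are invented for illustration): the worker swallows the CancelledError, so awaiting it afterwards raises nothing and the task is finished rather than cancelled.

```python
import asyncio

async def worker():
    try:
        await asyncio.sleep(3600)  # parked until cancelled
    except asyncio.CancelledError:
        print("worker: cleaning up")
        # swallowing the error (not re-raising) → task finishes cleanly

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker reach its await point
    task.cancel()
    await task  # raises nothing: CancelledError was caught inside
    print("task.cancelled() →", task.cancelled())

asyncio.run(main())
```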
What Happens If Lifespan Fails
If your startup() raises an exception and you send lifespan.startup.failed:
await send({
"type": "lifespan.startup.failed",
"message": "Database connection refused",
})
Uvicorn will refuse to accept any connections and exit with an error. This is the right behavior — you don’t want to serve requests with a broken database connection.
Lifespan in Frameworks
Starlette (and FastAPI) have a lifespan parameter:
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
db = await create_db_pool()
app.state.db = db
yield
# Shutdown
await db.close()
app = FastAPI(lifespan=lifespan)
This is the modern FastAPI pattern. The yield separates startup (before yield) from shutdown (after yield). Under the hood, Starlette wraps this context manager in the lifespan protocol we’ve been discussing.
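A simplified sketch of that wrapping (roughly, not exactly, what Starlette does under the hood) may make the mapping clearer: entering the context manager runs everything before the yield, exiting it runs everything after.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    print("startup work")   # everything before the yield
    yield
    print("shutdown work")  # everything after the yield

async def lifespan_protocol(app, receive, send):
    """Adapt an async context manager to the raw lifespan protocol."""
    ctx = lifespan(app)
    while True:
        event = await receive()
        if event["type"] == "lifespan.startup":
            await ctx.__aenter__()   # drives the generator up to the yield
            await send({"type": "lifespan.startup.complete"})
        elif event["type"] == "lifespan.shutdown":
            await ctx.__aexit__(None, None, None)  # resumes past the yield
            await send({"type": "lifespan.shutdown.complete"})
            return

async def demo():
    queue = asyncio.Queue()
    await queue.put({"type": "lifespan.startup"})
    await queue.put({"type": "lifespan.shutdown"})
    async def send(event):
        print("sent:", event["type"])
    await lifespan_protocol(None, queue.get, send)

asyncio.run(demo())
```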
The older pattern used @app.on_event("startup") and @app.on_event("shutdown") decorators. These are now deprecated in favor of the lifespan context manager approach, which is cleaner because startup and shutdown code live together.
Django has AppConfig.ready() for initialization, but it runs synchronously at import time and has no shutdown hook. For ASGI Django, you’d use Starlette’s lifespan middleware or a third-party library.
Testing Lifespan
When testing ASGI apps, you need to send the lifespan events to initialize the app properly:
import asyncio
async def run_lifespan(app) -> asyncio.Queue:
"""
Trigger the app's startup sequence.
Returns the event queue once startup is complete,
so the caller can push lifespan.shutdown later.
"""
scope = {"type": "lifespan", "asgi": {"version": "3.0"}}
events = asyncio.Queue()
startup_complete = asyncio.Event()
await events.put({"type": "lifespan.startup"})
async def receive():
return await events.get()
async def send(event):
if event["type"] == "lifespan.startup.complete":
startup_complete.set()
elif event["type"] == "lifespan.startup.failed":
raise RuntimeError(event.get("message", "Startup failed"))
asyncio.create_task(app(scope, receive, send))
await startup_complete.wait()
return events # Return the queue so caller can send shutdown
async def test_with_lifespan():
lifespan_queue = await run_lifespan(application)
# Run your tests here
# state is now initialized
# Clean shutdown
await lifespan_queue.put({"type": "lifespan.shutdown"})
In practice, Starlette's TestClient runs lifespan events automatically when used as a context manager. httpx's ASGITransport notably does not, which is why helpers like run_lifespan above (or the asgi-lifespan package) exist. The patterns chapter covers testing in detail.
The Existential Dread Part
Here’s the thing nobody warns you about: in production, your app will sometimes receive SIGTERM while handling a request. The lifespan shutdown is triggered, your background tasks are cancelled, and then — what happens to the in-flight request?
Uvicorn handles this with a graceful timeout: it stops accepting new connections, waits for in-flight requests to complete (up to a configurable timeout), then triggers the lifespan shutdown.
If you’re doing something expensive in a request handler — a long database query, a slow external API call — you might hit the timeout. The request gets dropped, the client gets a connection error, and your cleanup runs anyway.
There’s no perfect solution here. The best you can do is:
- Set a reasonable graceful shutdown timeout (Uvicorn's --timeout-graceful-shutdown)
- Make your handlers fast
- Use database connection pools with timeouts
- Accept that the occasional in-flight request will be dropped during deploys
Kubernetes rolling deployments, blue-green deploys, and load balancers with connection draining all help, but ultimately distributed systems are adversarial. Lifespan gives you a clean interface to handle it as well as possible.
WebSockets Over ASGI (Finally, a Reason to Care)
We’ve spent several chapters saying “WSGI can’t do WebSockets.” Now let’s actually use them.
WebSockets are a persistent, bidirectional communication channel between client and server. The browser opens a connection that stays open. Either side can send messages at any time. This enables chat applications, real-time dashboards, live collaboration, games — any scenario where you need low-latency communication without the overhead of polling.
ASGI handles WebSockets natively. Let’s see how.
The WebSocket Handshake
A WebSocket connection starts as an HTTP request with special headers:
GET /ws HTTP/1.1
Host: localhost:8000
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
The server responds with:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
After this exchange, the connection is no longer HTTP; it's a WebSocket stream. The Sec-WebSocket-Accept value is derived from the client's key (the SHA-1 hash of the key concatenated with a fixed GUID, base64-encoded). The browser verifies it to confirm the server actually understood the WebSocket handshake rather than blindly echoing headers; it provides no origin protection, so servers that care about cross-origin access must check the Origin header themselves.
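The derivation is short enough to show in full. Using the example key above (the GUID is the fixed value from RFC 6455), the result matches the response header shown:

```python
import base64
import hashlib

# Fixed GUID defined in RFC 6455, section 1.3
WS_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def websocket_accept(client_key: str) -> str:
    """Compute Sec-WebSocket-Accept from Sec-WebSocket-Key."""
    digest = hashlib.sha1((client_key + WS_MAGIC).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")

print(websocket_accept("dGhlIHNhbXBsZSBub25jZQ=="))
# → s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```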
In ASGI, the server handles this handshake. Your application just receives a websocket scope and sends a websocket.accept event.
The Simplest WebSocket App
async def application(scope, receive, send):
if scope["type"] == "lifespan":
await handle_lifespan(receive, send)
elif scope["type"] == "http":
await handle_http(scope, receive, send)
elif scope["type"] == "websocket":
await handle_websocket(scope, receive, send)
async def handle_websocket(scope, receive, send):
"""Echo server: sends back whatever the client sends."""
# Wait for the connection event
event = await receive()
assert event["type"] == "websocket.connect"
# Accept the connection
await send({"type": "websocket.accept"})
# Echo loop
while True:
event = await receive()
if event["type"] == "websocket.receive":
# Echo the message back
if event.get("text"):
await send({
"type": "websocket.send",
"text": f"Echo: {event['text']}",
})
elif event.get("bytes"):
await send({
"type": "websocket.send",
"bytes": event["bytes"],
})
elif event["type"] == "websocket.disconnect":
# Client disconnected — exit the handler
break
Run it:
uvicorn ws_app:application
Test with a WebSocket client. Python’s websockets library works well:
# test_ws.py
import asyncio
import websockets
async def test():
async with websockets.connect("ws://localhost:8000/ws") as ws:
await ws.send("Hello, WebSocket!")
response = await ws.recv()
print(response) # Echo: Hello, WebSocket!
asyncio.run(test())
Or from the browser console:
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = (e) => console.log(e.data);
ws.send("Hello from browser");
// Echo: Hello from browser
A Real Example: Chat Room
An echo server demonstrates the protocol. A chat room demonstrates why you’d actually use it.
import asyncio
import json
from typing import Dict, Set
# Connected clients: room_name → set of send callables
rooms: Dict[str, Set] = {}
async def handle_websocket(scope, receive, send):
"""Multi-room chat server."""
# Get room from query string
query = scope.get("query_string", b"").decode()
room = "general"
for param in query.split("&"):
if param.startswith("room="):
room = param[5:]
break
# Accept the WebSocket connection before joining the room, so a
# broadcast from another client never hits an unaccepted socket
event = await receive()
if event["type"] != "websocket.connect":
return
await send({"type": "websocket.accept"})
# Join the room
rooms.setdefault(room, set()).add(send)
try:
# Announce arrival
await broadcast(room, {
"type": "system",
"message": f"A user joined #{room}",
}, exclude=send)
# Message loop
while True:
event = await receive()
if event["type"] == "websocket.receive":
text = event.get("text", "")
if not text:
continue
try:
data = json.loads(text)
except json.JSONDecodeError:
data = {"text": text}
# Broadcast to everyone in the room
await broadcast(room, {
"type": "message",
"text": data.get("text", text),
"room": room,
})
elif event["type"] == "websocket.disconnect":
break
finally:
# Remove from room on disconnect
rooms.get(room, set()).discard(send)
await broadcast(room, {
"type": "system",
"message": f"A user left #{room}",
})
async def broadcast(room: str, data: dict, exclude=None) -> None:
"""Send a message to all clients in a room."""
message = json.dumps(data)
dead_clients = set()
for client_send in list(rooms.get(room, set())):
if client_send is exclude:
continue
try:
await client_send({
"type": "websocket.send",
"text": message,
})
except Exception:
dead_clients.add(client_send)
# Clean up dead clients
rooms.get(room, set()).difference_update(dead_clients)
Test with multiple connections:
# multi_client_test.py
import asyncio
import json
import websockets
async def client(name: str, room: str = "general"):
async with websockets.connect(f"ws://localhost:8000/ws?room={room}") as ws:
print(f"{name} connected")
# Send a message
await ws.send(json.dumps({"text": f"Hello from {name}"}))
# Receive messages for 2 seconds
async def receive_loop():
while True:
msg = await ws.recv()
data = json.loads(msg)
print(f"{name} received: {data}")
try:
await asyncio.wait_for(receive_loop(), timeout=2.0)
except asyncio.TimeoutError:
pass
async def main():
# Connect three clients concurrently
await asyncio.gather(
client("Alice"),
client("Bob"),
client("Carol"),
)
asyncio.run(main())
Rejecting WebSocket Connections
Sometimes you need to reject a WebSocket connection — bad authentication, rate limit exceeded, room full:
async def handle_websocket(scope, receive, send):
# Wait for connect event
event = await receive()
assert event["type"] == "websocket.connect"
# Check authentication
token = get_token_from_scope(scope)
if not await is_valid_token(token):
# Reject the connection with a close code
await send({
"type": "websocket.close",
"code": 4001, # Application-defined code (4000-4999 are custom)
})
return
# Accept and proceed
await send({"type": "websocket.accept"})
# ...
def get_token_from_scope(scope) -> str:
"""Extract Bearer token from WebSocket upgrade headers."""
for name, value in scope.get("headers", []):
if name == b"authorization":
auth = value.decode("latin-1")
if auth.startswith("Bearer "):
return auth[7:]
# Also check query string
query = scope.get("query_string", b"").decode()
for param in query.split("&"):
if param.startswith("token="):
return param[6:]
return ""
async def is_valid_token(token: str) -> bool:
# Check against database, JWT validation, etc.
return token == "valid-token" # Simplified
WebSocket close codes follow the RFC 6455 spec:
- 1000 — Normal closure
- 1001 — Going away (server shutdown)
- 1002 — Protocol error
- 1003 — Unsupported data type
- 4000–4999 — Application-defined (use these for your own status codes)
Sending Binary Data
WebSockets support both text frames (UTF-8 encoded) and binary frames. Use binary for protocol buffers, binary file transfers, or any non-text data:
async def binary_echo(scope, receive, send):
event = await receive()
assert event["type"] == "websocket.connect"
await send({"type": "websocket.accept"})
while True:
event = await receive()
if event["type"] == "websocket.receive":
if event.get("bytes") is not None:
# Echo binary data back
await send({
"type": "websocket.send",
"bytes": event["bytes"],
})
elif event.get("text") is not None:
# Convert text to binary (example)
await send({
"type": "websocket.send",
"bytes": event["text"].encode("utf-8"),
})
elif event["type"] == "websocket.disconnect":
break
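What goes inside a binary frame is entirely up to you. As an illustration, here is a hypothetical app-level layout, one type byte plus a big-endian length plus the payload; this exact format is invented for the example, not part of the WebSocket protocol:

```python
import struct

def pack_message(msg_type: int, payload: bytes) -> bytes:
    # "!BI" = network byte order: unsigned byte + unsigned 32-bit length
    return struct.pack("!BI", msg_type, len(payload)) + payload

def unpack_message(frame: bytes) -> tuple:
    msg_type, length = struct.unpack("!BI", frame[:5])
    return msg_type, frame[5:5 + length]

frame = pack_message(2, b"position:10,20")
print(unpack_message(frame))  # → (2, b'position:10,20')
```

The resulting bytes would go out as the `"bytes"` field of a websocket.send event and come back in the `"bytes"` field of websocket.receive.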
Handling Both HTTP and WebSocket
Real applications serve regular HTTP endpoints and WebSocket endpoints. Wire them together:
async def application(scope, receive, send):
if scope["type"] == "lifespan":
await handle_lifespan(receive, send)
elif scope["type"] == "http":
await handle_http(scope, receive, send)
elif scope["type"] == "websocket":
path = scope["path"]
if path == "/ws/chat":
await handle_websocket(scope, receive, send)
elif path == "/ws/echo":
await echo_websocket(scope, receive, send)
else:
# Reject unknown WebSocket paths
event = await receive()
if event["type"] == "websocket.connect":
await send({"type": "websocket.close", "code": 4004})
else:
pass # Unknown scope type, ignore
async def handle_http(scope, receive, send):
await read_body(receive)
path = scope["path"]
if path == "/":
# Serve a simple HTML page with a WebSocket client
html = """
<!DOCTYPE html>
<html>
<head><title>WebSocket Chat</title></head>
<body>
<input id="msg" placeholder="Message..." />
<button onclick="send()">Send</button>
<ul id="log"></ul>
<script>
const ws = new WebSocket("ws://" + location.host + "/ws/chat");
ws.onmessage = e => {
const li = document.createElement("li");
li.textContent = e.data;
document.getElementById("log").appendChild(li);
};
function send() {
const input = document.getElementById("msg");
ws.send(JSON.stringify({text: input.value}));
input.value = "";
}
</script>
</body>
</html>
""".encode("utf-8")
await send({
"type": "http.response.start",
"status": 200,
"headers": [
(b"content-type", b"text/html; charset=utf-8"),
(b"content-length", str(len(html)).encode()),
],
})
await send({
"type": "http.response.body",
"body": html,
"more_body": False,
})
else:
body = b"Not Found"
await send({
"type": "http.response.start",
"status": 404,
"headers": [(b"content-type", b"text/plain"),
(b"content-length", str(len(body)).encode())],
})
await send({"type": "http.response.body", "body": body, "more_body": False})
Save, run with uvicorn, open http://localhost:8000 in a browser. Open multiple tabs — they’re all in the same chat room.
The Concurrency Model
When two WebSocket clients are connected, their handler coroutines run concurrently on the event loop:
event loop:
coroutine A (client 1): waiting at "await receive()"
coroutine B (client 2): waiting at "await receive()"
Client 1 sends a message:
→ coroutine A wakes up
→ calls broadcast()
→ await client_2_send(...) # sends to client 2
→ coroutine A goes back to "await receive()"
Client 2 sends a message:
→ coroutine B wakes up
→ calls broadcast()
→ await client_1_send(...) # sends to client 1
→ coroutine B goes back to "await receive()"
Each handler runs independently, cooperatively yielding at await points. This is why our broadcast works without locks or synchronization: the event loop is single-threaded, so only one coroutine runs at a time. There’s no race condition.
This breaks down if a handler does CPU-bound work (offload it with run_in_executor), if a coroutine awaits in the middle of a read-modify-write on shared state (guard that section with asyncio.Lock), or if you introduce real threads (asyncio.Lock won't help there; use threading primitives and loop.call_soon_threadsafe). But for pure async I/O, the cooperative model is both safe and efficient.
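A small self-contained demonstration of why no locks are needed as long as the read-modify-write contains no await (the counter and iteration counts are invented for the example): two tasks interleave at every `sleep(0)`, yet no update is ever lost.

```python
import asyncio

counter = 0

async def bump(times: int):
    global counter
    for _ in range(times):
        current = counter
        counter = current + 1   # no await between read and write: atomic w.r.t. the loop
        await asyncio.sleep(0)  # yield between iterations, interleaving the tasks

async def main():
    await asyncio.gather(bump(1000), bump(1000))
    print(counter)  # → 2000, every run: single-threaded, no lost updates

asyncio.run(main())
```

Move the `sleep(0)` between the read and the write and the guarantee evaporates; that is exactly the case where asyncio.Lock earns its keep.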
What Frameworks Add
Starlette provides a WebSocket class that wraps the ASGI scope and adds convenience methods:
from starlette.websockets import WebSocket
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
text = await websocket.receive_text() # or receive_bytes(), receive_json()
await websocket.send_text(f"Echo: {text}")
FastAPI builds on Starlette’s WebSocket with dependency injection:
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
await websocket.send_text("Connected!")
# ...
Under the hood, websocket.accept() sends {"type": "websocket.accept"}, websocket.receive_text() awaits receive() and extracts the text field, and websocket.send_text() sends {"type": "websocket.send", "text": ...}.
The raw events are still there. The framework just provides a nicer interface to them.
ASGI Middleware Deep Dive
ASGI middleware follows the same pattern as WSGI middleware: a callable that takes an app and returns an app. The difference is that everything is async, and the interface is (scope, receive, send) instead of (environ, start_response).
The async nature introduces some subtleties worth understanding.
The Simplest ASGI Middleware
class DoNothingMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
await self.app(scope, receive, send)
Or as a function:
def do_nothing_middleware(app):
async def wrapper(scope, receive, send):
await app(scope, receive, send)
return wrapper
Both are equivalent. Classes are slightly more common in ASGI middleware because they can hold configuration cleanly.
Intercepting Requests and Responses
In WSGI, you intercept responses by wrapping start_response. In ASGI, you intercept them by wrapping send:
class TimingMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
import time
started = time.monotonic()
status_holder = []
async def send_with_timing(event):
if event["type"] == "http.response.start":
status_holder.append(event["status"])
elif event["type"] == "http.response.body":
if not event.get("more_body", False):
elapsed = (time.monotonic() - started) * 1000
method = scope.get("method", "?")
path = scope.get("path", "/")
status = status_holder[0] if status_holder else "?"
print(f"{method} {path} → {status} ({elapsed:.1f}ms)")
await send(event)
await self.app(scope, receive, send_with_timing)
The send_with_timing coroutine wraps send just like we wrapped start_response in WSGI. It intercepts the http.response.start event to capture the status code and the final http.response.body event to measure total time, then passes everything through.
Modifying Requests
Intercept receive to modify incoming data:
class RequestIDMiddleware:
"""Add a unique request ID to every request."""
def __init__(self, app, header: str = "X-Request-ID"):
self.app = app
self.header = header.lower().encode("latin-1")
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
import uuid
# Check for existing ID, generate one if missing
existing_id = None
for name, value in scope.get("headers", []):
if name == self.header:
existing_id = value.decode("latin-1")
break
request_id = existing_id or str(uuid.uuid4())
# Inject into scope
scope = dict(scope)
headers = list(scope.get("headers", []))
# Update or add the header
new_headers = [(n, v) for n, v in headers if n != self.header]
new_headers.append((self.header, request_id.encode("latin-1")))
scope["headers"] = new_headers
scope["request_id"] = request_id
# Add request ID to response headers
async def send_with_id(event):
if event["type"] == "http.response.start":
event = dict(event)
event["headers"] = list(event.get("headers", [])) + [
(self.header, request_id.encode("latin-1"))
]
await send(event)
await self.app(scope, receive, send_with_id)
Authentication Middleware
A complete authentication middleware that short-circuits the request if the token is invalid:
import json
from typing import Optional
class BearerAuthMiddleware:
"""
Validates Bearer tokens.
Injects user information into scope["user"] if valid.
Returns 401 if token is missing or invalid.
Skips authentication for paths in exclude_paths.
"""
def __init__(
self,
app,
verify_token, # async callable: token → user dict or None
exclude_paths: list = None,
):
self.app = app
self.verify_token = verify_token
self.exclude_paths = set(exclude_paths or [])
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
path = scope.get("path", "/")
# Skip auth for excluded paths
if path in self.exclude_paths:
await self.app(scope, receive, send)
return
# Extract Bearer token
token = self._extract_token(scope)
if token is None:
await self._send_401(send, "Missing Authorization header")
return
# Verify the token
user = await self.verify_token(token)
if user is None:
await self._send_401(send, "Invalid or expired token")
return
# Inject user into scope
scope = dict(scope)
scope["user"] = user
await self.app(scope, receive, send)
def _extract_token(self, scope) -> Optional[str]:
for name, value in scope.get("headers", []):
if name == b"authorization":
auth = value.decode("latin-1")
if auth.startswith("Bearer "):
return auth[7:]
return None
async def _send_401(self, send, message: str) -> None:
body = json.dumps({"error": message}).encode("utf-8")
await send({
"type": "http.response.start",
"status": 401,
"headers": [
(b"content-type", b"application/json"),
(b"content-length", str(len(body)).encode()),
(b"www-authenticate", b'Bearer realm="API"'),
],
})
await send({
"type": "http.response.body",
"body": body,
"more_body": False,
})
Usage:
async def verify_token(token: str):
# Check your database or JWT
if token == "valid-token":
return {"id": 1, "username": "alice", "role": "admin"}
return None
app = BearerAuthMiddleware(
my_app,
verify_token=verify_token,
exclude_paths=["/health", "/login"],
)
The GZIP Compression Middleware
A real-world example with non-trivial response manipulation:
import gzip
class GZipMiddleware:
"""
Compress responses with gzip when:
- Client sends Accept-Encoding: gzip
- Response content type is compressible (text, JSON, etc.)
- Response body is above minimum size threshold
"""
COMPRESSIBLE_TYPES = {
"text/html", "text/plain", "text/css", "text/javascript",
"application/json", "application/javascript",
"application/xml", "image/svg+xml",
}
def __init__(self, app, minimum_size: int = 500, compresslevel: int = 6):
self.app = app
self.minimum_size = minimum_size
self.compresslevel = compresslevel
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
# Check if client accepts gzip
accepts_gzip = False
for name, value in scope.get("headers", []):
if name == b"accept-encoding":
accepts_gzip = b"gzip" in value
break
if not accepts_gzip:
await self.app(scope, receive, send)
return
# We need to collect the full response to compress it
response_started = []
body_chunks = []
async def collecting_send(event):
if event["type"] == "http.response.start":
response_started.append(event)
elif event["type"] == "http.response.body":
body_chunks.append(event.get("body", b""))
if not event.get("more_body", False):
# All chunks collected — decide whether to compress
full_body = b"".join(body_chunks)
start_event = response_started[0]
content_type = ""
for name, value in start_event.get("headers", []):
if name == b"content-type":
content_type = value.decode("latin-1").split(";")[0].strip()
break
should_compress = (
len(full_body) >= self.minimum_size
and content_type in self.COMPRESSIBLE_TYPES
)
if should_compress:
compressed = gzip.compress(full_body,
compresslevel=self.compresslevel)
# Rebuild headers with encoding and new length
headers = [
(n, v) for n, v in start_event.get("headers", [])
if n not in (b"content-length", b"content-encoding")
]
headers.append((b"content-encoding", b"gzip"))
headers.append((b"content-length",
str(len(compressed)).encode()))
await send(dict(start_event, headers=headers))
await send({
"type": "http.response.body",
"body": compressed,
"more_body": False,
})
else:
# Send as-is
await send(start_event)
await send({
"type": "http.response.body",
"body": full_body,
"more_body": False,
})
await self.app(scope, receive, collecting_send)
Note the limitation: we’re collecting the entire response body before deciding whether to compress. For large streaming responses, this defeats the purpose of streaming. A streaming-compatible compressor would use more_body=True to send compressed chunks incrementally — at the cost of significantly more complexity.
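The building block for that streaming variant is zlib's incremental compressor. A sketch, with `wbits=31` selecting the gzip container per the zlib module docs:

```python
import gzip
import zlib

def make_gzip_compressor(level: int = 6):
    # wbits=31 → emit a gzip header and trailer rather than raw deflate
    return zlib.compressobj(level, zlib.DEFLATED, 31)

comp = make_gzip_compressor()
chunks = [b"hello, " * 100, b"world " * 100]
# Each chunk can be compressed and forwarded as it arrives (more_body=True),
# with comp.flush() appended to the final chunk
out = b"".join(comp.compress(c) for c in chunks) + comp.flush()
print(gzip.decompress(out) == b"".join(chunks))  # → True
```

The remaining complexity in a real streaming middleware is header handling: you must drop Content-Length (switching to chunked transfer) because the compressed size isn't known up front.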
Building a Middleware Stack
from typing import Callable, List
def build_middleware_stack(app: Callable, middleware: List) -> Callable:
"""
Build an ASGI middleware stack.
middleware = [A, B, C] → A(B(C(app)))
Request order: A → B → C → app
Each entry must be a factory: a middleware class, or a function
that takes the inner app and returns a wrapped app.
"""
for factory in reversed(middleware):
app = factory(app)
return app
# Usage
async def verify_token(token: str):
return {"id": 1} if token == "secret" else None
application = build_middleware_stack(my_app, [
TimingMiddleware,
RequestIDMiddleware,
GZipMiddleware,
lambda app: BearerAuthMiddleware(app, verify_token=verify_token,
exclude_paths=["/health"]),
])
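To see the ordering concretely, here is a self-contained check using trivial tag-logging middleware. The builder is restated inline (same shape as build_middleware_stack above) so the snippet runs on its own; all names here are invented for the demonstration:

```python
import asyncio

def build_stack(app, middleware):
    # Same shape as build_middleware_stack: wrap innermost first,
    # so middleware[0] ends up outermost
    for mw in reversed(middleware):
        app = mw(app)
    return app

log = []

def make_tag_middleware(tag):
    """Factory returning middleware that logs before and after the inner app."""
    def factory(app):
        async def middleware(scope, receive, send):
            log.append(f"{tag}:before")
            await app(scope, receive, send)
            log.append(f"{tag}:after")
        return middleware
    return factory

async def inner_app(scope, receive, send):
    log.append("app")

stack = build_stack(inner_app, [make_tag_middleware("A"), make_tag_middleware("B")])
asyncio.run(stack({}, None, None))
print(log)  # ['A:before', 'B:before', 'app', 'B:after', 'A:after']
```

The request path runs A, then B, then the app; the response path unwinds in reverse.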
Middleware That Handles Lifespan
If your middleware needs its own startup/shutdown (e.g., opening a connection), handle the lifespan scope:
class DatabaseMiddleware:
"""
Opens a database connection pool at startup,
injects it into each request's scope.
"""
def __init__(self, app, database_url: str):
self.app = app
self.database_url = database_url
self.pool = None
async def __call__(self, scope, receive, send):
if scope["type"] == "lifespan":
await self._handle_lifespan(scope, receive, send)
return
# Inject pool into scope
scope = dict(scope)
scope["db"] = self.pool
await self.app(scope, receive, send)
    async def _handle_lifespan(self, scope, receive, send):
        async def wrapped_receive():
            # Do our own setup/teardown as lifespan events pass through,
            # then hand each event on to the inner app so it can complete
            # the protocol (sending startup.complete / shutdown.complete).
            event = await receive()
            if event["type"] == "lifespan.startup":
                self.pool = await self._connect()
            elif event["type"] == "lifespan.shutdown":
                if self.pool:
                    await self.pool.close()
            return event
        await self.app(scope, wrapped_receive, send)
async def _connect(self):
# asyncpg.create_pool(self.database_url), etc.
print(f"Connected to {self.database_url}")
return {"url": self.database_url, "status": "connected"}
This pattern — middleware that intercepts lifespan and injects resources into HTTP scopes — is how database integrations and other resource-managing ASGI middleware typically work.
Testing Middleware
Testing ASGI middleware directly, without a framework:
import asyncio
class MockASGI:
"""A fake ASGI server for testing middleware."""
def __init__(self):
self.received_events = []
async def make_request(
self,
app,
method: str = "GET",
path: str = "/",
headers: list = None,
body: bytes = b"",
) -> tuple:
scope = {
"type": "http",
"method": method,
"path": path,
"query_string": b"",
"headers": headers or [],
"server": ("testserver", 80),
}
request_events = [
{"type": "http.request", "body": body, "more_body": False}
]
event_index = [0]
async def receive():
idx = event_index[0]
event_index[0] += 1
if idx < len(request_events):
return request_events[idx]
return {"type": "http.disconnect"}
response_events = []
async def send(event):
response_events.append(event)
await app(scope, receive, send)
return scope, response_events
async def test_timing_middleware():
async def simple_app(scope, receive, send):
await receive()
await send({
"type": "http.response.start",
"status": 200,
"headers": [(b"content-type", b"text/plain")],
})
await send({
"type": "http.response.body",
"body": b"OK",
"more_body": False,
})
app = TimingMiddleware(simple_app)
mock = MockASGI()
scope, events = await mock.make_request(app, "GET", "/test")
assert events[0]["type"] == "http.response.start"
assert events[0]["status"] == 200
assert events[1]["body"] == b"OK"
print("TimingMiddleware test passed")
asyncio.run(test_timing_middleware())
The Key Difference from WSGI Middleware
In WSGI middleware, you intercept start_response (a synchronous callable) to capture the status and headers before forwarding them. In ASGI middleware, you intercept the async send callable to capture http.response.start events.
The conceptual model is identical — wrap the callable, inspect and possibly modify events passing through — but the async nature means everything must be awaited. This is also why ASGI middleware can do things WSGI middleware can’t: it can await during send, which means it can do async work (database calls, cache writes) as part of processing the response.
That’s either powerful or terrifying, depending on how carefully your middleware is written.
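For comparison, the WSGI side of the same interception looks like this. It's a sketch; StatusLoggingMiddleware is a made-up name for the example:

```python
class StatusLoggingMiddleware:
    """Minimal WSGI middleware (a sketch) that captures the status line by
    wrapping start_response, the synchronous analogue of wrapping send."""

    def __init__(self, app):
        self.app = app
        self.log = []  # captured "METHOD PATH -> STATUS" lines

    def __call__(self, environ, start_response):
        def capturing_start_response(status, headers, exc_info=None):
            self.log.append(
                f"{environ['REQUEST_METHOD']} {environ['PATH_INFO']} -> {status}"
            )
            return start_response(status, headers, exc_info)
        return self.app(environ, capturing_start_response)
```

Same wrapping pattern, but everything is synchronous: no awaits, and therefore no opportunity to do async work while the response passes through.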
Testing WSGI and ASGI Apps Without a Framework
Your application is a callable. Test it like one.
This is the payoff for understanding the interface directly: testing becomes a matter of calling your app with the right arguments and inspecting the result. No special test client required — though they’re convenient, and we’ll look at those too.
Testing WSGI Applications
A WSGI application takes environ and start_response. To test it, provide both:
import io
import json
from typing import Any, Dict, List, Optional, Tuple
def make_environ(
method: str = "GET",
path: str = "/",
query_string: str = "",
body: bytes = b"",
content_type: str = "",
headers: Optional[Dict[str, str]] = None,
environ_overrides: Optional[Dict] = None,
) -> dict:
"""Build a WSGI environ dict for testing."""
environ = {
"REQUEST_METHOD": method.upper(),
"PATH_INFO": path,
"QUERY_STRING": query_string,
"CONTENT_TYPE": content_type,
"CONTENT_LENGTH": str(len(body)) if body else "",
"SERVER_NAME": "testserver",
"SERVER_PORT": "80",
"HTTP_HOST": "testserver",
"wsgi.input": io.BytesIO(body),
"wsgi.errors": io.StringIO(),
"wsgi.url_scheme": "http",
"wsgi.version": (1, 0),
"wsgi.multithread": False,
"wsgi.multiprocess": False,
"wsgi.run_once": False,
"GATEWAY_INTERFACE": "CGI/1.1",
"SERVER_PROTOCOL": "HTTP/1.1",
}
# Add custom headers
if headers:
for name, value in headers.items():
key = "HTTP_" + name.upper().replace("-", "_")
environ[key] = value
if environ_overrides:
environ.update(environ_overrides)
return environ
class WSGITestResponse:
"""Holds the result of calling a WSGI application."""
def __init__(self, status: str, headers: List[Tuple[str, str]], body: bytes):
self.status = status
self.status_code = int(status.split(" ")[0])
self.headers = dict(headers)
self.body = body
@property
def text(self) -> str:
return self.body.decode("utf-8")
def json(self) -> Any:
return json.loads(self.body)
def __repr__(self) -> str:
return f"<WSGITestResponse {self.status}>"
def call_wsgi(app, environ: dict) -> WSGITestResponse:
"""Call a WSGI app and return a response object."""
response_parts = []
    def start_response(status, headers, exc_info=None):
        if exc_info:
            raise exc_info[1].with_traceback(exc_info[2])
        response_parts.append((status, headers))
        return lambda data: None  # PEP 3333: start_response returns a write() callable
    result = app(environ, start_response)
    try:
        body = b"".join(result)
    finally:
        if hasattr(result, "close"):
            result.close()
    status, headers = response_parts[-1]  # last call wins if called more than once
    return WSGITestResponse(status, headers, body)
class WSGITestClient:
"""A test client for WSGI applications."""
def __init__(self, app):
self.app = app
def get(self, path: str, **kwargs) -> WSGITestResponse:
return self.request("GET", path, **kwargs)
def post(self, path: str, **kwargs) -> WSGITestResponse:
return self.request("POST", path, **kwargs)
def put(self, path: str, **kwargs) -> WSGITestResponse:
return self.request("PUT", path, **kwargs)
def delete(self, path: str, **kwargs) -> WSGITestResponse:
return self.request("DELETE", path, **kwargs)
def request(
self,
method: str,
path: str,
body: bytes = b"",
json: Any = None,
headers: Optional[Dict[str, str]] = None,
query_string: str = "",
) -> WSGITestResponse:
content_type = ""
if json is not None:
import json as json_module
body = json_module.dumps(json).encode("utf-8")
content_type = "application/json"
environ = make_environ(
method=method,
path=path,
query_string=query_string,
body=body,
content_type=content_type,
headers=headers,
)
return call_wsgi(self.app, environ)
Writing WSGI Tests
# test_wsgi_tasks.py
from tasks_app import application # The tasks app from chapter 5
client = WSGITestClient(application)
def test_empty_task_list():
response = client.get("/tasks")
assert response.status_code == 200
assert response.json() == []
def test_create_task():
response = client.post("/tasks", json={"title": "Write tests"})
assert response.status_code == 201
data = response.json()
assert data["title"] == "Write tests"
assert data["done"] is False
assert "id" in data
return data["id"]
def test_get_task():
# Create a task first
task_id = test_create_task()
response = client.get(f"/tasks/{task_id}")
assert response.status_code == 200
assert response.json()["id"] == task_id
def test_task_not_found():
response = client.get("/tasks/does-not-exist")
assert response.status_code == 404
def test_delete_task():
task_id = test_create_task()
response = client.delete(f"/tasks/{task_id}")
assert response.status_code == 200
response = client.get(f"/tasks/{task_id}")
assert response.status_code == 404
def test_missing_title():
response = client.post("/tasks", json={"description": "no title here"})
assert response.status_code == 400
assert "title" in response.json()["error"]
def test_wrong_method():
response = client.request("PATCH", "/tasks")
assert response.status_code == 405
if __name__ == "__main__":
test_empty_task_list()
test_create_task()
test_get_task()
test_task_not_found()
test_delete_task()
test_missing_title()
test_wrong_method()
print("All WSGI tests passed.")
Run with pytest (pip install pytest) or directly:
pytest test_wsgi_tasks.py -v
# or
python test_wsgi_tasks.py
Testing ASGI Applications
ASGI apps are async, so tests need to be async too. Python’s asyncio makes this straightforward:
import asyncio
import json
from typing import Any, Dict, List, Optional
def make_scope(
method: str = "GET",
path: str = "/",
query_string: bytes = b"",
headers: Optional[List] = None,
scope_type: str = "http",
) -> dict:
"""Build an ASGI HTTP scope dict for testing."""
return {
"type": scope_type,
"asgi": {"version": "3.0"},
"http_version": "1.1",
"method": method.upper(),
"path": path,
"raw_path": path.encode("latin-1"),
"query_string": query_string,
"root_path": "",
"scheme": "http",
"headers": headers or [],
"server": ("testserver", 80),
"client": ("127.0.0.1", 12345),
}
class ASGITestResponse:
"""Holds the result of calling an ASGI application."""
def __init__(self, status: int, headers: List, body: bytes):
self.status_code = status
self._headers = headers
self.headers = {
k.decode("latin-1"): v.decode("latin-1")
for k, v in headers
}
self.body = body
@property
def text(self) -> str:
return self.body.decode("utf-8")
def json(self) -> Any:
return json.loads(self.body)
def __repr__(self) -> str:
return f"<ASGITestResponse {self.status_code}>"
async def call_asgi(
app,
scope: dict,
body: bytes = b"",
) -> ASGITestResponse:
"""Call an ASGI app with an HTTP scope and return a response."""
request_events = [
{"type": "http.request", "body": body, "more_body": False}
]
event_index = [0]
async def receive():
idx = event_index[0]
event_index[0] += 1
if idx < len(request_events):
return request_events[idx]
return {"type": "http.disconnect"}
response_events = []
async def send(event):
response_events.append(event)
await app(scope, receive, send)
start = next(e for e in response_events if e["type"] == "http.response.start")
body_chunks = [
e.get("body", b"")
for e in response_events
if e["type"] == "http.response.body"
]
return ASGITestResponse(
status=start["status"],
headers=start.get("headers", []),
body=b"".join(body_chunks),
)
class ASGITestClient:
"""A test client for ASGI applications."""
def __init__(self, app):
self.app = app
self._started = False
async def _ensure_started(self):
"""Send lifespan startup if not already done."""
if self._started:
return
self._started = True
scope = {"type": "lifespan", "asgi": {"version": "3.0"}}
events = asyncio.Queue()
startup_done = asyncio.Event()
await events.put({"type": "lifespan.startup"})
async def receive():
return await events.get()
async def send(event):
if event["type"] == "lifespan.startup.complete":
startup_done.set()
asyncio.create_task(self.app(scope, receive, send))
try:
await asyncio.wait_for(startup_done.wait(), timeout=5.0)
except asyncio.TimeoutError:
pass # App may not handle lifespan
async def get(self, path: str, **kwargs) -> ASGITestResponse:
return await self.request("GET", path, **kwargs)
async def post(self, path: str, **kwargs) -> ASGITestResponse:
return await self.request("POST", path, **kwargs)
async def delete(self, path: str, **kwargs) -> ASGITestResponse:
return await self.request("DELETE", path, **kwargs)
async def patch(self, path: str, **kwargs) -> ASGITestResponse:
return await self.request("PATCH", path, **kwargs)
async def request(
self,
method: str,
path: str,
body: bytes = b"",
json: Any = None,
headers: Optional[Dict[str, str]] = None,
query_string: bytes = b"",
) -> ASGITestResponse:
await self._ensure_started()
content_type = ""
if json is not None:
import json as json_module
body = json_module.dumps(json).encode("utf-8")
content_type = "application/json"
raw_headers = []
if content_type:
raw_headers.append((b"content-type", content_type.encode()))
if body:
raw_headers.append((b"content-length", str(len(body)).encode()))
if headers:
for name, value in headers.items():
raw_headers.append((
name.lower().encode("latin-1"),
value.encode("latin-1"),
))
scope = make_scope(
method=method,
path=path,
query_string=query_string,
headers=raw_headers,
)
return await call_asgi(self.app, scope, body)
Writing ASGI Tests
# test_asgi_tasks.py
import asyncio
from asgi_tasks import application # From ASGI chapter
client = ASGITestClient(application)
async def test_list_tasks_empty():
response = await client.get("/tasks")
assert response.status_code == 200
assert response.json() == []
async def test_create_and_get_task():
# Create
response = await client.post("/tasks", json={"title": "Async task"})
assert response.status_code == 201
task = response.json()
assert task["title"] == "Async task"
task_id = task["id"]
# Get
response = await client.get(f"/tasks/{task_id}")
assert response.status_code == 200
assert response.json()["id"] == task_id
async def test_update_task():
# Create
response = await client.post("/tasks", json={"title": "To update"})
task_id = response.json()["id"]
# Update
response = await client.patch(f"/tasks/{task_id}", json={"done": True})
assert response.status_code == 200
assert response.json()["done"] is True
async def test_content_type_required():
response = await client.request(
"POST", "/tasks",
body=b'{"title": "oops"}',
# No content-type header
)
assert response.status_code == 415
async def run_all_tests():
await test_list_tasks_empty()
print("✓ list tasks empty")
await test_create_and_get_task()
print("✓ create and get task")
await test_update_task()
print("✓ update task")
await test_content_type_required()
print("✓ content type required")
print("All ASGI tests passed.")
if __name__ == "__main__":
asyncio.run(run_all_tests())
Using pytest-asyncio
For proper async test suites, use pytest-asyncio:
pip install pytest pytest-asyncio
# conftest.py
import pytest
from asgi_tasks import application
from test_helpers import ASGITestClient
@pytest.fixture
def client():
return ASGITestClient(application)
# test_asgi_tasks.py
import pytest
@pytest.mark.asyncio
async def test_create_task(client):
response = await client.post("/tasks", json={"title": "pytest task"})
assert response.status_code == 201
assert response.json()["title"] == "pytest task"
@pytest.mark.asyncio
async def test_delete_task(client):
create_response = await client.post("/tasks", json={"title": "to delete"})
task_id = create_response.json()["id"]
delete_response = await client.delete(f"/tasks/{task_id}")
assert delete_response.status_code == 200
get_response = await client.get(f"/tasks/{task_id}")
assert get_response.status_code == 404
pytest test_asgi_tasks.py -v
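Depending on your pytest-asyncio configuration, the @pytest.mark.asyncio markers may be required (strict mode) or optional. Setting the mode explicitly avoids surprises:

```ini
# pytest.ini
[pytest]
asyncio_mode = auto
```

In auto mode, pytest-asyncio collects async test functions without per-test markers; in strict mode, each async test needs the marker.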
Testing WebSocket Handlers
WebSocket testing requires simulating the connect/message/disconnect lifecycle:
async def test_websocket_echo():
from asgi_websockets import application # WebSocket echo app
scope = {
"type": "websocket",
"asgi": {"version": "3.0"},
"path": "/ws",
"query_string": b"",
"headers": [],
"server": ("testserver", 8000),
"client": ("127.0.0.1", 9999),
"subprotocols": [],
}
# Queue of events the app will receive
incoming = asyncio.Queue()
outgoing = []
async def receive():
return await incoming.get()
async def send(event):
outgoing.append(event)
# Start the handler
handler_task = asyncio.create_task(application(scope, receive, send))
# Simulate WebSocket connect
await incoming.put({"type": "websocket.connect"})
await asyncio.sleep(0) # Let the handler process it
# Check accept was sent
assert outgoing[-1]["type"] == "websocket.accept"
# Send a message
await incoming.put({"type": "websocket.receive", "text": "hello"})
await asyncio.sleep(0)
# Check echo
echo_event = outgoing[-1]
assert echo_event["type"] == "websocket.send"
assert "hello" in echo_event.get("text", "")
# Disconnect
await incoming.put({"type": "websocket.disconnect", "code": 1000})
await handler_task
asyncio.run(test_websocket_echo())
Testing Middleware
Test middleware by composing it with a simple app:
async def test_timing_middleware():
import time
timing_logs = []
class CapturingTimingMiddleware(TimingMiddleware):
async def __call__(self, scope, receive, send):
# Override to capture instead of print
started = time.monotonic()
status_holder = []
async def capturing_send(event):
if event["type"] == "http.response.start":
status_holder.append(event["status"])
elif event["type"] == "http.response.body":
if not event.get("more_body", False):
elapsed = (time.monotonic() - started) * 1000
timing_logs.append({
"path": scope.get("path"),
"status": status_holder[0] if status_holder else None,
"elapsed_ms": elapsed,
})
await send(event)
await self.app(scope, receive, capturing_send)
async def simple_app(scope, receive, send):
await receive()
await send({
"type": "http.response.start",
"status": 200,
"headers": [(b"content-type", b"text/plain")],
})
await send({
"type": "http.response.body",
"body": b"OK",
"more_body": False,
})
app = CapturingTimingMiddleware(simple_app)
test_client = ASGITestClient(app)
await test_client.get("/test-path")
assert len(timing_logs) == 1
assert timing_logs[0]["path"] == "/test-path"
assert timing_logs[0]["status"] == 200
assert timing_logs[0]["elapsed_ms"] >= 0
print("Timing middleware test passed")
asyncio.run(test_timing_middleware())
The httpx Transport Approach
The httpx library (an async HTTP client) has a built-in ASGI transport that lets you test ASGI apps with a full HTTP client interface:
pip install httpx
import httpx
async def test_with_httpx():
from asgi_tasks import application
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=application),
base_url="http://testserver",
) as client:
response = await client.post(
"/tasks",
json={"title": "httpx task"},
)
assert response.status_code == 201
task = response.json()
assert task["title"] == "httpx task"
# httpx handles cookies, redirects, etc. automatically
response = await client.get(f"/tasks/{task['id']}")
assert response.status_code == 200
asyncio.run(test_with_httpx())
httpx.ASGITransport calls your ASGI app directly, without a network socket. It’s the same mechanism we built manually — it constructs scopes, calls receive with request body events, and collects the response from send events — but packaged as a transport that the full httpx client can use.
This gives you all of httpx’s conveniences (cookie handling, redirects, JSON serialization) while testing without a server.
The Principle
Testing WSGI/ASGI apps without a framework is fast (no server startup), correct (you’re calling the exact same code path as production), and instructive (you understand exactly what’s happening).
The test client libraries — pytest-django, Starlette’s TestClient, FastAPI’s TestClient, httpx — are all doing exactly what we’ve done here: constructing the right environ/scope, calling your app, and wrapping the result.
When a test fails mysteriously, knowing that your test client is just calling app(scope, receive, send) gives you a concrete place to start debugging.
Roll Your Own Mini-Framework (For Fun and Understanding)
We have all the pieces. We’ve built a WSGI server, a router, middleware, and request/response objects. We’ve done the same for ASGI. Now let’s assemble them into something coherent: a small, complete framework that you’d actually consider using for a personal project.
This chapter is about synthesis. The goal isn’t to compete with Flask or Starlette — it’s to make the jump from “I’ve built components” to “I understand how a framework is structured.”
What We’re Building
A minimal ASGI framework called bare (fitting) with:
- Route registration via decorators
- Request and Response classes
- Middleware composition
- Lifespan event handling
- WebSocket support
- Zero dependencies beyond Python’s standard library (plus uvicorn to run it)
The finished product will be small enough to fit in a single file and correct enough to run real applications.
The Core Application Class
# bare.py
import asyncio
import json
import re
import urllib.parse
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple
# ── Types ─────────────────────────────────────────────────────────────────────
Headers = List[Tuple[bytes, bytes]]
ASGIApp = Callable # (scope, receive, send) -> None
# ── Request ───────────────────────────────────────────────────────────────────
class Request:
def __init__(self, scope: dict, receive: Callable):
self._scope = scope
self._receive = receive
self._body: Optional[bytes] = None
@property
def method(self) -> str:
return self._scope["method"]
@property
def path(self) -> str:
return self._scope["path"]
@property
def query_string(self) -> str:
return self._scope.get("query_string", b"").decode("latin-1")
@property
def headers(self) -> Dict[str, str]:
return {
k.decode("latin-1"): v.decode("latin-1")
for k, v in self._scope.get("headers", [])
}
def header(self, name: str, default: str = "") -> str:
return self.headers.get(name.lower(), default)
@property
def content_type(self) -> str:
return self.header("content-type")
@property
def path_params(self) -> Dict[str, str]:
return self._scope.get("path_params", {})
@property
def query_params(self) -> Dict[str, List[str]]:
return urllib.parse.parse_qs(self.query_string, keep_blank_values=True)
def query(self, name: str, default: str = "") -> str:
values = self.query_params.get(name, [])
return values[0] if values else default
async def body(self) -> bytes:
if self._body is None:
chunks = []
while True:
event = await self._receive()
if event["type"] == "http.request":
chunks.append(event.get("body", b""))
if not event.get("more_body", False):
break
elif event["type"] == "http.disconnect":
break
self._body = b"".join(chunks)
return self._body
async def text(self) -> str:
return (await self.body()).decode("utf-8")
async def json(self) -> Any:
return json.loads(await self.body())
@property
def app(self) -> "Bare":
return self._scope["app"]
def __repr__(self) -> str:
return f"<Request {self.method} {self.path}>"
# ── Response ──────────────────────────────────────────────────────────────────
class Response:
def __init__(
self,
body: Any = None,
status: int = 200,
headers: Optional[Dict[str, str]] = None,
content_type: str = "text/plain; charset=utf-8",
):
self.status = status
self._headers = {"content-type": content_type}
if headers:
self._headers.update({k.lower(): v for k, v in headers.items()})
if body is None:
self._body = b""
elif isinstance(body, bytes):
self._body = body
elif isinstance(body, str):
self._body = body.encode("utf-8")
else:
self._body = str(body).encode("utf-8")
def set_header(self, name: str, value: str) -> "Response":
self._headers[name.lower()] = value
return self
    def set_cookie(self, name: str, value: str, **attrs) -> "Response":
        cookie = f"{name}={value}"
        for attr, attr_val in attrs.items():
            cookie += f"; {attr.replace('_', '-')}={attr_val}"
        # Limitation: headers are stored in a dict, so only one Set-Cookie
        # header can be sent per response; a real framework keeps a list.
        self._headers["set-cookie"] = cookie
        return self
async def send(self, send: Callable) -> None:
headers = list(self._headers.items())
headers.append(("content-length", str(len(self._body))))
raw_headers = [
(k.encode("latin-1"), v.encode("latin-1"))
for k, v in headers
]
await send({
"type": "http.response.start",
"status": self.status,
"headers": raw_headers,
})
await send({
"type": "http.response.body",
"body": self._body,
"more_body": False,
})
class JSONResponse(Response):
def __init__(self, data: Any, status: int = 200, **kwargs):
super().__init__(
body=json.dumps(data, default=str),
status=status,
content_type="application/json",
**kwargs,
)
class HTMLResponse(Response):
def __init__(self, html: str, status: int = 200, **kwargs):
super().__init__(body=html, status=status,
content_type="text/html; charset=utf-8", **kwargs)
class RedirectResponse(Response):
def __init__(self, location: str, status: int = 302):
super().__init__(status=status, headers={"location": location})
# ── Routing ───────────────────────────────────────────────────────────────────
@dataclass
class Route:
method: str
pattern: re.Pattern
handler: Callable
param_names: List[str]
def compile_route(path: str) -> Tuple[re.Pattern, List[str]]:
"""Convert '/users/{id:int}' to a compiled regex and param names."""
converters = {"str": r"[^/]+", "int": r"[0-9]+", "slug": r"[a-zA-Z0-9-]+"}
param_names = []
def replace(m):
name, _, conv = m.group(1).partition(":")
param_names.append(name)
return f"(?P<{name}>{converters.get(conv or 'str', converters['str'])})"
regex = "^" + re.sub(r"\{([^}]+)\}", replace, path) + "$"
return re.compile(regex), param_names
# ── WebSocket ─────────────────────────────────────────────────────────────────
class WebSocket:
def __init__(self, scope: dict, receive: Callable, send: Callable):
self._scope = scope
self._receive = receive
self._send = send
self.path_params: Dict[str, str] = scope.get("path_params", {})
    async def accept(self, subprotocol: Optional[str] = None) -> None:
event = await self._receive()
assert event["type"] == "websocket.connect"
await self._send({
"type": "websocket.accept",
"subprotocol": subprotocol,
})
async def receive_text(self) -> Optional[str]:
event = await self._receive()
if event["type"] == "websocket.receive":
return event.get("text")
return None # disconnect
async def receive_bytes(self) -> Optional[bytes]:
event = await self._receive()
if event["type"] == "websocket.receive":
return event.get("bytes")
return None
async def receive_json(self) -> Any:
text = await self.receive_text()
return json.loads(text) if text is not None else None
async def send_text(self, text: str) -> None:
await self._send({"type": "websocket.send", "text": text})
async def send_bytes(self, data: bytes) -> None:
await self._send({"type": "websocket.send", "bytes": data})
async def send_json(self, data: Any) -> None:
await self.send_text(json.dumps(data, default=str))
async def close(self, code: int = 1000) -> None:
await self._send({"type": "websocket.close", "code": code})
async def __aenter__(self) -> "WebSocket":
await self.accept()
return self
async def __aexit__(self, *exc) -> None:
await self.close()
# ── The Framework ─────────────────────────────────────────────────────────────
class Bare:
def __init__(self):
self._http_routes: List[Route] = []
self._ws_routes: List[Route] = []
self._startup_handlers: List[Callable] = []
self._shutdown_handlers: List[Callable] = []
self._middleware: List[Callable] = []
self._built_app: Optional[ASGIApp] = None
self.state: Dict[str, Any] = {}
# ── Route registration ────────────────────────────────────────────────
def route(self, path: str, methods: List[str] = None):
"""Decorator to register an HTTP route handler."""
methods = [m.upper() for m in (methods or ["GET"])]
def decorator(func: Callable) -> Callable:
pattern, param_names = compile_route(path)
for method in methods:
self._http_routes.append(
Route(method, pattern, func, param_names)
)
return func
return decorator
def get(self, path: str):
return self.route(path, ["GET"])
def post(self, path: str):
return self.route(path, ["POST"])
def put(self, path: str):
return self.route(path, ["PUT"])
def patch(self, path: str):
return self.route(path, ["PATCH"])
def delete(self, path: str):
return self.route(path, ["DELETE"])
def websocket(self, path: str):
"""Decorator to register a WebSocket handler."""
def decorator(func: Callable) -> Callable:
pattern, param_names = compile_route(path)
self._ws_routes.append(Route("WS", pattern, func, param_names))
return func
return decorator
# ── Lifespan ──────────────────────────────────────────────────────────
def on_startup(self, func: Callable) -> Callable:
self._startup_handlers.append(func)
return func
def on_shutdown(self, func: Callable) -> Callable:
self._shutdown_handlers.append(func)
return func
# ── Middleware ────────────────────────────────────────────────────────
def add_middleware(self, middleware_class, **kwargs):
self._middleware.append((middleware_class, kwargs))
self._built_app = None # Invalidate cache
# ── ASGI interface ────────────────────────────────────────────────────
async def __call__(self, scope, receive, send):
if self._built_app is None:
self._built_app = self._build_app()
await self._built_app(scope, receive, send)
def _build_app(self) -> ASGIApp:
app = self._handle
for mw_class, kwargs in reversed(self._middleware):
app = mw_class(app, **kwargs)
return app
async def _handle(self, scope, receive, send):
scope["app"] = self
if scope["type"] == "lifespan":
await self._handle_lifespan(receive, send)
elif scope["type"] == "http":
await self._handle_http(scope, receive, send)
elif scope["type"] == "websocket":
await self._handle_websocket(scope, receive, send)
    async def _handle_lifespan(self, receive, send):
        while True:
            event = await receive()
            if event["type"] == "lifespan.startup":
                try:
                    for handler in self._startup_handlers:
                        result = handler()
                        if asyncio.iscoroutine(result):
                            await result  # support both sync and async handlers
                    await send({"type": "lifespan.startup.complete"})
                except Exception as e:
                    await send({"type": "lifespan.startup.failed", "message": str(e)})
                    return
            elif event["type"] == "lifespan.shutdown":
                for handler in self._shutdown_handlers:
                    try:
                        result = handler()
                        if asyncio.iscoroutine(result):
                            await result
                    except Exception:
                        pass
                await send({"type": "lifespan.shutdown.complete"})
                return
async def _handle_http(self, scope, receive, send):
method = scope["method"]
path = scope["path"]
matched = []
for route in self._http_routes:
m = route.pattern.match(path)
if m:
matched.append((route, m))
if not matched:
await JSONResponse({"error": "not found"}, 404).send(send)
return
for route, m in matched:
if route.method == method:
scope["path_params"] = m.groupdict()
request = Request(scope, receive)
try:
response = await route.handler(request)
if response is None:
response = Response()
if not isinstance(response, Response):
response = JSONResponse(response)
await response.send(send)
except Exception as e:
await JSONResponse({"error": str(e)}, 500).send(send)
return
allowed = sorted({r.method for r, _ in matched})
await JSONResponse(
{"error": "method not allowed", "allowed": allowed},
405,
headers={"allow": ", ".join(allowed)},
).send(send)
async def _handle_websocket(self, scope, receive, send):
path = scope["path"]
for route in self._ws_routes:
m = route.pattern.match(path)
if m:
scope["path_params"] = m.groupdict()
ws = WebSocket(scope, receive, send)
try:
await route.handler(ws)
except Exception:
await send({"type": "websocket.close", "code": 1011})
return
# No matching WebSocket route — reject
event = await receive()
if event["type"] == "websocket.connect":
await send({"type": "websocket.close", "code": 4004})
def run(self, host: str = "127.0.0.1", port: int = 8000, **kwargs):
"""Convenience method to run with uvicorn."""
import uvicorn
uvicorn.run(self, host=host, port=port, **kwargs)
Using the Framework
# example_app.py
import uuid

from bare import Bare, JSONResponse, HTMLResponse, WebSocket

app = Bare()

# In-memory store
tasks = {}

@app.on_startup
async def startup():
    print("App started. Ready to serve.")
    # Connect to database, load config, etc.

@app.on_shutdown
async def shutdown():
    print("App stopping. Cleaning up.")

@app.get("/")
async def index(request):
    return HTMLResponse("<h1>Bare Framework</h1><p>It's just callables.</p>")

@app.get("/tasks")
async def list_tasks(request):
    done = request.query("done")
    result = list(tasks.values())
    if done == "true":
        result = [t for t in result if t["done"]]
    elif done == "false":
        result = [t for t in result if not t["done"]]
    return JSONResponse(result)

@app.post("/tasks")
async def create_task(request):
    data = await request.json()
    if "title" not in data:
        return JSONResponse({"error": "title is required"}, 400)
    task = {
        "id": str(uuid.uuid4()),
        "title": data["title"],
        "done": False,
    }
    tasks[task["id"]] = task
    return JSONResponse(task, 201)

@app.get("/tasks/{task_id}")
async def get_task(request):
    task_id = request.path_params["task_id"]
    task = tasks.get(task_id)
    if not task:
        return JSONResponse({"error": "not found"}, 404)
    return JSONResponse(task)

@app.patch("/tasks/{task_id}")
async def update_task(request):
    task_id = request.path_params["task_id"]
    task = tasks.get(task_id)
    if not task:
        return JSONResponse({"error": "not found"}, 404)
    data = await request.json()
    if "done" in data:
        task["done"] = bool(data["done"])
    if "title" in data:
        task["title"] = str(data["title"])
    return JSONResponse(task)

@app.delete("/tasks/{task_id}")
async def delete_task(request):
    task_id = request.path_params["task_id"]
    if task_id not in tasks:
        return JSONResponse({"error": "not found"}, 404)
    return JSONResponse(tasks.pop(task_id))

@app.websocket("/ws")
async def chat(ws: WebSocket):
    async with ws:  # accept on enter, close on exit
        await ws.send_text("Welcome! Type messages to see them echoed.")
        while True:
            message = await ws.receive_text()
            if message is None:  # disconnect
                break
            await ws.send_json({
                "type": "echo",
                "original": message,
                "upper": message.upper(),
            })

if __name__ == "__main__":
    # Note: uvicorn requires an import string (not an app object) for
    # reload=True, so we run without auto-reload here.
    app.run()
python example_app.py
# Test it
curl http://localhost:8000/
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Built with bare"}'
Adding Middleware to Our Framework
import time
import sys

class LoggingMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        started = time.monotonic()
        status_code = [None]

        async def capturing_send(event):
            if event["type"] == "http.response.start":
                status_code[0] = event["status"]
            await send(event)

        await self.app(scope, receive, capturing_send)
        elapsed = (time.monotonic() - started) * 1000
        print(
            f"{scope['method']} {scope['path']} → "
            f"{status_code[0]} ({elapsed:.1f}ms)",
            file=sys.stderr,
        )

app.add_middleware(LoggingMiddleware)
Testing Our Framework
import asyncio
import json

async def test_bare_framework():
    from example_app import app

    # Build a minimal test harness
    async def request(method, path, body=b"", headers=None):
        scope = {
            "type": "http",
            "asgi": {"version": "3.0"},
            "method": method,
            "path": path,
            "query_string": b"",
            "headers": headers or (
                [(b"content-type", b"application/json")] if body else []
            ),
            "server": ("testserver", 8000),
        }
        events = [{"type": "http.request", "body": body, "more_body": False}]
        idx = [0]

        async def receive():
            e = events[idx[0]] if idx[0] < len(events) else {"type": "http.disconnect"}
            idx[0] += 1
            return e

        response_events = []

        async def send(event):
            response_events.append(event)

        await app(scope, receive, send)
        start = next(e for e in response_events if e["type"] == "http.response.start")
        body_data = b"".join(
            e.get("body", b"") for e in response_events
            if e["type"] == "http.response.body"
        )
        return start["status"], json.loads(body_data) if body_data else None

    # Test cases
    status, data = await request("GET", "/tasks")
    assert status == 200, f"Expected 200, got {status}"

    status, data = await request(
        "POST", "/tasks",
        body=json.dumps({"title": "Framework test"}).encode()
    )
    assert status == 201
    task_id = data["id"]

    status, data = await request("GET", f"/tasks/{task_id}")
    assert status == 200
    assert data["title"] == "Framework test"

    status, data = await request("GET", "/tasks/nonexistent")
    assert status == 404

    print("Framework tests passed.")

asyncio.run(test_bare_framework())
What We Left Out
bare is small by design. Things a production framework would add:
- Static file serving — map a URL prefix to a directory of files. Not hard, but involves MIME type detection and If-Modified-Since handling.
- Template rendering — integrate with Jinja2 or similar. The Response class would gain a from_template(name, context) factory.
- Form parsing — multipart/form-data for file uploads. The spec is in RFC 7578 and it’s tedious.
- Cookie handling — parsing the Cookie header and setting Set-Cookie on responses.
- Session middleware — signed cookies or server-side sessions backed by Redis.
- Error pages — catch exceptions in handlers and return friendly 500 pages rather than bare JSON.
- OpenAPI generation — FastAPI’s big feature: inspect route handlers’ type annotations and build an OpenAPI schema automatically.
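To give a feel for how small some of these pieces are, here is a sketch of cookie parsing on top of the standard library. `parse_cookies` is a hypothetical helper, not part of bare; it assumes an ASGI-style header list of byte pairs.

```python
from http.cookies import SimpleCookie

def parse_cookies(headers):
    # Collect every Cookie header from an ASGI-style header list
    # (a list of (name, value) byte pairs) into a single string.
    raw = b"; ".join(v for k, v in headers if k.lower() == b"cookie")
    jar = SimpleCookie()
    jar.load(raw.decode("latin-1"))
    # Each parsed cookie is a Morsel; expose just name -> value.
    return {name: morsel.value for name, morsel in jar.items()}

print(parse_cookies([(b"cookie", b"session=abc123; theme=dark")]))
# → {'session': 'abc123', 'theme': 'dark'}
```

The Set-Cookie side is similar: SimpleCookie can also serialize morsels back into header values.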
Each of these is a well-understood problem with known solutions. The framework doesn’t do anything you couldn’t do yourself — it just packages the common solutions conveniently.
The Point
You’ve now built a framework. It’s small, but it’s real — the tasks app runs on it, the WebSocket handler works, middleware composes correctly. You understand every line of it because you wrote every line of it.
When you use FastAPI or Starlette next week, you’ll recognize the patterns: the route decorators are building a route table. The Request object is wrapping the scope. The Response is sending events to send. The lifespan context manager is handling startup/shutdown events.
The framework isn’t doing anything mysterious. It’s doing exactly what you’d do if you wrote it yourself — which you just did.
You Know Too Much Now (What to Do With It)
You started this book believing — or at least accepting on faith — that web frameworks were doing something fundamentally complex. Something that required expertise and abstraction to approach safely. Something you didn’t need to understand to use.
That belief, it turns out, was based on a lie of omission. Not a malicious one. Frameworks genuinely are complex in the engineering sense — they handle thousands of edge cases, they’re battle-tested against adversarial input, they’ve accumulated years of hard-won knowledge about what goes wrong in production. None of that complexity is wasted.
But the interface they expose to the world is just a callable. A function. The whole thing.
What You’ve Built
Over the course of this book, you built:
- A raw HTTP parser that turns TCP bytes into structured request data
- A WSGI server that accepts connections, builds environ, and calls applications
- WSGI middleware for logging, authentication, CORS, timing, and request IDs
- A URL router with path parameter extraction using compiled regular expressions
- Request and Response classes that wrap the WSGI interface with a clean API
- An ASGI server using asyncio.start_server with proper event-based flow
- ASGI middleware with request/response interception and lifespan support
- A WebSocket chat server using the full ASGI WebSocket protocol
- A test harness for both WSGI and ASGI applications
- A complete mini-framework with routing, lifespan, WebSockets, and middleware
Each of these was built from first principles, consulting the spec rather than copying from an existing library. None of them are production-ready in the sense that Gunicorn or Uvicorn is production-ready — that would take more than a book — but all of them are correct, which is what matters for understanding.
What This Changes
The practical impact of understanding your tools at this level is subtle but real.
Debugging gets easier. When Django throws an error in WSGIHandler.__call__, you know what that is. When Uvicorn logs a warning about a malformed request, you understand the parsing step it’s complaining about. When FastAPI returns a 422 on a request you think is valid, you can trace it through the parameter extraction code you now understand.
Performance becomes legible. WSGI workers handle one request at a time per thread — you know why, because you built a synchronous server. ASGI handles many requests per event loop iteration — you know why, because you built an async server. When someone says “add more Gunicorn workers,” you know what that means at the socket level.
Middleware composition is obvious. You’ll never again be confused about the order your middleware runs in, because you’ve implemented build_middleware_stack yourself and seen how reversed(middleware) produces the right wrapping order.
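A stripped-down synchronous sketch (with invented names, not the framework's own) shows why reversing the list puts the first-added middleware outermost:

```python
def build_middleware_stack(app, middleware):
    # Wrap in reverse: the first middleware in the list is applied last,
    # so it becomes the outermost layer and sees the request first.
    for factory in reversed(middleware):
        app = factory(app)
    return app

def tagger(label):
    # A toy "middleware" that wraps the response in its label.
    def factory(app):
        return lambda request: f"{label}({app(request)})"
    return factory

handler = lambda request: "handler"
stack = build_middleware_stack(handler, [tagger("A"), tagger("B")])
print(stack("req"))  # → A(B(handler))
```

Without `reversed()`, the output would be B(A(handler)): the middleware added first would run innermost, which is the opposite of what users expect.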
Testing is straightforward. Your app is a callable. Call it with a fake environ or scope. Inspect the result. This is all your test client is doing, and now you know it.
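For a WSGI app, the whole "test client" fits in a few lines. A sketch, where the environ carries just the keys this particular handler reads:

```python
from io import BytesIO

def app(environ, start_response):
    # The app under test: a plain WSGI callable.
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello, " + environ["PATH_INFO"].encode()]

# Capture what the app passes to start_response.
captured = {}
def start_response(status, headers):
    captured["status"] = status
    captured["headers"] = headers

environ = {
    "REQUEST_METHOD": "GET",
    "PATH_INFO": "/world",
    "wsgi.input": BytesIO(b""),
}
body = b"".join(app(environ, start_response))
print(captured["status"], body)  # → 200 OK b'Hello, /world'
```

No server, no sockets: just a function call and assertions on the result.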
Choosing between WSGI and ASGI is rational. You know what WSGI can’t do (hold connections open, do async I/O efficiently) and why ASGI exists to address those limitations. The choice isn’t “what’s the new thing” — it’s a decision based on your actual requirements.
What You Should Still Use Frameworks For
Knowing how something works doesn’t mean you should build it yourself for every project. The wheel is understood; you still don’t build a new one for every car.
Use Django when you want the batteries: ORM, admin, migrations, auth. It’s a well-engineered solution to a set of common problems, and “well-engineered” includes fifteen years of security patches.
Use FastAPI when you want async performance, type-annotated APIs, and automatic OpenAPI docs with minimal boilerplate. The type coercion and documentation generation are genuinely valuable and tedious to implement correctly.
Use Flask when you want something small and explicit, where you add only what you need.
Use the mini-framework from the last chapter when you want something you understand completely and can modify freely — for personal projects, for microservices with unusual requirements, for fun.
The right answer depends on context. Now you have enough context to make the decision rationally.
What the Spec Documents Are For
You now have a reason to read them:
- PEP 3333 (WSGI): python.org/dev/peps/pep-3333 — the canonical reference for everything: environ, start_response, and the response iterable contract
- ASGI Spec: asgi.readthedocs.io — scope types, event names, and the lifespan protocol
- RFC 9110 (HTTP Semantics): the definitive reference for HTTP methods, status codes, and header semantics
- RFC 6455 (WebSocket): the WebSocket protocol spec, including the handshake, frame format, and close codes
These aren’t bedtime reading. They’re reference documents. Now that you have a mental model of what they’re specifying, they become useful rather than impenetrable.
The Deeper Point
There’s a broader principle at work here beyond Python web development.
Most of the complexity in software is accidental — it comes from accumulated decisions, backward compatibility constraints, and the need to handle edge cases that most applications never encounter. The essential complexity (the hard part that can’t be simplified away) is usually much smaller.
The essential complexity of a web server is: read bytes, parse HTTP, call a callable, write bytes. Everything else is handling the cases where that simple description fails.
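That four-step description fits in a screenful of code. A deliberately minimal sketch: no keep-alive, no error handling, and it assumes the whole request arrives in a single recv(); `serve_once` and its handler signature are inventions for illustration.

```python
import socket

def serve_once(handler, port=8000):
    """Serve exactly one HTTP request, then exit. Illustration only."""
    srv = socket.socket()
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(("127.0.0.1", port))
    srv.listen(1)
    conn, _ = srv.accept()
    raw = conn.recv(65536)                                     # read bytes
    method, path, _ = raw.split(b"\r\n", 1)[0].split(b" ", 2)  # parse HTTP
    body = handler(method.decode(), path.decode())             # call a callable
    conn.sendall(                                              # write bytes
        b"HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n%s" % (len(body), body)
    )
    conn.close()
    srv.close()
```

Everything a real server adds on top of this loop is handling the ways it can fail.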
When you encounter a system that seems impossibly complex — a message broker, a container runtime, a database engine — the same approach applies: find the interface, build something that implements it, and watch the complexity become accidental rather than essential.
This book was about Python web protocols. The method is general.
A Note on bare
The mini-framework we built in the last chapter is a teaching tool, not a production framework. If you found the exercise genuinely useful and want to continue building, consider:
- Adding proper error handling with custom exception classes
- Implementing dependency injection (FastAPI’s killer feature is more tractable than it looks)
- Adding background task scheduling
- Implementing WebSocket rooms properly with asyncio queues
- Writing comprehensive tests for the framework itself
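Dependency injection, for instance, is mostly introspection. A toy sketch of the idea behind annotation-driven injection; `inject` and its provider mapping are inventions for illustration, not FastAPI's API:

```python
import inspect

def inject(handler, providers):
    # Inspect the handler's signature once; at call time, fill in any
    # missing argument whose type annotation has a registered provider.
    sig = inspect.signature(handler)
    def call(**given):
        kwargs = dict(given)
        for name, param in sig.parameters.items():
            if name not in kwargs and param.annotation in providers:
                kwargs[name] = providers[param.annotation]()
        return handler(**kwargs)
    return call

class Database:
    pass

def handler(request, db: Database):
    return f"{request} via {type(db).__name__}"

wired = inject(handler, {Database: Database})
print(wired(request="GET /"))  # → GET / via Database
```

Real frameworks add caching, nested dependencies, and async support, but the core move is exactly this: read the signature, construct what's missing.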
Or, more likely, go back to FastAPI or Starlette with a much better understanding of what they’re doing and why.
Thank You
This book exists because Georgiy Treyvus asked the right question at the right time. Building software is easier when someone asks “but what’s actually happening?”
Go build something. You know too much to be mystified by it now.
Back to Bare Metal: WSGI & ASGI for Python Developers Published by CloudStreet — github.com/cloudstreet-dev CC0 1.0 Universal — no rights reserved