
The Lie You’ve Been Living

With special thanks to Georgiy Treyvus, whose idea made this book happen.


You’ve been writing Python web applications for — let’s say — a while. You know how to define routes. You know how to write views. You know that request.method gives you "GET" or "POST", and you know that returning a Response object makes things appear in someone’s browser.

What you may not know is what any of that actually is.

Here is the thing that your framework would prefer you not examine too closely: it’s a function. The entire web application you’ve been building — the routing, the middleware, the template rendering, the session handling, the authentication system, the REST API — all of it ultimately compiles down to a Python callable that takes some arguments and returns something.

That’s it. That’s the whole trick.

Let’s Prove It Right Now

Here is a complete, functional web application that will run in production:

def application(environ, start_response):
    status = "200 OK"
    headers = [("Content-Type", "text/plain")]
    start_response(status, headers)
    return [b"Hello, world"]

Save this as app.py. Install gunicorn (pip install gunicorn). Run:

gunicorn app:application

You now have a production web server serving HTTP requests. No framework. No dependencies beyond gunicorn. No magic.

If you point your browser at http://localhost:8000, you’ll see “Hello, world”.

That function — application — is a WSGI application. Everything Django and Flask have ever done starts from exactly this interface.

The Moment of Recognition

Now look at Django. From django/core/handlers/wsgi.py:

class WSGIHandler(base.BaseHandler):
    request_class = WSGIRequest

    def __call__(self, environ, start_response):
        set_script_prefix(get_script_name(environ))
        signals.request_started.send(sender=self.__class__, environ=environ)
        request = self.request_class(environ)
        response = self.get_response(request)
        # ... headers, status ...
        start_response(status, response_headers)
        # ...
        return response

It’s __call__. Django’s entire web framework is a class with a __call__ method that takes environ and start_response. It is, by definition, a callable that implements the WSGI interface.

Every piece of Django — the ORM, the admin, the URL dispatcher, the template engine — exists to produce that callable. The callable is the product.

Flask? Same thing:

class Flask(App):
    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)

FastAPI runs on top of Starlette, which is ASGI (we’ll get to that). But strip it down and you find the same idea: a callable with a defined interface.

Why This Matters

If you understand that your framework is a callable with a specific signature, several things become clear:

Testing becomes obvious. Your app is a function. Call it with the right arguments and inspect the result. No magic test client needed — though those are useful too.
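To make that concrete, here is a sketch of testing the hello-world app from earlier by calling it directly. The `captured` dict and `fake_start_response` are our own test scaffolding, not part of any spec; a real server would pass a much richer environ.

```python
def application(environ, start_response):
    status = "200 OK"
    headers = [("Content-Type", "text/plain")]
    start_response(status, headers)
    return [b"Hello, world"]


def test_application():
    captured = {}

    def fake_start_response(status, headers, exc_info=None):
        # Record what the app told the "server"
        captured["status"] = status
        captured["headers"] = headers

    # A deliberately minimal environ; real servers provide many more keys
    environ = {"REQUEST_METHOD": "GET", "PATH_INFO": "/"}
    body = b"".join(application(environ, fake_start_response))

    assert captured["status"] == "200 OK"
    assert ("Content-Type", "text/plain") in captured["headers"]
    assert body == b"Hello, world"


test_application()
```

No framework, no test client: a dict in, an iterable of bytes out.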

Middleware makes sense. Middleware is a callable that takes a callable and returns a callable. It’s function composition. It’s wrappers. Once you see this, the middleware stack is just a chain of decorators with extra steps.
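Here is that idea as a sketch: a WSGI middleware that appends one response header. The `X-Powered-By` header is purely illustrative.

```python
def hello_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello"]


def add_header_middleware(wrapped):
    """Takes a WSGI callable, returns a new WSGI callable."""
    def middleware(environ, start_response):
        def patched_start_response(status, headers, exc_info=None):
            # Append a header on the way out; everything else passes through
            return start_response(status, headers + [("X-Powered-By", "plain-python")], exc_info)
        return wrapped(environ, patched_start_response)
    return middleware


app = add_header_middleware(hello_app)  # composition: callable in, callable out
```

Stacking more middleware is just more wrapping: `app = logging_middleware(add_header_middleware(hello_app))`.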

The framework is not special. It’s solving real problems — routing, request parsing, response serialization — but it’s doing so with the same Python you write every day. There’s no privileged access, no hidden C extensions doing the real work (well, sometimes there are C extensions, but not for routing). It’s just code.

Debugging gets easier. When something goes wrong at the framework level, you now have a mental model of where to look. The request came in. It hit the WSGI callable. Something happened between environ and start_response. You can trace it.

The Interfaces, Briefly

There are two specs we care about in this book.

WSGI (Web Server Gateway Interface, PEP 3333) is the synchronous interface. It’s been around since 2003. Every line of Python web code written before async became mainstream runs on top of it. The entire spec is essentially:

application(environ, start_response) -> iterable of bytes

ASGI (Asynchronous Server Gateway Interface) is the async successor. It was designed to handle things WSGI can’t — WebSockets, long-polling, HTTP/2 push — by making the entire interface async. The spec is:

application(scope, receive, send) -> None  # but async
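For comparison, here is a sketch of a minimal ASGI HTTP application, the async analogue of the WSGI hello-world. It handles only `http` scopes and skips error handling; responses are sent as messages rather than returned.

```python
async def application(scope, receive, send):
    # Ignoring "websocket" and "lifespan" scopes in this sketch
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({
        "type": "http.response.body",
        "body": b"Hello, world",
    })
```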

Both specs define a contract between a web server (Gunicorn, Uvicorn, Hypercorn) and a web application (your code, or Django, or FastAPI). The server handles the TCP connection, parses the HTTP request, and calls your callable. Your callable decides what to return. The server sends it back.

The framework just makes it easier to write that callable. That’s the whole job.

What This Book Will Do

We’re going to start at the bottom.

In Part I, we’ll look at what HTTP actually is (text over a socket), what the frameworks are doing, and why understanding this matters for your day-to-day work.

In Part II, we’ll implement WSGI from first principles: a server, middleware, routing, and request/response abstractions — all from scratch.

In Part III, we’ll do the same for ASGI: the async model, WebSockets, lifespan events, and building an async server.

In Part IV, we’ll look at patterns — testing, middleware composition, and building a small framework — to solidify everything.

By the end, you’ll be able to read the Gunicorn source code and understand what it’s doing. You’ll know what Uvicorn’s main() actually does. You’ll be able to debug framework-level issues because you’ll have written the framework-level code yourself.

More importantly, you’ll look at your next Django application and see it for what it is: a callable. A very sophisticated, well-tested, production-hardened callable — but a callable nonetheless.

The magic was just Python with good variable names.

Let’s start with the protocol.

HTTP Is Just Text

Before we talk about WSGI or ASGI, we need to talk about what they’re abstracting over. And what they’re abstracting over is HTTP. And HTTP is just text.

This is not a simplification. Open a TCP connection to port 80 of any web server in the world, type the right bytes, and you’ll get an HTTP response. You don’t need a library. You need a socket and the knowledge of what to type.

Let’s actually do it.

Talking to a Web Server with a Raw Socket

import socket

def raw_http_request(host: str, path: str = "/") -> str:
    """Make an HTTP/1.1 GET request using nothing but a socket."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, 80))

    # This is a complete, valid HTTP/1.1 request.
    request = (
        f"GET {path} HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    )

    sock.sendall(request.encode("utf-8"))

    # Read the response in chunks
    response = b""
    while chunk := sock.recv(4096):
        response += chunk

    sock.close()
    return response.decode("utf-8", errors="replace")


if __name__ == "__main__":
    response = raw_http_request("example.com")
    print(response[:500])  # Just the beginning

Run this and you’ll see something like:

HTTP/1.1 200 OK
Content-Encoding: gzip
Accept-Ranges: bytes
Age: 123456
Cache-Control: max-age=604800
Content-Type: text/html; charset=UTF-8
Date: Thu, 01 Jan 2026 00:00:00 GMT
...

<!doctype html>
<html>
...

That’s it. That’s HTTP. A text request, a text response. The wire format is specified in RFC 9112, with the semantics in RFC 9110 (historically RFC 2616, then RFC 7230 and friends), and the format itself is not complicated.

The Structure of an HTTP Request

An HTTP request has this shape:

METHOD /path HTTP/version\r\n
Header-Name: header-value\r\n
Another-Header: another-value\r\n
\r\n
[optional body]

A minimal GET request:

GET /index.html HTTP/1.1\r\n
Host: example.com\r\n
\r\n

A POST request with a body:

POST /api/users HTTP/1.1\r\n
Host: api.example.com\r\n
Content-Type: application/json\r\n
Content-Length: 28\r\n
\r\n
{"name": "Alice", "age": 30}

Three parts: the request line, the headers, and the body. Each header is on its own line. Headers are separated from the body by a blank line (\r\n\r\n). The \r\n is a carriage return followed by a newline — HTTP requires both, not just \n.

The Structure of an HTTP Response

An HTTP response:

HTTP/version STATUS_CODE Reason Phrase\r\n
Header-Name: header-value\r\n
Another-Header: another-value\r\n
\r\n
[body]

A minimal response:

HTTP/1.1 200 OK\r\n
Content-Type: text/plain\r\n
Content-Length: 13\r\n
\r\n
Hello, world!

The status line, headers, blank line, body. Same structure, mirrored.

Parsing HTTP Requests

Let’s write a basic HTTP request parser. Not to use in production — for understanding what Gunicorn and Uvicorn do on every single request before they ever touch your application code.

from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class HTTPRequest:
    method: str
    path: str
    query_string: str
    http_version: str
    headers: Dict[str, str]
    body: bytes

    @property
    def content_type(self) -> Optional[str]:
        return self.headers.get("content-type")

    @property
    def content_length(self) -> int:
        # "or 0" also covers an empty Content-Length header value
        return int(self.headers.get("content-length") or 0)


def parse_request(raw: bytes) -> HTTPRequest:
    """
    Parse a raw HTTP request into an HTTPRequest object.
    Handles the header/body split and basic header parsing.
    """
    # Split headers from body at the blank line
    header_section, _, body = raw.partition(b"\r\n\r\n")

    # Split header section into individual lines
    lines = header_section.decode("utf-8", errors="replace").split("\r\n")

    # First line is the request line
    request_line = lines[0]
    method, raw_path, http_version = request_line.split(" ", 2)

    # Split path from query string
    if "?" in raw_path:
        path, query_string = raw_path.split("?", 1)
    else:
        path, query_string = raw_path, ""

    # Parse headers (everything after the request line)
    headers = {}
    for line in lines[1:]:
        if ": " in line:
            name, _, value = line.partition(": ")
            headers[name.lower()] = value

    return HTTPRequest(
        method=method,
        path=path,
        query_string=query_string,
        http_version=http_version,
        headers=headers,
        body=body,
    )


# Test it
raw_request = (
    b"POST /api/users?active=true HTTP/1.1\r\n"
    b"Host: localhost\r\n"
    b"Content-Type: application/json\r\n"
    b"Content-Length: 28\r\n"
    b"\r\n"
    b'{"name": "Alice", "age": 30}'
)

req = parse_request(raw_request)
print(f"Method: {req.method}")
print(f"Path: {req.path}")
print(f"Query: {req.query_string}")
print(f"Content-Type: {req.content_type}")
print(f"Body: {req.body}")

Output:

Method: POST
Path: /api/users
Query: active=true
Content-Type: application/json
Body: b'{"name": "Alice", "age": 30}'

This is, roughly, what every web server does before handing control to your application. Gunicorn’s HTTP parser is more robust (it handles edge cases, malformed requests, chunked transfer encoding, etc.), but conceptually it’s doing exactly this.

Building an HTTP Response

The other direction: given what you want to send back, construct valid HTTP bytes.

def build_response(
    status_code: int,
    reason: str,
    headers: Dict[str, str],
    body: bytes,
) -> bytes:
    """Build a raw HTTP/1.1 response."""
    status_line = f"HTTP/1.1 {status_code} {reason}\r\n"

    # Always include Content-Length (copy so we don't mutate the caller's dict)
    headers = {**headers, "Content-Length": str(len(body))}

    header_lines = "".join(
        f"{name}: {value}\r\n"
        for name, value in headers.items()
    )

    return (
        status_line.encode("utf-8")
        + header_lines.encode("utf-8")
        + b"\r\n"
        + body
    )


response = build_response(
    200,
    "OK",
    {"Content-Type": "text/plain"},
    b"Hello, world!",
)
print(response.decode("utf-8"))

Output:

HTTP/1.1 200 OK
Content-Type: text/plain
Content-Length: 13

Hello, world!

What WSGI Does With All This

Now think about this: when a request comes in to a Gunicorn worker, the worker:

  1. Reads bytes from the socket
  2. Parses them into method, path, headers, body (as above)
  3. Packs all of that into a dictionary called environ
  4. Calls your WSGI application with environ and start_response
  5. Takes whatever your application returns and writes it back to the socket as HTTP response bytes

The environ dictionary is just a structured version of the parsed HTTP request. REQUEST_METHOD is the method. PATH_INFO is the path. CONTENT_TYPE is the Content-Type header (it and CONTENT_LENGTH are the two headers that don't get the HTTP_ prefix). wsgi.input is a file-like object wrapping the body bytes.
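That translation can be sketched directly. Here is a rough, partial environ builder from an already-parsed request; the key handling follows PEP 3333, but a real server sets many more required keys than this.

```python
import io


def build_environ(method, path, query_string, headers, body):
    """Translate a parsed HTTP request into a (partial) WSGI environ dict."""
    environ = {
        "REQUEST_METHOD": method,
        "PATH_INFO": path,
        "QUERY_STRING": query_string,
        "wsgi.input": io.BytesIO(body),  # file-like wrapper around body bytes
        "wsgi.url_scheme": "http",
    }
    for name, value in headers.items():
        key = name.upper().replace("-", "_")
        if key in ("CONTENT_TYPE", "CONTENT_LENGTH"):
            environ[key] = value          # these two get no HTTP_ prefix
        else:
            environ["HTTP_" + key] = value
    return environ
```

Feed it the output of a parser like the one above and you have the dictionary a WSGI app receives.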

When you call start_response("200 OK", [("Content-Type", "text/plain")]) in your WSGI app, you’re providing the status line and headers that the server will write back. When you return [b"Hello, world!"], you’re providing the response body.

The server just… sends it.

[raw bytes in] -> [parse] -> [your callable] -> [serialize] -> [raw bytes out]

That’s the entire pipeline. WSGI is just the contract for the middle part.

Keepalive, Chunked Encoding, and Things We’re Ignoring

Real HTTP has some complexity we’ve glossed over:

Connection: keep-alive — HTTP/1.1 defaults to keeping the connection open for multiple requests. The server needs to know when one request ends and the next begins, which it does via Content-Length or chunked transfer encoding.

Chunked transfer encoding — instead of specifying Content-Length upfront, you can stream the response in chunks, each prefixed with its size in hex. This is how streaming responses work.
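To make that chunk format concrete, here is a sketch that lays a few body chunks out the way HTTP/1.1 chunked transfer encoding puts them on the wire: the chunk size in hex, a CRLF, the chunk, another CRLF, terminated by a zero-length chunk.

```python
def encode_chunked(chunks):
    """Encode an iterable of byte chunks using HTTP/1.1 chunked transfer encoding."""
    out = b""
    for chunk in chunks:
        out += f"{len(chunk):x}\r\n".encode() + chunk + b"\r\n"
    out += b"0\r\n\r\n"  # zero-length chunk marks the end of the body
    return out


print(encode_chunked([b"Hello, ", b"world!"]))
# b'7\r\nHello, \r\n6\r\nworld!\r\n0\r\n\r\n'
```

The receiver reads a size line, reads that many bytes, and repeats until it sees the `0` chunk, so no Content-Length is needed up front.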

HTTP/2 — multiplexed streams over a single connection, binary framing, header compression. Same semantics, very different wire format.

TLS — everything above happens over an encrypted connection. Same protocol, but the bytes going over the wire are ciphertext.

WSGI abstracts all of this. You don’t handle keep-alive or chunked encoding directly. The server does. You write your callable; the server handles the transport.

ASGI handles more of these edge cases natively — particularly streaming — which is part of why it exists. We’ll get there.

The Thing to Hold Onto

HTTP is a text protocol. Requests are lines of text: a request line, headers, body. Responses are lines of text: a status line, headers, body. The blank line between headers and body is significant. The \r\n line endings are required.

Everything your framework does is ultimately:

  1. Parse the incoming text into a convenient Python object
  2. Call your handler function
  3. Serialize the result back into text

That’s the whole transaction. Hold that mental model as we build the WSGI layer on top of it.
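The whole transaction fits in a miniature sketch: a toy parse/call/serialize round trip. The parser and serializer here are stripped-down stand-ins for the ones a real server uses, and `handler` stands in for the framework-plus-your-code step.

```python
def parse(raw: bytes):
    """Toy parser: request line plus body split only."""
    head, _, body = raw.partition(b"\r\n\r\n")
    lines = head.decode().split("\r\n")
    method, path, _ = lines[0].split(" ", 2)
    return {"method": method, "path": path, "body": body}


def handler(request):
    """Stand-in for 'framework + your view'."""
    return 200, "OK", {"Content-Type": "text/plain"}, b"You asked for " + request["path"].encode()


def serialize(status, reason, headers, body):
    """Turn status/headers/body back into raw HTTP bytes."""
    headers = {**headers, "Content-Length": str(len(body))}
    head = f"HTTP/1.1 {status} {reason}\r\n" + "".join(f"{k}: {v}\r\n" for k, v in headers.items())
    return head.encode() + b"\r\n" + body


raw = b"GET /hello HTTP/1.1\r\nHost: localhost\r\n\r\n"
print(serialize(*handler(parse(raw))))
```

Bytes in, bytes out, with your callable in the middle.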

What Django and FastAPI Are Actually Doing

We’ve established that HTTP is text and that WSGI is a callable interface. Now let’s look at what Django and FastAPI actually do with that interface — because once you see it, the framework becomes a much less mysterious box.

We’ll trace a request through each one, following the actual code path (simplified to keep it readable). The goal is not to understand every detail of Django’s internals — the Django team has written excellent documentation for that. The goal is to see the skeleton: the WSGI entry point, the routing, and the response serialization.

Django’s Request Path

A Django project has a WSGI entrypoint file, generated by startproject:

# myproject/wsgi.py
import os
from django.core.wsgi import get_wsgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

application = get_wsgi_application()

application here is what Gunicorn will call. Let’s follow get_wsgi_application():

# django/core/wsgi.py
def get_wsgi_application():
    django.setup()
    return WSGIHandler()

It runs Django setup (loads settings, connects signals, initializes apps) and returns a WSGIHandler. Let’s look at WSGIHandler:

# django/core/handlers/wsgi.py (simplified)
class WSGIHandler(base.BaseHandler):
    request_class = WSGIRequest

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.load_middleware()  # Build the middleware stack

    def __call__(self, environ, start_response):
        # Convert environ to a Django request object
        request = self.request_class(environ)

        # Run the full middleware/view pipeline
        response = self.get_response(request)

        # Django response -> HTTP status string and headers list
        status = '%d %s' % (response.status_code, response.reason_phrase)
        response_headers = list(response.items())
        for c in response.cookies.values():
            response_headers.append(('Set-Cookie', c.output(header='')))

        # Tell the WSGI server what status and headers to use
        start_response(status, response_headers)

        # Return the response body as an iterable
        if request.method == 'HEAD':
            return [b'']
        return response

The __call__ method is the WSGI application. It takes environ and start_response, does Django things, and returns a response iterable.

The Middleware Stack

self.load_middleware() builds a chain of callables. If your MIDDLEWARE setting looks like:

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'myapp.middleware.CustomMiddleware',
]

Then load_middleware() constructs something conceptually like:

def innermost_handler(request):
    # The actual view dispatcher sits at the bottom of the stack
    return view_function(request)

handler = innermost_handler
for middleware_path in reversed(MIDDLEWARE):
    middleware_class = import_string(middleware_path)
    handler = middleware_class(handler)

self._middleware_chain = handler

Each middleware wraps the next one. When you call self.get_response(request), you’re calling self._middleware_chain(request), which unwinds through each middleware layer until it hits the view. This is exactly the turtles-all-the-way-down middleware pattern we’ll implement ourselves in the WSGI section.

The URL Dispatcher

Inside get_response, Django eventually calls:

# django/core/handlers/base.py (simplified)
def _get_response(self, request):
    callback, callback_args, callback_kwargs = self.resolve_request(request)
    response = callback(request, *callback_args, **callback_kwargs)
    return response

resolve_request does URL routing: it takes request.path_info and walks through the urlpatterns list, matching regex patterns or path converters until it finds a match. The match returns the view function and any captured URL parameters.
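The idea behind that resolution step can be sketched in a few lines. The pattern list and `resolve` function below are hypothetical stand-ins for Django's urlpatterns machinery; note that captured parameters come out as strings, just as they do from regex-based routes.

```python
import re


def user_detail(request, user_id):
    """A hypothetical view function."""
    return f"user {user_id}"


# Conceptually like urlpatterns: (compiled pattern, view) pairs
urlpatterns = [
    (re.compile(r"^/users/(?P<user_id>\d+)$"), user_detail),
]


def resolve(path):
    """Walk the patterns in order; return (view, captured kwargs) or raise."""
    for pattern, view in urlpatterns:
        match = pattern.match(path)
        if match:
            return view, match.groupdict()
    raise LookupError(f"No route matches {path!r}")


view, kwargs = resolve("/users/42")
print(view(None, **kwargs))  # user 42
```

Django's real dispatcher adds namespaces, includes, path converters, and much better error reporting, but this is the skeleton.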

That view function is what you write. Django calls it. You return an HttpResponse. Django serializes it. Done.

FastAPI’s Request Path

FastAPI is an ASGI framework (we’ll cover ASGI in Part III), but the same “it’s just a callable” principle applies.

from fastapi import FastAPI

app = FastAPI()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    return {"user_id": user_id}

app here is a FastAPI instance. FastAPI inherits from Starlette, which implements the ASGI interface. When Uvicorn calls your application:

# uvicorn calls this
await app(scope, receive, send)

Starlette’s __call__ (simplified):

class Starlette:
    async def __call__(self, scope, receive, send):
        scope["app"] = self
        if self.middleware_stack is None:
            self.middleware_stack = self.build_middleware_stack()
        await self.middleware_stack(scope, receive, send)

Same pattern: a middleware stack, built once, called on every request.

At the bottom of the stack is the router. FastAPI’s router matches the path and HTTP method against registered routes, extracts path parameters, and calls your endpoint function.

The Clever Part: Dependency Injection and Type Hints

The thing FastAPI adds is automatic parsing of function parameters using type hints. When you write:

from typing import Optional

@app.get("/items/{item_id}")
async def read_item(item_id: int, q: Optional[str] = None):
    return {"item_id": item_id, "q": q}

FastAPI uses inspect.signature to introspect the function, reads the type annotations, and automatically:

  • Extracts item_id from the path (because it’s in the {item_id} path template)
  • Extracts q from the query string (because it’s not in the path)
  • Converts item_id to int and validates it
  • Returns a 422 if conversion fails

This is done at startup (when the route is registered) using Pydantic and Python’s inspect module. There’s no magic — it’s reflection and type coercion applied systematically.

# What FastAPI is doing under the hood (very simplified)
import inspect
from typing import get_type_hints

def build_endpoint_handler(func):
    sig = inspect.signature(func)
    hints = get_type_hints(func)

    async def handler(scope, receive, send):
        # Extract path params, query params from scope
        path_params = scope.get("path_params", {})
        query_string = scope.get("query_string", b"").decode()

        # Build kwargs for the function
        kwargs = {}
        for name, param in sig.parameters.items():
            if name in path_params:
                kwargs[name] = hints[name](path_params[name])  # type coercion
            # ... query param extraction, body parsing, etc.

        result = await func(**kwargs)

        # Serialize result to JSON response
        # ...

    return handler

That’s the core of FastAPI’s “magic”. It’s Python’s inspect module and type coercion, applied at startup to build efficient request handlers.

Flask’s Request Path

Flask is simpler than Django but uses the same WSGI interface. The Flask class has a __call__ method:

class Flask:
    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)

    def wsgi_app(self, environ, start_response):
        ctx = self.request_context(environ)
        ctx.push()
        try:
            response = self.full_dispatch_request()
            return response(environ, start_response)
        finally:
            ctx.pop()

Flask’s “request context” and “application context” are thread-local (or greenlet-local) storage — that’s how flask.request works without being passed as a parameter. When you access request.method in a Flask view, Flask looks up the current request from a thread-local stack that was pushed when ctx.push() was called.

This is convenient, but it's not magic. It's an implicit parameter-passing mechanism. WSGI is synchronous and handles one request per thread, so thread-local storage works. It's also why the approach breaks down under async: many coroutines share a single thread, so plain thread-locals bleed between concurrent requests. That is why async code reaches for contextvars instead, and why Flask's async support required careful workarounds.
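A sketch of the mechanism, using `threading.local` directly. Flask's actual context locals live in Werkzeug and are considerably more elaborate; the names here (`push`, `current_request`) are our own shorthand for the real machinery.

```python
import threading


class Request:
    def __init__(self, method):
        self.method = method


_ctx = threading.local()  # one independent slot per thread


def push(request):
    _ctx.request = request        # roughly what ctx.push() does


def current_request():
    return _ctx.request           # roughly what flask.request resolves to


def view():
    # The view never receives the request explicitly;
    # it's fetched from the current thread's storage
    return f"method was {current_request().method}"


push(Request("POST"))
print(view())  # method was POST
```

Each thread sees only its own `_ctx.request`, which is exactly why one-request-per-thread servers can get away with it.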

What They All Have in Common

Every Python web framework, at its core, does this:

environ/scope
    │
    ▼
┌─────────────────────────────────────────────────────┐
│ Middleware stack                                     │
│   ├── Security / CORS / compression / auth          │
│   ├── Session management                            │
│   └── (your middleware here)                        │
│         │                                           │
│         ▼                                           │
│ URL Router                                          │
│   └── match path → find handler function           │
│         │                                           │
│         ▼                                           │
│ Handler / View                                      │
│   └── your code runs here                          │
│         │                                           │
│         ▼                                           │
│ Response serialization                              │
│   └── status + headers + body → bytes              │
└─────────────────────────────────────────────────────┘
    │
    ▼
start_response(status, headers) + return [body_bytes]

The framework is providing:

  1. A way to compose middleware (the stack builder)
  2. URL routing (pattern matching on PATH_INFO)
  3. Request parsing (wrapping environ in a convenient object)
  4. Response serialization (turning your return value into WSGI-compatible bytes)

None of these are hard to understand. Some are hard to implement well — Django’s URL dispatcher handles edge cases you’d never think of, and FastAPI’s type coercion is quite sophisticated. But conceptually, they’re all doing the same four things.

Building It Yourself

The rest of Part II is devoted to building each of these pieces from scratch. By the time we’re done, you’ll have:

  • A working WSGI server
  • A middleware stack
  • A URL router
  • Request and Response classes

None of it will be production-ready in the sense that Django is production-ready. But all of it will be correct, and building it will give you a ground-level understanding that reading the Django source code alone doesn’t provide.

The question isn’t “how does Django do routing?” The question is “what problem does routing solve, and what’s the simplest possible correct implementation?” Once you’ve answered the second question yourself, the first becomes easy to read.

Let’s start with the spec.

The WSGI Spec (It Fits on a Napkin)

PEP 3333 is the WSGI specification. It is 2,500 words long. For context, this chapter is longer. The actual interface it defines is expressible in fewer than ten lines of Python. This is either a sign of elegant design or a sign that we’ve been dramatically over-complicating web development for twenty years. Possibly both.

Let’s read the spec together — not the full PEP, but the essential contract it defines.

The Interface in Full

The complete WSGI interface, distilled:

from typing import Iterable

def application(environ: dict, start_response) -> Iterable[bytes]:
    """
    A WSGI application is any callable that:
    1. Accepts environ (dict) and start_response (callable)
    2. Calls start_response(status, response_headers) exactly once before returning
    3. Returns an iterable of byte strings (the response body)
    """
    status = "200 OK"
    response_headers = [("Content-Type", "text/plain; charset=utf-8")]
    start_response(status, response_headers)
    return [b"Hello, world!\n"]

That’s it. That’s WSGI. Everything else is detail.

The environ Dictionary

environ is a Python dictionary containing CGI-style environment variables plus some WSGI-specific additions. When Gunicorn receives an HTTP request, it parses it and packs the results into this dict.

Here are the keys your application will actually use:

# Request method
environ['REQUEST_METHOD']    # "GET", "POST", "PUT", "DELETE", etc.

# URL components
environ['PATH_INFO']         # "/users/42" — the URL path
environ['QUERY_STRING']      # "active=true&page=2" — without the "?"
environ['SERVER_NAME']       # "example.com"
environ['SERVER_PORT']       # "80" (note: string, not int)

# HTTP headers (prefixed with HTTP_, hyphens become underscores, uppercased)
environ['HTTP_HOST']                # "example.com"
environ['HTTP_ACCEPT']              # "text/html,application/xhtml+xml,..."
environ['HTTP_AUTHORIZATION']       # "Bearer abc123"

# Two headers are special-cased with no HTTP_ prefix:
environ['CONTENT_TYPE']             # "application/json"
environ['CONTENT_LENGTH']           # "42" (string; may be absent or empty)

# Request body
environ['wsgi.input']        # file-like object, read() to get the body bytes

# WSGI metadata
environ['wsgi.version']      # (1, 0)
environ['wsgi.url_scheme']   # "http" or "https"
environ['wsgi.multithread']  # True if server may run multiple threads
environ['wsgi.multiprocess'] # True if server may fork multiple processes
environ['wsgi.run_once']     # True if application will only be invoked once
environ['wsgi.errors']       # file-like object for error output (stderr)

Let’s write a small app that dumps the environ so you can see it for yourself:

import json


def environ_dumper(environ, start_response):
    """Dump the environ dict as JSON for debugging."""
    # Some values aren't JSON-serializable; convert them
    safe_environ = {}
    for key, value in sorted(environ.items()):
        if isinstance(value, (str, int, float, bool, type(None))):
            safe_environ[key] = value
        else:
            safe_environ[key] = f"<{type(value).__name__}>"

    body = json.dumps(safe_environ, indent=2).encode("utf-8")

    start_response("200 OK", [
        ("Content-Type", "application/json"),
        ("Content-Length", str(len(body))),
    ])
    return [body]


if __name__ == "__main__":
    from wsgiref.simple_server import make_server
    server = make_server("127.0.0.1", 8000, environ_dumper)
    print("Serving on http://127.0.0.1:8000")
    server.serve_forever()

Run this, hit http://127.0.0.1:8000/some/path?foo=bar in your browser, and you’ll see everything Gunicorn (or wsgiref) passes to your application. It demystifies a lot.

The start_response Callable

start_response is a callable provided by the server. Your application calls it to set the response status and headers. Its signature:

def start_response(
    status: str,              # e.g. "200 OK", "404 Not Found"
    response_headers: list,   # list of (name, value) tuples
    exc_info=None,            # for error handling, discussed below
):
    ...                       # returns a legacy write() callable; don't use it

The status string must be a valid HTTP status: three digits, a space, and a reason phrase. The reason phrase can be anything — the spec doesn’t require it to be the canonical one — but convention is to use the standard phrases.

The response_headers is a list of (name, value) tuples. Names are case-insensitive in HTTP; convention is Title-Case. Values must be strings.

# Valid calls to start_response
start_response("200 OK", [
    ("Content-Type", "text/html; charset=utf-8"),
    ("X-Custom-Header", "my-value"),
])

start_response("404 Not Found", [
    ("Content-Type", "text/plain"),
])

start_response("302 Found", [
    ("Location", "https://example.com/new-url"),
    ("Content-Type", "text/plain"),
])

The write callable that start_response returns is a legacy escape hatch for applications that need to write response data before returning from the callable. Don’t use it in new code. The spec includes it for backward compatibility with pre-WSGI CGI-style code.

The Return Value

Your application must return an iterable of byte strings. Each item in the iterable is a chunk of the response body. The server will concatenate and send them.

# All of these are valid return values:

return [b"Hello, world!"]                         # Single chunk
return [b"Hello, ", b"world!"]                    # Multiple chunks
return iter([b"chunk 1", b"chunk 2"])             # Iterator
return (f"chunk {i}".encode() for i in range(3))   # Generator expression

# For streaming responses, a generator is useful:
def streaming_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    def generate():
        for i in range(100):
            yield f"Line {i}\n".encode("utf-8")
    return generate()

One important constraint: you must call start_response before (or while) the server is consuming your return iterable. In practice, call it before you return. The server will call next() on your iterable to get chunks, and by that point it needs to know the status and headers.

The close() Method

If your return iterable has a close() method, the server will call it when it’s done — even if an exception occurred during iteration. This is how you ensure cleanup (open file handles, database connections, etc.) happens even when the response is only partially sent.

class FileResponse:
    def __init__(self, filepath):
        self.f = open(filepath, "rb")

    def __iter__(self):
        while chunk := self.f.read(8192):
            yield chunk

    def close(self):
        self.f.close()  # Server will call this


def file_serving_app(environ, start_response):
    path = environ['PATH_INFO'].lstrip('/')
    response = FileResponse(path)
    start_response("200 OK", [("Content-Type", "application/octet-stream")])
    return response

Error Handling with exc_info

If an error occurs after start_response has been called (and headers may have been sent), you can call start_response again with exc_info set. This is how middleware propagates exceptions:

import sys

def application(environ, start_response):
    try:
        # ... do work ...
        start_response("200 OK", [("Content-Type", "text/plain")])
        return [b"OK"]
    except Exception:
        start_response("500 Internal Server Error",
                       [("Content-Type", "text/plain")],
                       sys.exc_info())  # Pass exception info
        return [b"Internal Server Error"]

If headers haven’t been sent yet, the server will use the new status/headers. If headers have already been sent (which can happen with streaming responses), the server will re-raise the exception — there’s nothing else it can do at that point.

What the Server Side Looks Like

To fully understand the contract, it helps to see what the server-side caller looks like. Here’s a minimal version:

def call_wsgi_app(app, environ):
    """
    Call a WSGI app and collect the response.
    Returns (status, headers, body_bytes).
    """
    response_started = []

    def start_response(status, headers, exc_info=None):
        if exc_info:
            try:
                if response_started:
                    raise exc_info[1].with_traceback(exc_info[2])
            finally:
                exc_info = None
        response_started.clear()  # a re-call with exc_info replaces the first call
        response_started.append((status, headers))

    result = app(environ, start_response)

    try:
        body = b"".join(result)
    finally:
        if hasattr(result, "close"):
            result.close()

    status, headers = response_started[0]
    return status, headers, body

This is essentially what Gunicorn’s worker does before serializing the response into HTTP bytes and sending it over the socket. Note how start_response just stores the status and headers; the actual sending happens after app() returns.

The Complete Spec, Annotated

Here’s the one-page summary of everything WSGI requires:

APPLICATION:
- Must be callable
- Takes two arguments: environ (dict), start_response (callable)
- Must call start_response(status, headers) exactly once
  (or on error, may call it again with exc_info)
- Must return an iterable of byte strings
- The iterable may have a close() method; if so, server must call it

ENVIRON:
- Must contain CGI/1.1 variables (REQUEST_METHOD, PATH_INFO, etc.)
- Must contain wsgi.input (readable file-like object for body)
- Must contain wsgi.errors (writable file-like for errors)
- Must contain wsgi.version (1, 0)
- Must contain wsgi.url_scheme ("http" or "https")
- Must contain wsgi.multithread, wsgi.multiprocess, wsgi.run_once (bools)
- HTTP headers: prefixed with HTTP_, hyphens→underscores, uppercased
  Exception: Content-Type and Content-Length have no HTTP_ prefix

START_RESPONSE:
- Takes status (str), response_headers (list of 2-tuples), exc_info (optional)
- Status format: "NNN Reason Phrase"
- Headers: list of (name, value) tuples, strings only
- Returns a write() callable (legacy; don't use)
- May be called again only if exc_info is provided

SERVER:
- Must call app(environ, start_response) to get response
- Must send status and headers before body
- Must call result.close() if it exists, even on error
- Must handle chunked responses (iterate over return value)

That’s the contract. Two functions talking to each other with a well-defined interface. The server provides environ and start_response; your app provides the response.

In the next chapter, we’ll write a real WSGI application — no wsgiref, no framework, just the spec and a server call.

Your First WSGI App (No Training Wheels)

Let’s build something real. Not “hello world” (we did that in the introduction) — a genuinely useful WSGI application with multiple routes, request parsing, and proper response handling. All without a framework.

By the end of this chapter you’ll have a working JSON API that you can run with Gunicorn. It won’t be pretty. That’s the point.

The Problem

We’re building a simple in-memory task management API. It will support:

  • GET /tasks — list all tasks
  • POST /tasks — create a task
  • GET /tasks/{id} — get a specific task
  • DELETE /tasks/{id} — delete a task

That’s enough to demonstrate routing, request body parsing, path parameter extraction, and proper HTTP response semantics without drowning in incidental complexity.

Reading the Request Body

The first thing most tutorials skip over: how do you read the request body in WSGI?

def read_body(environ) -> bytes:
    """Read the request body from environ['wsgi.input']."""
    try:
        content_length = int(environ.get('CONTENT_LENGTH', 0) or 0)
    except (ValueError, TypeError):
        content_length = 0

    if content_length > 0:
        return environ['wsgi.input'].read(content_length)
    return b''

Why or 0? Because CONTENT_LENGTH might be an empty string (""), which int() can’t convert. The or 0 handles that case. Why the try/except? Because you can’t fully trust incoming headers.
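The idiom is easy to verify in a REPL:

```python
# `or` falls through to 0 only when the left operand is falsy:
assert ("" or 0) == 0
assert ("42" or 0) == "42"

assert int("" or 0) == 0     # the case the idiom exists for
assert int("42" or 0) == 42  # normal values pass through unchanged
# int("") on its own raises ValueError, which the except would also catch.
```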

Why read(content_length) rather than just read()? The spec says wsgi.input may be a socket-backed stream. Calling read() without a limit might block indefinitely waiting for a connection that never closes. Always read exactly Content-Length bytes.

Parsing JSON Bodies

import json
from typing import Any, Optional


def parse_json_body(environ) -> Optional[Any]:
    """Parse a JSON request body, returning None if absent or invalid."""
    content_type = environ.get('CONTENT_TYPE', '')
    if 'application/json' not in content_type:
        return None

    body = read_body(environ)
    if not body:
        return None

    try:
        return json.loads(body)
    except json.JSONDecodeError:
        return None

Building Responses

Rather than calling start_response directly everywhere, let’s build a small helper:

def json_response(start_response, data: Any, status: int = 200) -> list[bytes]:
    """Send a JSON response."""
    STATUS_PHRASES = {
        200: "OK",
        201: "Created",
        400: "Bad Request",
        404: "Not Found",
        405: "Method Not Allowed",
        500: "Internal Server Error",
    }
    body = json.dumps(data, indent=2).encode("utf-8")
    phrase = STATUS_PHRASES.get(status, "Unknown")
    start_response(
        f"{status} {phrase}",
        [
            ("Content-Type", "application/json"),
            ("Content-Length", str(len(body))),
        ]
    )
    return [body]

The Application

Now the actual application. Notice the structure: it’s just a function that dispatches based on method and path.

import json
import re
import uuid
from typing import Any, Optional


# In-memory store
tasks: dict[str, dict] = {}


def read_body(environ) -> bytes:
    try:
        content_length = int(environ.get('CONTENT_LENGTH', 0) or 0)
    except (ValueError, TypeError):
        content_length = 0
    if content_length > 0:
        return environ['wsgi.input'].read(content_length)
    return b''


def parse_json_body(environ) -> Optional[Any]:
    content_type = environ.get('CONTENT_TYPE', '')
    if 'application/json' not in content_type:
        return None
    body = read_body(environ)
    if not body:
        return None
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        return None


STATUS_PHRASES = {
    200: "OK", 201: "Created", 400: "Bad Request",
    404: "Not Found", 405: "Method Not Allowed",
}


def json_response(start_response, data: Any, status: int = 200) -> list[bytes]:
    body = json.dumps(data, indent=2).encode("utf-8")
    phrase = STATUS_PHRASES.get(status, "Unknown")
    start_response(
        f"{status} {phrase}",
        [("Content-Type", "application/json"),
         ("Content-Length", str(len(body)))]
    )
    return [body]


def application(environ, start_response):
    method = environ['REQUEST_METHOD']
    path = environ['PATH_INFO']

    # Route: GET /tasks
    if path == '/tasks' and method == 'GET':
        return json_response(start_response, list(tasks.values()))

    # Route: POST /tasks
    if path == '/tasks' and method == 'POST':
        data = parse_json_body(environ)
        if not data or 'title' not in data:
            return json_response(start_response,
                                 {"error": "title is required"}, 400)
        task = {
            "id": str(uuid.uuid4()),
            "title": data['title'],
            "done": False,
        }
        tasks[task['id']] = task
        return json_response(start_response, task, 201)

    # Route: /tasks/{id}
    match = re.fullmatch(r'/tasks/([^/]+)', path)
    if match:
        task_id = match.group(1)

        if method == 'GET':
            task = tasks.get(task_id)
            if task is None:
                return json_response(start_response,
                                     {"error": "not found"}, 404)
            return json_response(start_response, task)

        if method == 'DELETE':
            if task_id not in tasks:
                return json_response(start_response,
                                     {"error": "not found"}, 404)
            deleted = tasks.pop(task_id)
            return json_response(start_response, deleted)

        return json_response(start_response,
                             {"error": "method not allowed"}, 405)

    # 404 for everything else
    return json_response(start_response, {"error": "not found"}, 404)


if __name__ == '__main__':
    from wsgiref.simple_server import make_server
    print("Serving on http://127.0.0.1:8000")
    with make_server('127.0.0.1', 8000, application) as server:
        server.serve_forever()

Save this as tasks_app.py and run it:

python tasks_app.py

Or with Gunicorn:

pip install gunicorn
gunicorn tasks_app:application

Trying It Out

# Create a task
curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Learn WSGI"}'
# {"id": "abc-123", "title": "Learn WSGI", "done": false}

# List tasks
curl http://localhost:8000/tasks
# [{"id": "abc-123", "title": "Learn WSGI", "done": false}]

# Get a specific task
curl http://localhost:8000/tasks/abc-123
# {"id": "abc-123", "title": "Learn WSGI", "done": false}

# Delete a task
curl -X DELETE http://localhost:8000/tasks/abc-123
# {"id": "abc-123", "title": "Learn WSGI", "done": false}

# Missing title
curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"description": "oops"}'
# {"error": "title is required"}

# Not found
curl http://localhost:8000/tasks/nonexistent
# {"error": "not found"}

It works. No framework involved. The routing is regex matching on PATH_INFO. The request parsing is reading from wsgi.input. The responses are byte strings with proper headers.

What This Reveals About Frameworks

Look at our application function and notice what’s getting tedious:

  1. Routing: if path == '/tasks' and method == 'GET' — this will get unreadable fast
  2. Response construction: json_response(start_response, data, status) — we’re passing start_response everywhere
  3. Request parsing: parse_json_body(environ) — repeated on every endpoint that accepts a body
  4. Error handling: every route independently returns error responses

A framework solves these problems. Flask gives you @app.route('/tasks', methods=['GET']). Django gives you URL patterns and view functions. FastAPI gives you type annotations and automatic parsing.

But now you know what they’re solving. The @app.route decorator is just adding your function to a routing table and wrapping it so it conforms to the WSGI interface. The request object (flask.request, Django’s HttpRequest) is just a wrapper around environ. The response class is a wrapper around start_response and the return value.

It’s convenience all the way down.
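To make that concrete, here is a toy routing decorator in the spirit of Flask’s. The names (TinyApp, route) are invented for illustration; real frameworks add pattern matching, converters, and error handling on top of the same idea:

```python
class TinyApp:
    def __init__(self):
        self.routes = {}  # (method, path) -> view function

    def route(self, path, methods=("GET",)):
        def decorator(view):
            for method in methods:
                self.routes[(method, path)] = view
            return view
        return decorator

    def __call__(self, environ, start_response):
        # The instance itself is a WSGI app: dispatch on (method, path)
        key = (environ["REQUEST_METHOD"], environ["PATH_INFO"])
        view = self.routes.get(key)
        if view is None:
            start_response("404 Not Found", [("Content-Type", "text/plain")])
            return [b"not found"]
        return view(environ, start_response)


app = TinyApp()

@app.route("/tasks")
def list_tasks(environ, start_response):
    start_response("200 OK", [("Content-Type", "application/json")])
    return [b"[]"]
```

Point Gunicorn at app and it behaves like any other WSGI callable.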

The wsgiref Module

Python’s standard library includes wsgiref, a reference implementation of a WSGI server:

from wsgiref.simple_server import make_server
from wsgiref.validate import validator

# Wrap your app with the validator to catch WSGI spec violations
validated_app = validator(application)

with make_server('127.0.0.1', 8000, validated_app) as server:
    server.serve_forever()

wsgiref.validate.validator wraps your application and checks that it correctly implements the WSGI spec — proper return types, calling start_response at the right time, etc. Use it during development; remove it for production.

wsgiref.simple_server is not production-ready (it’s single-threaded and handles one request at a time), but it’s useful for local development when you want zero dependencies.

On State and Concurrency

The tasks dictionary in our app is module-level state. This works fine for a single-process server, but Gunicorn’s default is to use multiple worker processes. Each worker has its own copy of the module, its own tasks dict. Changes in worker 1 are invisible to worker 2.

For production, you’d use a database or shared cache (Redis) instead of in-memory state. This isn’t a WSGI limitation — it’s just how multi-process architectures work. But it’s worth being explicit about: WSGI does not share state between requests. Each call to application() is independent.

This is actually a feature. It makes WSGI applications easy to reason about and easy to scale horizontally. Stateless by default.
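You can watch the per-worker isolation yourself with a sketch like the following. Run it under gunicorn -w 4 pid_app:application (the module name is whatever you save it as) and repeated curls will show different PIDs, each with its own independent counter:

```python
import os

counter = 0  # per-process state; NOT shared across Gunicorn workers

def application(environ, start_response):
    global counter
    counter += 1
    body = f"pid={os.getpid()} count={counter}\n".encode("utf-8")
    start_response("200 OK", [("Content-Type", "text/plain"),
                              ("Content-Length", str(len(body)))])
    return [body]
```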

Exercises

Before moving on, try these modifications to the app:

  1. Add a PATCH /tasks/{id} endpoint that updates the done field
  2. Add proper Content-Type validation — return 415 if it’s not application/json on POST
  3. Add a request logger: print method, path, and status code for every request
  4. Parse QUERY_STRING so that GET /tasks?done=true filters the task list

Exercise 3 is a preview of the next chapter. The logging belongs in middleware.

Build a WSGI Server from Scratch

We’ve been running our WSGI applications with wsgiref.simple_server. It works, but it’s a black box. In this chapter, we’ll build our own WSGI server using Python’s socket module — the same primitives that Gunicorn uses, just without fifteen years of production hardening.

By the end of this chapter, you’ll understand exactly what Gunicorn’s workers are doing on every request.

What a WSGI Server Must Do

A WSGI server has one job: turn TCP connections into application calls and back. Concretely, it must accept connections, parse HTTP requests, call the WSGI application, and send HTTP responses. In order:

  1. Bind to a port and listen for connections
  2. Accept a connection
  3. Read bytes from the socket
  4. Parse the bytes into an HTTP request
  5. Build the environ dictionary
  6. Define a start_response callable
  7. Call the WSGI application
  8. Serialize the response (status + headers + body)
  9. Write the response bytes to the socket
  10. Close the connection (or keep it alive for HTTP/1.1)

We’ll implement all of these, handling one request at a time (no threading — that’s a refinement for later).

The HTTP Parser

First, let’s build a proper request parser:

from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional
import io


@dataclass
class ParsedRequest:
    method: str
    path: str
    query_string: str
    http_version: str
    headers: Dict[str, str]
    body: bytes


def parse_request(raw: bytes) -> Optional[ParsedRequest]:
    """
    Parse raw HTTP request bytes into a ParsedRequest.
    Returns None if the request is malformed.
    """
    # Split at the blank line separating headers from body
    header_end = raw.find(b"\r\n\r\n")
    if header_end == -1:
        return None  # Incomplete request

    header_section = raw[:header_end].decode("latin-1")
    body = raw[header_end + 4:]

    lines = header_section.split("\r\n")
    if not lines:
        return None

    # Parse the request line
    try:
        method, raw_path, http_version = lines[0].split(" ", 2)
    except ValueError:
        return None

    # Split path from query string
    if "?" in raw_path:
        path, query_string = raw_path.split("?", 1)
    else:
        path, query_string = raw_path, ""

    # Parse headers
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ": " in line:
            name, _, value = line.partition(": ")
            headers[name.lower()] = value

    # Trim body to Content-Length if present (tolerating a malformed value)
    try:
        content_length = int(headers.get("content-length", len(body)))
    except ValueError:
        return None  # Malformed Content-Length
    body = body[:content_length]

    return ParsedRequest(
        method=method,
        path=path,
        query_string=query_string,
        http_version=http_version,
        headers=headers,
        body=body,
    )

Building the environ Dictionary

The spec defines exactly what keys environ must contain. Here’s the conversion from our parsed request:

import sys


def build_environ(request: ParsedRequest, server_name: str, server_port: int) -> dict:
    """
    Build a WSGI environ dict from a parsed HTTP request.
    See PEP 3333 for the complete specification.
    """
    environ = {
        # CGI variables
        "REQUEST_METHOD": request.method,
        "SCRIPT_NAME": "",
        "PATH_INFO": request.path,
        "QUERY_STRING": request.query_string,
        "SERVER_NAME": server_name,
        "SERVER_PORT": str(server_port),
        "SERVER_PROTOCOL": request.http_version,
        "GATEWAY_INTERFACE": "CGI/1.1",

        # WSGI variables
        "wsgi.version": (1, 0),
        "wsgi.url_scheme": "http",
        "wsgi.input": io.BytesIO(request.body),
        "wsgi.errors": sys.stderr,
        "wsgi.multithread": False,
        "wsgi.multiprocess": False,
        "wsgi.run_once": False,
    }

    # Content-Type and Content-Length: no HTTP_ prefix (CGI convention)
    if "content-type" in request.headers:
        environ["CONTENT_TYPE"] = request.headers["content-type"]
    else:
        environ["CONTENT_TYPE"] = ""

    if "content-length" in request.headers:
        environ["CONTENT_LENGTH"] = request.headers["content-length"]
    else:
        environ["CONTENT_LENGTH"] = ""

    # All other headers: HTTP_ prefix, uppercased, hyphens → underscores
    for name, value in request.headers.items():
        if name in ("content-type", "content-length"):
            continue
        key = "HTTP_" + name.upper().replace("-", "_")
        environ[key] = value

    return environ

The Response Builder

def build_response(status: str, headers: List[Tuple[str, str]], body: bytes) -> bytes:
    """Serialize a WSGI response to HTTP bytes."""
    lines = [f"HTTP/1.1 {status}\r\n"]
    for name, value in headers:
        lines.append(f"{name}: {value}\r\n")
    lines.append("\r\n")
    return "".join(lines).encode("latin-1") + body

The Server

Now we put it all together:

import socket
import sys
from typing import Callable


def serve(app: Callable, host: str = "127.0.0.1", port: int = 8000) -> None:
    """
    A minimal single-threaded WSGI server.
    Handles one request at a time. Not suitable for production,
    but suitable for understanding what production servers do.
    """
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Allow reusing the address immediately after restart
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    server_socket.bind((host, port))
    server_socket.listen(5)

    print(f"Serving on http://{host}:{port}", file=sys.stderr)

    while True:
        try:
            conn, addr = server_socket.accept()
            handle_connection(conn, addr, app, host, port)
        except KeyboardInterrupt:
            print("\nShutting down.", file=sys.stderr)
            break

    server_socket.close()


def handle_connection(
    conn: socket.socket,
    addr: tuple,
    app: Callable,
    server_name: str,
    server_port: int,
) -> None:
    """Handle a single HTTP connection."""
    try:
        # Read the full request
        raw = receive_request(conn)
        if not raw:
            return

        # Parse it
        request = parse_request(raw)
        if request is None:
            conn.sendall(b"HTTP/1.1 400 Bad Request\r\n\r\n")
            return

        # Build environ and call the app
        environ = build_environ(request, server_name, server_port)
        status, headers, body = call_app(app, environ)

        # Send the response
        response = build_response(status, headers, body)
        conn.sendall(response)

    except Exception as e:
        print(f"Error handling request: {e}", file=sys.stderr)
        try:
            conn.sendall(b"HTTP/1.1 500 Internal Server Error\r\n\r\n")
        except Exception:
            pass
    finally:
        conn.close()


def receive_request(conn: socket.socket) -> bytes:
    """
    Read bytes from a socket until we have a complete HTTP request.
    Handles the header/body split correctly.
    """
    data = b""
    conn.settimeout(5.0)

    try:
        # Read until we have the full headers
        while b"\r\n\r\n" not in data:
            chunk = conn.recv(4096)
            if not chunk:
                return data
            data += chunk

        # Find the end of headers
        header_end = data.find(b"\r\n\r\n") + 4

        # Determine Content-Length from headers
        header_section = data[:header_end].decode("latin-1")
        content_length = 0
        for line in header_section.split("\r\n"):
            if line.lower().startswith("content-length:"):
                try:
                    content_length = int(line.split(":", 1)[1].strip())
                except ValueError:
                    pass
                break

        # Read the body if needed
        body_received = len(data) - header_end
        while body_received < content_length:
            chunk = conn.recv(4096)
            if not chunk:
                break
            data += chunk
            body_received += len(chunk)

    except socket.timeout:
        pass  # Return whatever we have

    return data


def call_app(app: Callable, environ: dict) -> tuple:
    """
    Call a WSGI application and collect the response.
    Returns (status, headers, body_bytes).
    """
    response_args = []

    def start_response(status, headers, exc_info=None):
        if exc_info:
            try:
                if response_args:
                    raise exc_info[1].with_traceback(exc_info[2])
            finally:
                exc_info = None
        response_args.clear()
        response_args.append((status, headers))

    result = app(environ, start_response)

    try:
        body = b"".join(result)
    finally:
        if hasattr(result, "close"):
            result.close()

    if not response_args:
        raise RuntimeError("WSGI app did not call start_response")

    status, headers = response_args[0]
    return status, headers, body

Putting It All Together

# wsgi_server.py — the complete server in one file

import io
import socket
import sys
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple


@dataclass
class ParsedRequest:
    method: str
    path: str
    query_string: str
    http_version: str
    headers: Dict[str, str]
    body: bytes


def parse_request(raw: bytes) -> Optional[ParsedRequest]:
    header_end = raw.find(b"\r\n\r\n")
    if header_end == -1:
        return None
    header_section = raw[:header_end].decode("latin-1")
    body = raw[header_end + 4:]
    lines = header_section.split("\r\n")
    try:
        method, raw_path, http_version = lines[0].split(" ", 2)
    except ValueError:
        return None
    path, query_string = (raw_path.split("?", 1) if "?" in raw_path
                          else (raw_path, ""))
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ": " in line:
            name, _, value = line.partition(": ")
            headers[name.lower()] = value
    try:
        content_length = int(headers.get("content-length", len(body)))
    except ValueError:
        return None  # Malformed Content-Length
    return ParsedRequest(method, path, query_string, http_version,
                         headers, body[:content_length])


def build_environ(req: ParsedRequest, host: str, port: int) -> dict:
    env = {
        "REQUEST_METHOD": req.method,
        "SCRIPT_NAME": "",
        "PATH_INFO": req.path,
        "QUERY_STRING": req.query_string,
        "SERVER_NAME": host,
        "SERVER_PORT": str(port),
        "SERVER_PROTOCOL": req.http_version,
        "GATEWAY_INTERFACE": "CGI/1.1",
        "wsgi.version": (1, 0),
        "wsgi.url_scheme": "http",
        "wsgi.input": io.BytesIO(req.body),
        "wsgi.errors": sys.stderr,
        "wsgi.multithread": False,
        "wsgi.multiprocess": False,
        "wsgi.run_once": False,
        "CONTENT_TYPE": req.headers.get("content-type", ""),
        "CONTENT_LENGTH": req.headers.get("content-length", ""),
    }
    for name, value in req.headers.items():
        if name not in ("content-type", "content-length"):
            env["HTTP_" + name.upper().replace("-", "_")] = value
    return env


def build_response(status: str, headers: List[Tuple[str, str]],
                   body: bytes) -> bytes:
    lines = [f"HTTP/1.1 {status}\r\n"]
    for name, value in headers:
        lines.append(f"{name}: {value}\r\n")
    lines.append("\r\n")
    return "".join(lines).encode("latin-1") + body


def receive_request(conn: socket.socket) -> bytes:
    data = b""
    conn.settimeout(5.0)
    try:
        while b"\r\n\r\n" not in data:
            chunk = conn.recv(4096)
            if not chunk:
                return data
            data += chunk
        header_end = data.find(b"\r\n\r\n") + 4
        content_length = 0
        for line in data[:header_end].decode("latin-1").split("\r\n"):
            if line.lower().startswith("content-length:"):
                try:
                    content_length = int(line.split(":", 1)[1].strip())
                except ValueError:
                    pass
                break
        while len(data) - header_end < content_length:
            chunk = conn.recv(4096)
            if not chunk:
                break
            data += chunk
    except socket.timeout:
        pass
    return data


def call_app(app: Callable, environ: dict) -> tuple:
    response_args = []

    def start_response(status, headers, exc_info=None):
        if exc_info:
            try:
                if response_args:
                    raise exc_info[1].with_traceback(exc_info[2])
            finally:
                exc_info = None
        response_args.clear()
        response_args.append((status, headers))

    result = app(environ, start_response)
    try:
        body = b"".join(result)
    finally:
        if hasattr(result, "close"):
            result.close()
    if not response_args:
        raise RuntimeError("App did not call start_response")
    status, headers = response_args[0]
    return status, headers, body


def serve(app: Callable, host: str = "127.0.0.1", port: int = 8000) -> None:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen(5)
    print(f"Serving on http://{host}:{port}", file=sys.stderr)
    try:
        while True:
            conn, addr = sock.accept()
            try:
                raw = receive_request(conn)
                if not raw:
                    continue
                req = parse_request(raw)
                if req is None:
                    conn.sendall(b"HTTP/1.1 400 Bad Request\r\n\r\n")
                    continue
                environ = build_environ(req, host, port)
                status, headers, body = call_app(app, environ)
                conn.sendall(build_response(status, headers, body))
            except Exception as e:
                print(f"Error: {e}", file=sys.stderr)
                try:
                    conn.sendall(b"HTTP/1.1 500 Internal Server Error\r\n\r\n")
                except Exception:
                    pass
            finally:
                conn.close()
    except KeyboardInterrupt:
        print("\nShutting down.", file=sys.stderr)
    finally:
        sock.close()


# --- Test application ---

def hello_app(environ, start_response):
    name = environ.get("QUERY_STRING", "").split("=")[-1] or "world"
    body = f"Hello, {name}!\n".encode("utf-8")
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(body))),
    ])
    return [body]


if __name__ == "__main__":
    serve(hello_app)

Save as wsgi_server.py and test it:

python wsgi_server.py &
curl "http://127.0.0.1:8000/?name=WSGI"
# Hello, WSGI!

Now point it at the tasks app from the previous chapter:

# at the bottom of wsgi_server.py, replace the __main__ block:
if __name__ == "__main__":
    from tasks_app import application
    serve(application)
Then exercise it with curl:

curl -X POST http://127.0.0.1:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Works on my server"}'

What Gunicorn Does That We Don’t

Our server handles one request at a time. Gunicorn has workers — multiple processes or threads that each run the request loop above simultaneously. The sync worker is essentially what we’ve built, replicated across N processes. The gthread worker uses threads within each process. The gevent worker uses greenlets.

Beyond concurrency, Gunicorn adds:

  • HTTP/1.1 keep-alive: reusing connections for multiple requests
  • Chunked transfer encoding: streaming responses without known Content-Length
  • Request size limits: protecting against large payloads
  • Graceful worker restarts: new workers start before old ones die
  • Worker timeout killing: killing workers that hang

But the core loop — accept, parse, environ, call, serialize, send — is exactly what we’ve built.
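Chunked transfer encoding is a good example of what that hardening involves. The framing RFC 7230 defines is simple: each chunk is preceded by its length in hex, and a zero-length chunk terminates the body. A sketch of the encoder side (illustrative, not Gunicorn’s actual implementation):

```python
def encode_chunked(chunks):
    """Frame an iterable of byte chunks using HTTP/1.1 chunked transfer encoding."""
    out = b""
    for chunk in chunks:
        if chunk:  # a zero-length chunk would terminate the body early
            out += f"{len(chunk):x}\r\n".encode("ascii") + chunk + b"\r\n"
    return out + b"0\r\n\r\n"  # the zero-length chunk marks the end
```

encode_chunked([b"Hello, ", b"world"]) produces b"7\r\nHello, \r\n5\r\nworld\r\n0\r\n\r\n". This is how a server streams a body without knowing its total length up front.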

The Latin-1 Encoding Detail

You may have noticed we decode HTTP headers as latin-1 rather than utf-8. This is intentional.

The HTTP spec (RFC 7230) defines header field names as ASCII tokens, and historically allowed header field values in ISO-8859-1 (latin-1), a holdover from early HTTP. Non-ASCII characters in header values are best avoided in HTTP/1.1; where they are genuinely needed, they should be encoded (for example, RFC 5987 encoding for Content-Disposition filenames).

In practice, Python’s WSGI servers use latin-1 for headers to preserve byte-for-byte fidelity. If you round-trip a header through encode('latin-1').decode('latin-1'), you get the original bytes back — which matters for the WSGI spec’s requirement that environ values be native strings.
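The round trip is easy to demonstrate:

```python
# latin-1 maps bytes 0x00 through 0xFF one-to-one onto the first 256
# Unicode code points, so every byte string survives the round trip:
raw = bytes(range(256))
assert raw.decode("latin-1").encode("latin-1") == raw

# utf-8 makes no such promise; many byte sequences simply aren't valid:
try:
    b"\xff".decode("utf-8")
except UnicodeDecodeError:
    pass  # the kind of error a wrong decode produces at runtime
```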

This is one of those details that doesn’t matter until it does, at which point you’ll spend a day debugging mysterious encoding errors.

The SO_REUSEADDR Detail

server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

Without this, if you restart the server quickly after stopping it, you’ll get [Errno 98] Address already in use. This happens because the OS keeps the socket in TIME_WAIT state for a while after close, waiting for any stray packets that might still be in transit. SO_REUSEADDR tells the OS to let us reuse the address anyway.

Gunicorn sets this too. So does every other production server. It’s the first thing you add after “it works once” breaks down.

Middleware: Turtles All the Way Down

Middleware is the part of Python web development that sounds complicated until you understand it, at which point it becomes almost disappointingly simple.

A WSGI middleware is a callable that:

  1. Takes a WSGI application as its argument
  2. Returns a WSGI application

That’s the whole definition. It’s a function that wraps a function to produce a function. Python has a name for this: a decorator.

The Simplest Possible Middleware

def do_nothing_middleware(app):
    """Middleware that does absolutely nothing."""
    def wrapper(environ, start_response):
        return app(environ, start_response)
    return wrapper

Use it:

def my_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello"]

wrapped = do_nothing_middleware(my_app)
# wrapped is a WSGI app. Gunicorn can't tell the difference.

From the server’s perspective, wrapped is indistinguishable from my_app: it takes environ and start_response, calls start_response, and returns a body iterable. It’s a valid WSGI app.

Now add some behavior:

Request Logging Middleware

import sys
import time


def logging_middleware(app):
    """Log method, path, status, and timing for every request."""
    def wrapper(environ, start_response):
        method = environ.get("REQUEST_METHOD", "?")
        path = environ.get("PATH_INFO", "/")
        started = time.monotonic()

        # Intercept start_response to capture the status code
        status_holder = []

        def capturing_start_response(status, headers, exc_info=None):
            status_holder.append(status)
            return start_response(status, headers, exc_info)

        result = app(environ, capturing_start_response)

        elapsed = (time.monotonic() - started) * 1000
        status = status_holder[0] if status_holder else "???"
        print(f"{method} {path} {status} ({elapsed:.1f}ms)", file=sys.stderr)

        return result

    return wrapper

The key insight: middleware can intercept start_response to inspect or modify the status and headers before passing them to the real start_response. This is how authentication middleware rejects requests with 401 Unauthorized, how compression middleware adds Content-Encoding: gzip, and how CORS middleware adds Access-Control-Allow-Origin headers.

Authentication Middleware

import base64


def basic_auth_middleware(app, username: str, password: str):
    """
    HTTP Basic Authentication middleware.
    Rejects requests without valid credentials with 401.
    """
    # Pre-compute the expected auth header value
    credentials = f"{username}:{password}".encode("utf-8")
    expected = "Basic " + base64.b64encode(credentials).decode("ascii")

    def wrapper(environ, start_response):
        auth = environ.get("HTTP_AUTHORIZATION", "")

        if auth != expected:
            body = b"Unauthorized"
            start_response("401 Unauthorized", [
                ("Content-Type", "text/plain"),
                ("Content-Length", str(len(body))),
                ("WWW-Authenticate", 'Basic realm="Protected"'),
            ])
            return [body]

        # Credentials valid — call the real app
        return app(environ, start_response)

    return wrapper


# Usage:
protected_app = basic_auth_middleware(my_app, "admin", "secret")

Notice what happened: basic_auth_middleware takes the app and the credentials. It returns a closure. The closure has access to both app and expected via Python’s closure mechanism.

Stacking Middleware

Here’s where the “turtles all the way down” name comes from. You can stack middleware:

app = my_app
app = basic_auth_middleware(app, "admin", "secret")
app = logging_middleware(app)

When a request comes in, app is now the logging middleware. It calls basic_auth_middleware’s wrapper. That calls the real my_app. The stack unwinds on the way back.

The call stack during a request looks like:

logging wrapper
    → basic_auth wrapper
        → my_app
        ← response
    ← response (with status logged)
← response

This is exactly what Django’s MIDDLEWARE setting builds. Each class in the list wraps the next.

A Composable Middleware Builder

Rather than manually nesting, build a pipeline function:

from typing import Callable, List


def build_middleware_stack(
    app: Callable,
    middleware: List[Callable],
) -> Callable:
    """
    Apply middleware in order, outermost last.
    middleware = [A, B, C] means: A wraps B wraps C wraps app
    Execution order: A → B → C → app → C → B → A
    """
    for mw in reversed(middleware):
        app = mw(app)
    return app


# Usage (middleware that takes configuration is handled next)
stack = build_middleware_stack(my_app, [
    logging_middleware,
    timing_middleware,
])

For middleware that takes configuration, use functools.partial or a factory function:

import functools

stack = build_middleware_stack(my_app, [
    logging_middleware,
    functools.partial(basic_auth_middleware, username="admin", password="secret"),
])

CORS Middleware

A real-world example: Cross-Origin Resource Sharing middleware adds the headers that browsers need for cross-origin requests.

from typing import List


def cors_middleware(
    app,
    allow_origins: List[str] = ["*"],
    allow_methods: List[str] = ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers: List[str] = ["Content-Type", "Authorization"],
    max_age: int = 86400,
):
    """
    CORS middleware. Adds Access-Control-* headers to all responses
    and handles OPTIONS preflight requests.
    """
    origin_header = ", ".join(allow_origins)
    methods_header = ", ".join(allow_methods)
    headers_header = ", ".join(allow_headers)

    cors_headers = [
        ("Access-Control-Allow-Origin", origin_header),
        ("Access-Control-Allow-Methods", methods_header),
        ("Access-Control-Allow-Headers", headers_header),
        ("Access-Control-Max-Age", str(max_age)),
    ]

    def wrapper(environ, start_response):
        # Handle CORS preflight
        if environ["REQUEST_METHOD"] == "OPTIONS":
            body = b""
            start_response("204 No Content", cors_headers + [
                ("Content-Length", "0"),
            ])
            return [body]

        # Inject CORS headers into every response
        def cors_start_response(status, headers, exc_info=None):
            return start_response(status, headers + cors_headers, exc_info)

        return app(environ, cors_start_response)

    return wrapper

Request ID Middleware

Add a unique ID to every request — useful for correlating log lines across multiple services:

import uuid


def request_id_middleware(app, header_name: str = "X-Request-ID"):
    """
    Assign a unique request ID to every incoming request.
    Uses the incoming header if present, generates one otherwise.
    Injects the ID into environ and adds it to the response headers.
    """
    environ_key = "HTTP_" + header_name.upper().replace("-", "_")

    def wrapper(environ, start_response):
        # Use existing ID or generate a new one
        request_id = environ.get(environ_key) or str(uuid.uuid4())
        environ[environ_key] = request_id
        environ["request_id"] = request_id  # Convenience key

        def id_start_response(status, headers, exc_info=None):
            return start_response(
                status,
                headers + [(header_name, request_id)],
                exc_info,
            )

        return app(environ, id_start_response)

    return wrapper

Response Timing Header

import time


def timing_middleware(app):
    """Add X-Response-Time header (milliseconds) to every response."""
    def wrapper(environ, start_response):
        started = time.monotonic()

        def timing_start_response(status, headers, exc_info=None):
            elapsed_ms = (time.monotonic() - started) * 1000
            return start_response(
                status,
                headers + [("X-Response-Time", f"{elapsed_ms:.2f}ms")],
                exc_info,
            )

        return app(environ, timing_start_response)

    return wrapper

Putting the Stack Together

# app.py
import functools

from tasks_app import application as tasks_app

app = build_middleware_stack(tasks_app, [
    timing_middleware,
    logging_middleware,
    request_id_middleware,
    functools.partial(cors_middleware, allow_origins=["https://myapp.com"]),
    functools.partial(basic_auth_middleware, username="admin", password="hunter2"),
])

# Run with: gunicorn app:app

The call order is: timing → logging → request_id → cors → basic_auth → tasks_app.

Each middleware layer handles one concern. None of them know about each other. They’re composable because they all speak the same interface: WSGI.

The Class-Based Pattern

You can also write middleware as classes, which is how Django does it:

class LoggingMiddleware:
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        method = environ.get("REQUEST_METHOD", "?")
        path = environ.get("PATH_INFO", "/")
        print(f"→ {method} {path}", file=sys.stderr)
        return self.app(environ, start_response)

The class is callable (via __call__), so LoggingMiddleware(my_app) returns a WSGI app. The behavior is identical to the function-based approach — Django’s preference for classes is a stylistic choice, not a technical requirement.

What Frameworks Add

When Flask does:

from flask_cors import CORS
CORS(app)

It’s wrapping app.wsgi_app with a CORS middleware. Not magic — WSGI middleware composition.

When Django processes your MIDDLEWARE list, it builds a chain of callables where each one wraps the next. The “middleware interface” Django defines (with process_request, process_response, process_view) is just a more structured way to write the same wrapping pattern.

The underlying mechanism is always: callables all the way down.
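
To make that concrete, here is a rough sketch, not Django’s actual code, of how a list of middleware classes that each take a get_response callable collapses into one chain (plain dicts stand in for real request and response objects):

```python
class AddHeaderMiddleware:
    """Django-style middleware shape: wraps a get_response callable."""
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)       # delegate inward
        response["headers"].append(("X-Wrapped", "yes"))
        return response


def view(request):
    # Innermost callable: produces the response
    return {"status": 200, "headers": []}


MIDDLEWARE = [AddHeaderMiddleware, AddHeaderMiddleware]

handler = view
for middleware_class in reversed(MIDDLEWARE):
    handler = middleware_class(handler)   # each layer wraps the next

response = handler({"path": "/"})
assert response["headers"] == [("X-Wrapped", "yes"), ("X-Wrapped", "yes")]
```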

Routing Without a Framework (It’s Just String Matching)

URL routing sounds like one of those problems that requires a framework. It doesn’t. Routing is pattern matching on a string. The string is PATH_INFO. The patterns are either exact matches, prefix matches, or regular expressions. Everything else is sugar.

Let’s build a router that would be genuinely usable in a small project.

What a Router Does

A router maps incoming requests (method + path) to handler functions. Given:

GET /users/42/posts

It should find the handler registered for GET /users/{user_id}/posts and call it with user_id="42".

The three parts of routing:

  1. Registration: associate a pattern with a handler
  2. Matching: find the pattern that matches the incoming path
  3. Extraction: pull path parameters out of the matched URL

Naive Routing: Just if Statements

We already saw this in the tasks app. It scales to about 5 routes before it becomes unpleasant:

def application(environ, start_response):
    method = environ['REQUEST_METHOD']
    path = environ['PATH_INFO']

    if path == '/' and method == 'GET':
        return index(environ, start_response)
    if path == '/users' and method == 'GET':
        return list_users(environ, start_response)
    # ... this gets old fast

Let’s do better.

A Dict-Based Exact Router

For applications that only need exact path matching (no parameters), a dict is fast and readable:

from typing import Callable, Dict, Optional, Tuple


# Type: maps (method, path) to handler
RouteTable = Dict[Tuple[str, str], Callable]


def make_exact_router(routes: RouteTable) -> Callable:
    """
    Build a WSGI app from a dict of (method, path) → handler mappings.
    Returns 404 for unmatched paths, 405 for wrong method on matched path.
    """
    # Build a set of known paths for 404 vs 405 distinction
    known_paths = {path for (_, path) in routes}

    def router(environ, start_response):
        method = environ['REQUEST_METHOD']
        path = environ['PATH_INFO']
        key = (method, path)

        if key in routes:
            return routes[key](environ, start_response)

        if path in known_paths:
            # Path exists but method is wrong
            allowed = [m for (m, p) in routes if p == path]
            start_response("405 Method Not Allowed", [
                ("Allow", ", ".join(allowed)),
                ("Content-Type", "text/plain"),
            ])
            return [b"Method Not Allowed"]

        start_response("404 Not Found", [("Content-Type", "text/plain")])
        return [b"Not Found"]

    return router


# Usage
def index(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Home"]

def about(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"About"]

app = make_exact_router({
    ("GET", "/"): index,
    ("GET", "/about"): about,
})

A Pattern Router with Path Parameters

For path parameters like /users/{user_id}, we need pattern matching. We’ll convert path templates to regular expressions:

import re
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple


@dataclass
class Route:
    method: str
    pattern: re.Pattern
    handler: Callable
    param_names: List[str]


def compile_path(path_template: str) -> Tuple[re.Pattern, List[str]]:
    """
    Convert a path template like '/users/{user_id}/posts/{post_id}'
    to a compiled regex and a list of parameter names.

    Supported converters:
        {name}      → matches any non-slash characters
        {name:int}  → matches one or more digits
        {name:slug} → matches URL-safe characters (letters, digits, hyphens)
    """
    converters = {
        "str": r"[^/]+",
        "int": r"[0-9]+",
        "slug": r"[a-zA-Z0-9-]+",
    }

    param_names: List[str] = []

    def replace_param(m):
        name = m.group(1)
        converter = m.group(2) or "str"
        param_names.append(name)
        return f"(?P<{name}>{converters.get(converter, converters['str'])})"

    regex_str = re.sub(r"\{(\w+)(?::(\w+))?\}", replace_param, path_template)
    regex_str = "^" + regex_str + "$"

    return re.compile(regex_str), param_names
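
To see the transformation in isolation, here is the same re.sub trick applied standalone to a sample template:

```python
import re

converters = {"str": r"[^/]+", "int": r"[0-9]+", "slug": r"[a-zA-Z0-9-]+"}
param_names = []

def replace_param(m):
    # Each {name} or {name:converter} becomes a named capture group
    name, converter = m.group(1), m.group(2) or "str"
    param_names.append(name)
    return f"(?P<{name}>{converters[converter]})"

template = "/users/{user_id:int}/posts/{slug:slug}"
pattern = re.compile(
    "^" + re.sub(r"\{(\w+)(?::(\w+))?\}", replace_param, template) + "$"
)

match = pattern.match("/users/42/posts/hello-world")
assert param_names == ["user_id", "slug"]
assert match.groupdict() == {"user_id": "42", "slug": "hello-world"}
assert pattern.match("/users/abc/posts/x") is None  # int converter rejects letters
```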


class Router:
    """
    A WSGI router with path parameter support.

    Usage:
        router = Router()

        @router.route("GET", "/users/{user_id:int}")
        def get_user(environ, start_response):
            user_id = int(environ['route.params']['user_id'])
            ...
    """

    def __init__(self):
        self.routes: List[Route] = []

    def route(self, method: str, path: str):
        """Decorator to register a route handler."""
        def decorator(func: Callable) -> Callable:
            pattern, param_names = compile_path(path)
            self.routes.append(Route(
                method=method.upper(),
                pattern=pattern,
                handler=func,
                param_names=param_names,
            ))
            return func
        return decorator

    # Convenience methods
    def get(self, path: str):
        return self.route("GET", path)

    def post(self, path: str):
        return self.route("POST", path)

    def put(self, path: str):
        return self.route("PUT", path)

    def delete(self, path: str):
        return self.route("DELETE", path)

    def __call__(self, environ, start_response):
        """The router itself is a WSGI app."""
        method = environ['REQUEST_METHOD']
        path = environ['PATH_INFO']

        matched_routes = []

        for route in self.routes:
            match = route.pattern.match(path)
            if match:
                matched_routes.append((route, match))

        if not matched_routes:
            start_response("404 Not Found", [("Content-Type", "text/plain")])
            return [b"Not Found"]

        # Check if any match the method
        for route, match in matched_routes:
            if route.method == method:
                # Inject path params into environ
                environ['route.params'] = match.groupdict()
                return route.handler(environ, start_response)

        # Path matched but method didn't
        allowed = [r.method for r, _ in matched_routes]
        start_response("405 Method Not Allowed", [
            ("Allow", ", ".join(sorted(set(allowed)))),
            ("Content-Type", "text/plain"),
        ])
        return [b"Method Not Allowed"]

Using the Router

import json
import uuid

router = Router()
tasks = {}  # In-memory store


def json_response(start_response, data, status=200):
    phrases = {200: "OK", 201: "Created", 400: "Bad Request",
               404: "Not Found", 405: "Method Not Allowed"}
    body = json.dumps(data).encode("utf-8")
    start_response(f"{status} {phrases.get(status, 'Unknown')}", [
        ("Content-Type", "application/json"),
        ("Content-Length", str(len(body))),
    ])
    return [body]


def read_json_body(environ):
    try:
        length = int(environ.get("CONTENT_LENGTH") or 0)
    except ValueError:
        return None
    if not length:
        return None
    try:
        return json.loads(environ["wsgi.input"].read(length))
    except (json.JSONDecodeError, KeyError):
        return None


@router.get("/tasks")
def list_tasks(environ, start_response):
    return json_response(start_response, list(tasks.values()))


@router.post("/tasks")
def create_task(environ, start_response):
    data = read_json_body(environ)
    if not data or "title" not in data:
        return json_response(start_response, {"error": "title required"}, 400)
    task = {"id": str(uuid.uuid4()), "title": data["title"], "done": False}
    tasks[task["id"]] = task
    return json_response(start_response, task, 201)


@router.get("/tasks/{task_id}")
def get_task(environ, start_response):
    task_id = environ["route.params"]["task_id"]
    task = tasks.get(task_id)
    if task is None:
        return json_response(start_response, {"error": "not found"}, 404)
    return json_response(start_response, task)


@router.delete("/tasks/{task_id}")
def delete_task(environ, start_response):
    task_id = environ["route.params"]["task_id"]
    if task_id not in tasks:
        return json_response(start_response, {"error": "not found"}, 404)
    return json_response(start_response, tasks.pop(task_id))


@router.get("/users/{user_id:int}/tasks")
def user_tasks(environ, start_response):
    user_id = environ["route.params"]["user_id"]  # Already matched as digits
    # In a real app, filter by user_id
    return json_response(start_response, {
        "user_id": int(user_id),
        "tasks": list(tasks.values()),
    })


if __name__ == "__main__":
    from wsgiref.simple_server import make_server
    with make_server("127.0.0.1", 8000, router) as server:
        print("http://127.0.0.1:8000")
        server.serve_forever()

Test it:

curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "understand routing"}'

curl http://localhost:8000/users/42/tasks

Sub-Applications and Mounting

A more advanced pattern: route by path prefix and delegate to sub-applications.

class Mount:
    """
    Mount WSGI sub-applications at path prefixes.

        app = Mount({
            "/api": api_app,
            "/admin": admin_app,
            "/": fallback_app,
        })
    """

    def __init__(self, mounts: Dict[str, Callable]):
        # Sort by prefix length descending so more specific paths match first
        self.mounts = sorted(
            mounts.items(),
            key=lambda item: len(item[0]),
            reverse=True,
        )

    def __call__(self, environ, start_response):
        path = environ.get("PATH_INFO", "/")

        for prefix, app in self.mounts:
            # Match only at a path-segment boundary: "/api" should not
            # capture "/apifoo"
            if path == prefix or path.startswith(prefix.rstrip("/") + "/"):
                # Strip the prefix and adjust PATH_INFO/SCRIPT_NAME
                environ = dict(environ)
                environ["SCRIPT_NAME"] = environ.get("SCRIPT_NAME", "") + prefix.rstrip("/")
                environ["PATH_INFO"] = "/" + path[len(prefix):].lstrip("/")
                return app(environ, start_response)

        start_response("404 Not Found", [("Content-Type", "text/plain")])
        return [b"Not Found"]


# Example: mount API and docs under different prefixes
from wsgiref.simple_server import demo_app

app = Mount({
    "/api": router,        # Our task API
    "/": demo_app,         # Fallback
})

What Flask’s Router Actually Does

Flask uses Werkzeug’s routing system, which is more sophisticated than ours — it handles URL encoding, trailing slash redirects, weighted route matching (so /users/me takes precedence over /users/{id}), and URL generation (reversing a route name to a URL).

But the core mechanism is the same: convert URL templates to regular expressions, match PATH_INFO against them, extract named groups as parameters. Werkzeug just handles more edge cases and has a more refined API for it.

FastAPI’s router is Starlette’s, which is built on the same principle. The @app.get("/items/{item_id}") decorator registers a pattern and a handler. When a request comes in, Starlette walks the route table looking for a match.

The Trailing Slash Question

Should /users/ and /users match the same route? Opinions vary. Flask defaults to treating them as different routes (and optionally redirecting one to the other). Django’s path() function matches literally.

Our router matches literally too: the registered template must match the path exactly, with no trailing slash normalization. If you want both to work, register both:

@router.get("/tasks")
@router.get("/tasks/")
def list_tasks(environ, start_response):
    ...

Or add a middleware that strips trailing slashes:

def strip_trailing_slash(app):
    def wrapper(environ, start_response):
        path = environ.get("PATH_INFO", "/")
        if path != "/" and path.endswith("/"):
            environ = dict(environ)
            environ["PATH_INFO"] = path.rstrip("/")
        return app(environ, start_response)
    return wrapper

Routing is opinionated. Now you understand the opinions well enough to choose your own.

Request and Response Objects (DIY Edition)

Working directly with environ and start_response gets old. Not because they’re wrong — they’re a fine low-level interface — but because nobody wants to write environ.get('HTTP_AUTHORIZATION', '').split(' ')[-1] every time they need a Bearer token.

Request and Response objects are wrappers. They take the raw WSGI interface and present it through a more convenient API. In this chapter, we’ll build both.

The Request Object

What do we actually want from a request object? Looking at how we’ve been using environ:

# Things we access constantly:
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
query_string = environ['QUERY_STRING']
content_type = environ.get('CONTENT_TYPE', '')
body = environ['wsgi.input'].read(int(environ.get('CONTENT_LENGTH') or 0))
host = environ.get('HTTP_HOST', '')
auth = environ.get('HTTP_AUTHORIZATION', '')

# Things we compute repeatedly:
params = urllib.parse.parse_qs(query_string)
json_body = json.loads(body)

Let’s wrap all of this:

import json
import urllib.parse
from typing import Any, Dict, List, Optional


class Request:
    """
    A wrapper around a WSGI environ dict.
    Provides convenient access to request data.
    """

    def __init__(self, environ: dict):
        self._environ = environ
        self._body: Optional[bytes] = None  # Cached, body can only be read once

    # ── Basic properties ──────────────────────────────────────────────────

    @property
    def method(self) -> str:
        return self._environ['REQUEST_METHOD'].upper()

    @property
    def path(self) -> str:
        return self._environ.get('PATH_INFO', '/')

    @property
    def query_string(self) -> str:
        return self._environ.get('QUERY_STRING', '')

    @property
    def scheme(self) -> str:
        return self._environ.get('wsgi.url_scheme', 'http')

    @property
    def host(self) -> str:
        return self._environ.get('HTTP_HOST', self._environ.get('SERVER_NAME', ''))

    @property
    def url(self) -> str:
        """The full request URL."""
        url = f"{self.scheme}://{self.host}{self.path}"
        if self.query_string:
            url += f"?{self.query_string}"
        return url

    # ── Headers ───────────────────────────────────────────────────────────

    def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
        """
        Get a request header by name (case-insensitive).
        Handles the HTTP_ prefix and Content-Type/Content-Length exceptions.
        """
        name_lower = name.lower()
        if name_lower in ('content-type', 'content-length'):
            key = name_lower.replace('-', '_').upper()
        else:
            key = 'HTTP_' + name_lower.replace('-', '_').upper()
        return self._environ.get(key, default)

    @property
    def content_type(self) -> str:
        return self._environ.get('CONTENT_TYPE', '')

    @property
    def content_length(self) -> int:
        try:
            return int(self._environ.get('CONTENT_LENGTH') or 0)
        except ValueError:
            return 0

    @property
    def authorization(self) -> Optional[str]:
        return self.get_header('Authorization')

    @property
    def bearer_token(self) -> Optional[str]:
        """Extract Bearer token from Authorization header."""
        auth = self.authorization
        if auth and auth.startswith('Bearer '):
            return auth[7:]
        return None

    # ── Query parameters ──────────────────────────────────────────────────

    @property
    def query_params(self) -> Dict[str, List[str]]:
        """Parsed query string as dict of name → list of values."""
        return urllib.parse.parse_qs(self.query_string, keep_blank_values=True)

    def query(self, name: str, default: Optional[str] = None) -> Optional[str]:
        """Get the first value of a query parameter."""
        values = self.query_params.get(name)
        return values[0] if values else default

    def query_list(self, name: str) -> List[str]:
        """Get all values of a query parameter."""
        return self.query_params.get(name, [])

    # ── Body ──────────────────────────────────────────────────────────────

    @property
    def body(self) -> bytes:
        """Read and cache the request body."""
        if self._body is None:
            if self.content_length > 0:
                self._body = self._environ['wsgi.input'].read(self.content_length)
            else:
                self._body = b''
        return self._body

    @property
    def text(self) -> str:
        """Request body decoded as UTF-8 text."""
        return self.body.decode('utf-8')

    def json(self) -> Any:
        """Parse request body as JSON. Raises ValueError on failure."""
        return json.loads(self.body)

    def form(self) -> Dict[str, List[str]]:
        """Parse application/x-www-form-urlencoded body."""
        return urllib.parse.parse_qs(self.body.decode('utf-8'), keep_blank_values=True)

    # ── Path parameters (set by router) ───────────────────────────────────

    @property
    def path_params(self) -> Dict[str, str]:
        """Path parameters extracted by the router."""
        return self._environ.get('route.params', {})

    def path_param(self, name: str, default: Optional[str] = None) -> Optional[str]:
        return self.path_params.get(name, default)

    # ── Convenience ───────────────────────────────────────────────────────

    @property
    def is_json(self) -> bool:
        return 'application/json' in self.content_type

    @property
    def is_form(self) -> bool:
        return 'application/x-www-form-urlencoded' in self.content_type

    def __repr__(self) -> str:
        return f"<Request {self.method} {self.path}>"

The Response Object

The response side is about building the thing we return from WSGI handlers. The raw interface requires:

  1. Calling start_response(status, headers)
  2. Returning an iterable of bytes

A Response object collects status, headers, and body, then handles the WSGI mechanics:

import json
from typing import Any, Callable, Dict, List, Optional, Union


class Response:
    """
    A WSGI response builder.

    Usage:
        def handler(environ, start_response):
            response = Response("Hello, world!", content_type="text/plain")
            return response(environ, start_response)
    """

    def __init__(
        self,
        body: Union[str, bytes, None] = None,
        status: int = 200,
        content_type: str = "text/plain; charset=utf-8",
        headers: Optional[Dict[str, str]] = None,
    ):
        self.status_code = status
        self.headers: Dict[str, str] = {"Content-Type": content_type}
        if headers:
            self.headers.update(headers)

        if body is None:
            self._body = b""
        elif isinstance(body, str):
            self._body = body.encode("utf-8")
        else:
            self._body = body

    @property
    def status_line(self) -> str:
        phrases = {
            200: "OK", 201: "Created", 204: "No Content",
            301: "Moved Permanently", 302: "Found", 304: "Not Modified",
            400: "Bad Request", 401: "Unauthorized", 403: "Forbidden",
            404: "Not Found", 405: "Method Not Allowed",
            409: "Conflict", 415: "Unsupported Media Type",
            422: "Unprocessable Entity",
            500: "Internal Server Error", 503: "Service Unavailable",
        }
        phrase = phrases.get(self.status_code, "Unknown")
        return f"{self.status_code} {phrase}"

    def set_header(self, name: str, value: str) -> "Response":
        self.headers[name] = value
        return self

    def set_cookie(
        self,
        name: str,
        value: str,
        max_age: Optional[int] = None,
        path: str = "/",
        http_only: bool = True,
        secure: bool = False,
        same_site: str = "Lax",
    ) -> "Response":
        cookie = f"{name}={value}; Path={path}; SameSite={same_site}"
        if max_age is not None:
            cookie += f"; Max-Age={max_age}"
        if http_only:
            cookie += "; HttpOnly"
        if secure:
            cookie += "; Secure"
        # Multiple Set-Cookie headers can't share one key in the headers dict,
        # so collect them in a separate list; __call__ emits them individually.
        self._cookies = getattr(self, "_cookies", [])
        self._cookies.append(cookie)
        return self

    def __call__(self, environ: dict, start_response: Callable) -> List[bytes]:
        """Make Response a WSGI-compatible callable."""
        body = self._body
        headers = list(self.headers.items())

        # Add Content-Length if not present
        if "Content-Length" not in self.headers:
            headers.append(("Content-Length", str(len(body))))

        # Handle multiple Set-Cookie headers
        for cookie in getattr(self, "_cookies", []):
            headers.append(("Set-Cookie", cookie))

        start_response(self.status_line, headers)
        return [body] if body else []


# ── Factory functions ─────────────────────────────────────────────────────────

class JSONResponse(Response):
    def __init__(self, data: Any, status: int = 200, **kwargs):
        body = json.dumps(data, default=str).encode("utf-8")
        super().__init__(
            body=body,
            status=status,
            content_type="application/json",
            **kwargs,
        )


class HTMLResponse(Response):
    def __init__(self, html: str, status: int = 200, **kwargs):
        super().__init__(
            body=html,
            status=status,
            content_type="text/html; charset=utf-8",
            **kwargs,
        )


class RedirectResponse(Response):
    def __init__(self, location: str, permanent: bool = False):
        super().__init__(
            status=301 if permanent else 302,
            headers={"Location": location},
        )


class EmptyResponse(Response):
    def __init__(self, status: int = 204):
        super().__init__(status=status)

Adapting the Router to Use Request/Response

Now let’s update the router from the previous chapter to work with our new objects. We’ll add a thin adapter that converts between the WSGI interface and Request/Response objects:

import functools
from typing import Callable


HandlerFunc = Callable[[Request], Response]


def wsgi_handler(func: HandlerFunc) -> Callable:
    """
    Decorator: wraps a Request→Response function into a WSGI handler.
    Injects a Request object and calls the Response.
    """
    @functools.wraps(func)  # preserves the handler's name and docstring
    def wrapper(environ: dict, start_response: Callable) -> list:
        request = Request(environ)
        response = func(request)
        return response(environ, start_response)
    return wrapper


# Now our route handlers look like:

router = Router()

@router.get("/tasks/{task_id}")
@wsgi_handler
def get_task(request: Request) -> Response:
    task_id = request.path_param("task_id")
    task = tasks.get(task_id)
    if task is None:
        return JSONResponse({"error": "not found"}, status=404)
    return JSONResponse(task)


@router.post("/tasks")
@wsgi_handler
def create_task(request: Request) -> Response:
    if not request.is_json:
        return JSONResponse({"error": "content-type must be application/json"}, 415)
    try:
        data = request.json()
    except ValueError:
        return JSONResponse({"error": "invalid JSON"}, 400)
    if "title" not in data:
        return JSONResponse({"error": "title is required"}, 400)
    task = {"id": str(uuid.uuid4()), "title": data["title"], "done": False}
    tasks[task["id"]] = task
    return JSONResponse(task, status=201)

Look at how readable this is compared to the raw WSGI version. Same underlying mechanism, much cleaner interface.

Testing With Request and Response Objects

One of the biggest benefits of wrapping the WSGI interface: testability. We can construct Request objects with arbitrary environ dicts and inspect Response objects directly.

# test_handlers.py
import io
import json
from typing import Optional


def make_request(
    method: str = "GET",
    path: str = "/",
    body: bytes = b"",
    content_type: str = "",
    headers: Optional[dict] = None,
) -> Request:
    """Build a Request object for testing."""
    environ = {
        "REQUEST_METHOD": method,
        "PATH_INFO": path,
        "QUERY_STRING": "",
        "CONTENT_TYPE": content_type,
        "CONTENT_LENGTH": str(len(body)),
        "wsgi.input": io.BytesIO(body),
        "wsgi.errors": io.StringIO(),
        "wsgi.url_scheme": "http",
        "wsgi.version": (1, 0),
        "wsgi.multithread": False,
        "wsgi.multiprocess": False,
        "wsgi.run_once": False,
        "SERVER_NAME": "testserver",
        "SERVER_PORT": "80",
        "HTTP_HOST": "testserver",
    }
    if headers:
        for name, value in headers.items():
            key = "HTTP_" + name.upper().replace("-", "_")
            environ[key] = value
    return Request(environ)


def call_handler(handler, request: Request) -> Response:
    """Call a WSGI handler and return a Response-like object."""
    responses = []

    def start_response(status, headers, exc_info=None):
        responses.append((status, headers))

    body_chunks = handler(request._environ, start_response)
    body = b"".join(body_chunks)

    status_str, headers_list = responses[0]
    status_code = int(status_str.split(" ")[0])
    response = Response(body=body, status=status_code)
    for name, value in headers_list:
        response.set_header(name, value)
    return response


# Tests
def test_create_task():
    payload = json.dumps({"title": "test task"}).encode()
    request = make_request("POST", "/tasks", body=payload,
                           content_type="application/json")
    request._environ["route.params"] = {}

    response = call_handler(create_task, request)
    # create_task is already a WSGI-shaped callable thanks to @wsgi_handler

    assert response.status_code == 201
    data = json.loads(response._body)
    assert data["title"] == "test task"
    assert "id" in data
    assert data["done"] is False


def test_get_nonexistent_task():
    request = make_request("GET", "/tasks/nonexistent")
    request._environ["route.params"] = {"task_id": "nonexistent"}

    response = call_handler(get_task, request)
    assert response.status_code == 404


if __name__ == "__main__":
    test_create_task()
    test_get_nonexistent_task()
    print("All tests passed.")

What Frameworks Add on Top

Django’s HttpRequest object does exactly what we’ve done, plus:

  • Multipart form parsing (file uploads)
  • Cookie parsing
  • Session integration (request.session)
  • User authentication (request.user)
  • Per-request caching

Flask’s Request (from Werkzeug) adds:

  • request.files for uploads
  • request.cookies
  • request.args (query params)
  • request.form with multi-dict behavior
  • JSON parsing with error handling

FastAPI’s request handling goes further: it automatically converts JSON bodies to Pydantic models based on your function’s type annotations.

But all of these are the same thing we built, with more edge cases handled and more convenience methods added. The foundation is identical: wrap environ in a nicer API, provide a Response class that speaks WSGI.

Closing the Loop

We now have all the pieces of a working web framework:

  • Server (previous chapter): accepts connections, calls WSGI apps
  • Router (previous chapter): dispatches to handlers based on method + path
  • Middleware (two chapters ago): composable wrappers for cross-cutting concerns
  • Request/Response (this chapter): a clean interface for handlers

In the patterns section, we’ll assemble these into a small but complete framework. First, we need to understand why WSGI has limits — and what ASGI does about them.

Why WSGI Can’t Have Nice Things

WSGI has been reliable for twenty years. It’s simple, well-understood, and supported by every Python web server ever written. So why does ASGI exist?

The short answer is: WebSockets. The longer answer involves a fundamental mismatch between the WSGI request/response model and how modern web applications actually behave.

The WSGI Model

WSGI assumes a specific shape for web communication:

Client sends request → Server parses it → Your app runs → App returns response → Done

This is the HTTP request/response cycle. One request in, one response out. The entire interaction fits in a single function call:

response_iterable = application(environ, start_response)

When application returns, the transaction is complete. The connection can be closed. The worker is free to handle the next request.
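That single-call shape is simple enough to drive by hand. Here's a minimal sketch — not any real server's code, just the same calling convention — of what a WSGI server does per request:

```python
def drive_wsgi(application, environ):
    """Call a WSGI app once and collect its response — the whole transaction."""
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(application(environ, start_response))
    return captured["status"], captured["headers"], body


def hello_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello"]


status, headers, body = drive_wsgi(hello_app, {"REQUEST_METHOD": "GET"})
# status == "200 OK", body == b"Hello" — one call, one response, done
```

Everything a real server adds — socket handling, HTTP parsing, concurrency — sits around this one function call.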

This model works perfectly for 95% of HTTP traffic. GET a page, POST a form, fetch some JSON. One request, one response, move on.

The remaining 5% is where things fall apart.

WebSockets: The Problem

WebSockets are a protocol for bidirectional communication over a persistent connection. Once a WebSocket handshake completes, both client and server can send messages at any time, for as long as the connection stays open. There’s no concept of “one request, one response.”

How would you model this in WSGI? You can’t. Let’s try anyway to see why:

def websocket_app(environ, start_response):
    # At this point we want to:
    # 1. Complete the WebSocket upgrade handshake
    # 2. Read messages from the client
    # 3. Send messages to the client
    # 4. Keep the connection open indefinitely
    # 5. React to messages as they arrive
    #
    # But start_response must be called synchronously.
    # And we must return an iterable.
    # And when we return, the connection closes.
    #
    # There is no way to model "keep connection open, react to events"
    # within this interface.

    start_response("101 Switching Protocols", [...])
    return ???  # What do we return? How do we keep reading?

The WSGI model is inherently synchronous and single-turn. WebSockets are inherently asynchronous and multi-turn.

Some WSGI servers tried to hack around this with extensions. None of them were portable. The WebSocket support in Flask before ASGI required either gevent (monkey-patching) or a separate WebSocket server running alongside the WSGI app, with a reverse proxy in front. It was not elegant.

Long Polling and Server-Sent Events

The same problem, milder:

Long polling: client sends request, server holds it open for up to 30 seconds until there’s data to send, then responds. Works in WSGI (barely), but ties up a worker thread for the entire wait time.

Server-Sent Events (SSE): server sends a stream of events to the client over a single HTTP connection. WSGI can technically do this by returning a generator, but the worker is tied up until the stream ends.

Both of these require holding a connection open while doing work asynchronously. WSGI’s synchronous model handles this with blocking — tie up a thread per connection. This works but doesn’t scale: 1000 concurrent SSE connections = 1000 blocked threads.
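Here's what SSE looks like under WSGI — a hypothetical sketch, not production code. It works, but the worker thread is pinned for the stream's lifetime:

```python
import time


def sse_app(environ, start_response):
    """Server-Sent Events over WSGI: a blocking generator per connection."""
    start_response("200 OK", [
        ("Content-Type", "text/event-stream"),
        ("Cache-Control", "no-cache"),
    ])

    def event_stream():
        for i in range(3):
            time.sleep(0.1)  # the worker thread blocks here, per client
            yield f"data: tick {i}\n\n".encode()

    return event_stream()
```

A thousand clients connected to this app means a thousand worker threads asleep in that loop.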

The Async Problem

Python 3.4 introduced asyncio. By 2016, async/await was mainstream Python. Web frameworks wanted to write async handlers:

async def get_user(request):
    user = await db.fetch_one("SELECT * FROM users WHERE id = ?", user_id)
    return JSONResponse(user)

This is genuinely better. The worker isn’t blocked while waiting for the database — it can handle other requests. But you can’t call an async function from a synchronous context without asyncio.run() or equivalent. And WSGI servers call your app synchronously:

# This is what Gunicorn does. It's synchronous.
result = application(environ, start_response)

You can’t stick await in there. A WSGI server cannot efficiently run async applications because the interface itself is synchronous.

Some solutions emerged:

  • Run each WSGI request in a thread pool and bridge to asyncio
  • Use gevent to make blocking calls look async
  • Accept that you can’t use async/await in WSGI handlers
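The bridging approach, for instance, looks like this — a hedged sketch (the `fetch_user` coroutine stands in for a real async DB call):

```python
import asyncio
import json


async def fetch_user():
    await asyncio.sleep(0)  # stands in for an async database driver call
    return {"id": 1, "name": "alice"}


def wsgi_app(environ, start_response):
    # Spin up an event loop just for this request. It runs, but the
    # worker thread still blocks until the coroutine finishes — none of
    # asyncio's concurrency benefits survive the bridge.
    user = asyncio.run(fetch_user())
    start_response("200 OK", [("Content-Type", "application/json")])
    return [json.dumps(user).encode()]
```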

None of these are satisfying. The synchronous interface was a genuine constraint.

The HTTP/2 Problem

HTTP/2 multiplexes multiple requests over a single TCP connection. The server can push resources to the client before they’re requested. Requests can be prioritized. All of this happens over a long-lived connection with multiple concurrent streams.

WSGI models each request as a separate function call with its own environ. It has no concept of a “connection” that persists across requests, no way to push data to the client, no way to handle multiple concurrent streams over a single connection.

HTTP/2 server push never really went anywhere (HTTP/3 dropped it), but the point stands: WSGI’s model of “one function call = one request” is a fundamental constraint that HTTP/2’s connection-level features can’t fit into.

What ASGI Changes

ASGI (Asynchronous Server Gateway Interface) solves these problems with a different model:

Instead of:

# WSGI: one call, one response
response = app(environ, start_response)

ASGI uses:

# ASGI: async, event-based, connection-aware
await app(scope, receive, send)

The differences:

  • async: the application is an async callable, so Python’s event loop can interleave multiple concurrent requests on a single thread
  • scope: connection metadata (similar to environ) but includes the type of connection — HTTP, WebSocket, or lifespan
  • receive: an async callable that your app calls to receive events (incoming messages, request body chunks, WebSocket messages)
  • send: an async callable that your app calls to send events (response start, body chunks, WebSocket messages)

The event-based model means:

  • WebSockets work natively: receive a message event, send a message event, repeat
  • Server-sent events work without blocking: send body chunk events as data arrives
  • HTTP/2 streams could work: each stream is a separate scope
  • Lifespan events work: startup and shutdown events around the app’s lifecycle

ASGI is more complex than WSGI. That’s not an accident — it’s handling more complex scenarios. But it’s still just a callable. A more sophisticated callable, with a more sophisticated interface, but a callable.

The Cost

Nothing is free. ASGI’s complexity has real costs:

Debugging is harder. Async tracebacks are longer and less obvious. An exception in an async generator might surface somewhere unexpected.

The mental model is different. WSGI is easy to reason about: function in, function out. ASGI requires understanding coroutines, event loops, and the receive/send event model.

Not everything is async. Most database drivers, file I/O, and third-party libraries were written for synchronous use. In ASGI, calling a blocking function in a coroutine blocks the entire event loop. You need asyncio.run_in_executor or async libraries.
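The standard escape hatch is a thread-pool executor. A sketch, with `blocking_query` standing in for a synchronous database driver call:

```python
import asyncio
import time


def blocking_query():
    time.sleep(0.1)  # stands in for a synchronous DB driver call
    return ["row1", "row2"]


async def handler():
    loop = asyncio.get_running_loop()
    # The blocking call runs on a worker thread; the event loop stays
    # free to serve other connections while we await the result.
    return await loop.run_in_executor(None, blocking_query)


rows = asyncio.run(handler())
```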

You don’t need it for standard HTTP. If your application is 100% request/response with no WebSockets, no SSE, and no HTTP/2 push, WSGI is fine. Gunicorn on a few WSGI workers will serve you well.

But if you do need WebSockets, or you want to write genuinely async handlers that don’t block on I/O, or you’re building something that needs to hold many connections open simultaneously — ASGI is the right tool.

Let’s look at the spec.

The ASGI Spec (scope, receive, send — that’s literally it)

The ASGI specification lives at asgi.readthedocs.io. Like WSGI, the actual interface is simpler than the documentation makes it sound. Unlike WSGI, there are three connection types to understand: HTTP, WebSocket, and Lifespan.

Let’s read the spec.

The Interface

An ASGI application is an async callable with this signature:

async def application(scope: dict, receive: callable, send: callable) -> None:
    ...

Three arguments:

  • scope: a dict describing the connection (like environ, but for the connection type)
  • receive: an async callable — call it to receive the next event from the client
  • send: an async callable — call it to send an event to the client

No return value. All communication happens through receive and send.

The scope Dictionary

scope contains connection metadata. The most important key is type, which tells you what kind of connection you’re handling.

HTTP scope

scope = {
    "type": "http",
    "asgi": {"version": "3.0"},
    "http_version": "1.1",        # or "2"
    "method": "GET",              # uppercase
    "path": "/users/42",
    "raw_path": b"/users/42",
    "query_string": b"active=true",
    "root_path": "",
    "scheme": "http",             # or "https"
    "headers": [                  # list of (name, value) byte-string tuples
        (b"host", b"example.com"),
        (b"accept", b"application/json"),
        (b"content-type", b"application/json"),
        (b"content-length", b"42"),
    ],
    "server": ("127.0.0.1", 8000),  # (host, port) tuple
    "client": ("127.0.0.1", 54321), # client address
}

Notice: headers are bytes, not strings. ASGI works closer to the wire than WSGI — the header names and values are byte strings, and frameworks convert them to strings when they wrap the scope.

WebSocket scope

scope = {
    "type": "websocket",
    "asgi": {"version": "3.0"},
    "path": "/ws/chat",
    "query_string": b"room=general",
    "headers": [...],  # same format as HTTP
    "server": ("127.0.0.1", 8000),
    "client": ("127.0.0.1", 54322),
    "subprotocols": [],  # requested WebSocket subprotocols
}

Lifespan scope

scope = {
    "type": "lifespan",
    "asgi": {"version": "3.0"},
}

We’ll cover lifespan in detail in its own chapter.

The Events

receive and send deal in events — dicts with a type key.

HTTP events

Received from client:

# http.request — the body of the HTTP request
{
    "type": "http.request",
    "body": b"...",          # bytes (possibly empty)
    "more_body": False,      # True if more chunks are coming
}

# http.disconnect — client disconnected
{
    "type": "http.disconnect",
}

Sent to client:

# http.response.start — sends status and headers
# Must be sent before http.response.body
{
    "type": "http.response.start",
    "status": 200,           # integer, not string
    "headers": [
        (b"content-type", b"text/plain"),
        (b"content-length", b"13"),
    ],
}

# http.response.body — sends body data
{
    "type": "http.response.body",
    "body": b"Hello, world!",
    "more_body": False,      # True = more chunks coming; False = done
}

WebSocket events

Received:

# websocket.connect — client initiated WebSocket handshake
{"type": "websocket.connect"}

# websocket.receive — client sent a message
{
    "type": "websocket.receive",
    "bytes": None,           # bytes message (or None)
    "text": "hello",         # text message (or None)
}

# websocket.disconnect — client disconnected
{
    "type": "websocket.disconnect",
    "code": 1000,            # WebSocket close code
}

Sent:

# websocket.accept — accept the WebSocket handshake
{
    "type": "websocket.accept",
    "subprotocol": None,     # optional agreed subprotocol
    "headers": [],           # extra headers in the handshake response
}

# websocket.send — send a message to the client
{
    "type": "websocket.send",
    "bytes": None,           # bytes message (or None)
    "text": "hello",         # text message (or None)
}

# websocket.close — close the connection
{
    "type": "websocket.close",
    "code": 1000,            # WebSocket close code
}

The Simplest Possible ASGI App

async def application(scope, receive, send):
    """A complete, working ASGI application."""
    if scope["type"] != "http":
        return  # Ignore non-HTTP connections for now

    # Read the request (we don't use it, but we should consume it)
    event = await receive()
    assert event["type"] == "http.request"

    # Send the response
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"content-type", b"text/plain; charset=utf-8"),
            (b"content-length", b"13"),
        ],
    })
    await send({
        "type": "http.response.body",
        "body": b"Hello, world!",
        "more_body": False,
    })

Run it with Uvicorn:

pip install uvicorn
uvicorn app:application

That’s a fully functional ASGI application. No framework, no dependencies beyond uvicorn.

Reading the Full Request Body

HTTP bodies can arrive in chunks. The more_body flag tells you if more is coming:

async def read_body(receive: callable) -> bytes:
    """Read the complete request body, handling chunks."""
    body = b""
    while True:
        event = await receive()
        if event["type"] == "http.request":
            body += event.get("body", b"")
            if not event.get("more_body", False):
                break
        elif event["type"] == "http.disconnect":
            break  # Client disconnected before sending full body
    return body

For small bodies, the server typically sends everything in one http.request event with more_body=False. For large bodies or streaming uploads, you’ll see multiple events.

A More Complete Example

import json


async def application(scope, receive, send):
    if scope["type"] == "lifespan":
        await handle_lifespan(scope, receive, send)
    elif scope["type"] == "http":
        await handle_http(scope, receive, send)


async def handle_lifespan(scope, receive, send):
    while True:
        event = await receive()
        if event["type"] == "lifespan.startup":
            await send({"type": "lifespan.startup.complete"})
        elif event["type"] == "lifespan.shutdown":
            await send({"type": "lifespan.shutdown.complete"})
            break


async def handle_http(scope, receive, send):
    method = scope["method"]
    path = scope["path"]

    # Read the full body
    body = await read_body(receive)

    # Route
    if path == "/" and method == "GET":
        response_body = b"Hello from ASGI!"
        status = 200
    elif path == "/echo" and method == "POST":
        # Echo the request body back
        data = json.loads(body) if body else {}
        response_body = json.dumps({"echo": data}).encode()
        status = 200
    else:
        response_body = b"Not Found"
        status = 404

    await send({
        "type": "http.response.start",
        "status": status,
        "headers": [
            (b"content-type", b"application/json"),
            (b"content-length", str(len(response_body)).encode()),
        ],
    })
    await send({
        "type": "http.response.body",
        "body": response_body,
        "more_body": False,
    })


async def read_body(receive) -> bytes:
    body = b""
    while True:
        event = await receive()
        if event["type"] == "http.request":
            body += event.get("body", b"")
            if not event.get("more_body", False):
                break
        elif event["type"] == "http.disconnect":
            break
    return body

Comparing WSGI and ASGI Side by Side

# WSGI
def wsgi_app(environ, start_response):
    method = environ['REQUEST_METHOD']
    path = environ['PATH_INFO']
    body = environ['wsgi.input'].read(int(environ.get('CONTENT_LENGTH') or 0))

    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello"]


# ASGI equivalent
async def asgi_app(scope, receive, send):
    method = scope['method']
    path = scope['path']
    body = await read_body(receive)

    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({
        "type": "http.response.body",
        "body": b"Hello",
        "more_body": False,
    })

The ASGI version is more verbose for simple HTTP. That’s the cost of the generality — the same interface handles HTTP, WebSockets, and lifespan, so HTTP alone requires a bit more ceremony.

Headers: Bytes or Strings?

ASGI headers are byte-string tuples: (b"content-type", b"text/plain"). This is closer to the wire format — HTTP headers are bytes.

WSGI headers are string tuples: ("Content-Type", "text/plain"). WSGI is at a higher abstraction level, hiding the byte/string conversion.

Frameworks handle the bytes-to-strings conversion for you. When you access request.headers["content-type"] in Starlette or FastAPI, the framework has already decoded the byte strings from scope["headers"]. But at the raw ASGI level, it’s bytes.
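Rolling your own decode is a one-liner. A sketch, using latin-1 as HTTP header encoding:

```python
def decode_headers(raw_headers):
    """Turn ASGI (b"name", b"value") pairs into a lowercase-keyed str dict."""
    return {
        name.decode("latin-1").lower(): value.decode("latin-1")
        for name, value in raw_headers
    }


headers = decode_headers([
    (b"content-type", b"application/json"),
    (b"host", b"example.com"),
])
# headers["content-type"] == "application/json"
```

Note that a plain dict collapses duplicate header names (Set-Cookie, for example), which is why real frameworks use multi-dicts here.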

The asgi Key

Every scope dict includes an "asgi" key with version information:

scope["asgi"] == {"version": "3.0"}

ASGI 3.0 is the current version (the one we’re describing). Earlier versions had a different interface; you’ll rarely encounter them today. If you’re writing ASGI apps in 2024+, you’re on 3.0.

What Uvicorn Does

Uvicorn is to ASGI what Gunicorn is to WSGI — it handles the transport and calls your application.

When Uvicorn receives an HTTP request:

  1. Reads bytes from the socket
  2. Parses the HTTP request
  3. Builds the scope dict
  4. Creates receive and send callables backed by the socket
  5. Calls await app(scope, receive, send)

When your app calls await receive(), Uvicorn either returns buffered data or reads more from the socket (awaiting the I/O without blocking other connections).

When your app calls await send(event), Uvicorn serializes the event to HTTP bytes and writes them to the socket.

The event loop is asyncio. Multiple connections share a single thread via cooperative multitasking — each await point is an opportunity for the event loop to switch to another connection.

This is the key difference from WSGI: one thread can handle many concurrent connections, because await yields control voluntarily rather than blocking.
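You can imitate that loop entirely in memory, which is also handy for testing. A sketch of a tiny in-process driver — not Uvicorn's code, just the same calling convention:

```python
import asyncio


async def call_asgi(app, scope, request_events):
    """Drive an ASGI app with a scripted receive() and a capturing send()."""
    incoming = list(request_events)
    sent = []

    async def receive():
        # Hand the app its events one at a time, like a server draining a socket
        return incoming.pop(0) if incoming else {"type": "http.disconnect"}

    async def send(event):
        sent.append(event)  # a real server would serialize this to the wire

    await app(scope, receive, send)
    return sent


async def hello(scope, receive, send):
    await receive()  # consume the (empty) request body
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"hi", "more_body": False})


events = asyncio.run(call_asgi(
    hello,
    {"type": "http", "method": "GET", "path": "/"},
    [{"type": "http.request", "body": b"", "more_body": False}],
))
# events[0]["status"] == 200; events[1]["body"] == b"hi"
```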

Your First ASGI App

We built a tasks API in the WSGI section. Let’s rebuild it as ASGI — same functionality, but async, with better structure. By the end you’ll have a working JSON API that demonstrates the full ASGI request/response cycle.

The Foundation

First, let’s build the utilities we’ll need. In ASGI, headers are bytes and parsing happens more explicitly:

import json
import uuid
from typing import Any, Dict, List, Optional, Tuple


# Type aliases for clarity
Headers = List[Tuple[bytes, bytes]]


def get_header(scope_headers: Headers, name: str) -> Optional[str]:
    """Get a header value from ASGI scope headers (case-insensitive)."""
    name_bytes = name.lower().encode("latin-1")
    for key, value in scope_headers:
        if key.lower() == name_bytes:
            return value.decode("latin-1")
    return None


def make_headers(*pairs: Tuple[str, str]) -> Headers:
    """Build ASGI headers from string tuples."""
    return [
        (name.lower().encode("latin-1"), value.encode("latin-1"))
        for name, value in pairs
    ]


async def read_body(receive) -> bytes:
    """Read the full request body, handling chunked delivery."""
    body = b""
    while True:
        event = await receive()
        if event["type"] == "http.request":
            body += event.get("body", b"")
            if not event.get("more_body", False):
                break
        elif event["type"] == "http.disconnect":
            break
    return body


async def send_response(send, status: int, body: bytes, headers: Headers) -> None:
    """Send a complete HTTP response."""
    # Always include Content-Length
    all_headers = list(headers) + [
        (b"content-length", str(len(body)).encode())
    ]
    await send({
        "type": "http.response.start",
        "status": status,
        "headers": all_headers,
    })
    await send({
        "type": "http.response.body",
        "body": body,
        "more_body": False,
    })


async def send_json(send, data: Any, status: int = 200) -> None:
    """Send a JSON response."""
    body = json.dumps(data, indent=2, default=str).encode("utf-8")
    await send_response(
        send,
        status,
        body,
        make_headers(("content-type", "application/json")),
    )

The Application

# In-memory store
tasks: Dict[str, Dict] = {}


async def application(scope, receive, send):
    """Main ASGI application."""
    if scope["type"] == "lifespan":
        await handle_lifespan(receive, send)
        return

    if scope["type"] != "http":
        return

    await handle_http(scope, receive, send)


async def handle_lifespan(receive, send):
    while True:
        event = await receive()
        if event["type"] == "lifespan.startup":
            # Initialize resources here
            print("Application starting up")
            await send({"type": "lifespan.startup.complete"})
        elif event["type"] == "lifespan.shutdown":
            # Clean up resources here
            print("Application shutting down")
            await send({"type": "lifespan.shutdown.complete"})
            break


async def handle_http(scope, receive, send):
    method = scope["method"]
    path = scope["path"]

    # Route dispatch
    if path == "/tasks":
        if method == "GET":
            await list_tasks(scope, receive, send)
        elif method == "POST":
            await create_task(scope, receive, send)
        else:
            await send_json(send, {"error": "method not allowed"}, 405)

    elif path.startswith("/tasks/"):
        task_id = path[len("/tasks/"):]
        if not task_id:
            await send_json(send, {"error": "not found"}, 404)
            return

        if method == "GET":
            await get_task(task_id, scope, receive, send)
        elif method == "DELETE":
            await delete_task(task_id, scope, receive, send)
        elif method == "PATCH":
            await update_task(task_id, scope, receive, send)
        else:
            await send_json(send, {"error": "method not allowed"}, 405)

    else:
        await send_json(send, {"error": "not found"}, 404)


async def list_tasks(scope, receive, send):
    # Consume the body even if we don't use it (good practice)
    await read_body(receive)
    await send_json(send, list(tasks.values()))


async def create_task(scope, receive, send):
    content_type = get_header(scope["headers"], "content-type") or ""
    if "application/json" not in content_type:
        await send_json(send, {"error": "Content-Type must be application/json"}, 415)
        return

    body = await read_body(receive)
    try:
        data = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        await send_json(send, {"error": "invalid JSON"}, 400)
        return

    if "title" not in data:
        await send_json(send, {"error": "title is required"}, 400)
        return

    task = {
        "id": str(uuid.uuid4()),
        "title": str(data["title"]),
        "done": bool(data.get("done", False)),
    }
    tasks[task["id"]] = task
    await send_json(send, task, 201)


async def get_task(task_id: str, scope, receive, send):
    await read_body(receive)
    task = tasks.get(task_id)
    if task is None:
        await send_json(send, {"error": "not found"}, 404)
        return
    await send_json(send, task)


async def delete_task(task_id: str, scope, receive, send):
    await read_body(receive)
    if task_id not in tasks:
        await send_json(send, {"error": "not found"}, 404)
        return
    deleted = tasks.pop(task_id)
    await send_json(send, deleted)


async def update_task(task_id: str, scope, receive, send):
    task = tasks.get(task_id)
    if task is None:
        await send_json(send, {"error": "not found"}, 404)
        return

    body = await read_body(receive)
    try:
        data = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        await send_json(send, {"error": "invalid JSON"}, 400)
        return

    if "done" in data:
        task["done"] = bool(data["done"])
    if "title" in data:
        task["title"] = str(data["title"])

    await send_json(send, task)

Save as asgi_tasks.py, run with uvicorn:

pip install uvicorn
uvicorn asgi_tasks:application --reload

Test it:

# Create a task
curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Learn ASGI"}'

# List tasks
curl http://localhost:8000/tasks

# Update a task (replace the ID with one from your previous response)
curl -X PATCH http://localhost:8000/tasks/YOUR-ID \
  -H "Content-Type: application/json" \
  -d '{"done": true}'

# Delete a task
curl -X DELETE http://localhost:8000/tasks/YOUR-ID

Streaming Responses

One thing ASGI handles well that WSGI struggles with: streaming large responses. Instead of buffering everything in memory, send body chunks as they’re produced:

import asyncio


async def streaming_app(scope, receive, send):
    """Send a large response in chunks."""
    if scope["type"] != "http":
        return

    await read_body(receive)  # Consume request body

    # Start the response — no Content-Length. The server sees the missing
    # length and applies chunked transfer encoding itself; applications
    # shouldn't set Transfer-Encoding directly.
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"content-type", b"text/plain"),
        ],
    })

    # Send data in chunks
    for i in range(10):
        await asyncio.sleep(0.1)  # Simulate work
        chunk = f"Line {i}: some data\n".encode()
        await send({
            "type": "http.response.body",
            "body": chunk,
            "more_body": True,  # More is coming
        })

    # Final empty body closes the stream
    await send({
        "type": "http.response.body",
        "body": b"",
        "more_body": False,
    })

Run it with uvicorn, then test with curl:

curl -N http://localhost:8000/  # -N = no buffering, shows chunks as they arrive

You’ll see lines appear one at a time, 100ms apart. In WSGI, you’d have to return a generator and cross your fingers that Gunicorn didn’t buffer it. In ASGI, the streaming model is first-class.

Detecting Client Disconnects

ASGI lets you detect when a client disconnects mid-request. Useful for canceling expensive work:

import asyncio


async def long_running_app(scope, receive, send):
    if scope["type"] != "http":
        return

    await read_body(receive)

    # Run work and listen for disconnect concurrently
    disconnect_task = asyncio.ensure_future(wait_for_disconnect(receive))
    work_task = asyncio.ensure_future(do_expensive_work())

    done, pending = await asyncio.wait(
        [disconnect_task, work_task],
        return_when=asyncio.FIRST_COMPLETED,
    )

    for task in pending:
        task.cancel()

    if disconnect_task in done:
        # Client disconnected — don't bother sending a response
        return

    result = work_task.result()
    await send_json(send, result)


async def wait_for_disconnect(receive) -> None:
    """Await the http.disconnect event."""
    while True:
        event = await receive()
        if event["type"] == "http.disconnect":
            return


async def do_expensive_work() -> dict:
    await asyncio.sleep(5)  # Simulate 5 seconds of work
    return {"result": "done"}

In WSGI, you can’t do this — you have no way to detect a client disconnect mid-processing. Either you do the work and try to send, or you set a timeout and hope. ASGI gives you a proper event for it.

Query Parameters

ASGI passes query_string as bytes. Parse it:

import urllib.parse
from typing import Dict, List


def parse_query(scope) -> Dict[str, List[str]]:
    """Parse query string from ASGI scope."""
    qs = scope.get("query_string", b"").decode("latin-1")
    return urllib.parse.parse_qs(qs, keep_blank_values=True)


# In a handler:
async def search_tasks(scope, receive, send):
    await read_body(receive)
    params = parse_query(scope)
    query = params.get("q", [""])[0]
    done_filter = params.get("done", [None])[0]

    results = list(tasks.values())
    if query:
        results = [t for t in results if query.lower() in t["title"].lower()]
    if done_filter is not None:
        done = done_filter.lower() == "true"
        results = [t for t in results if t["done"] == done]

    await send_json(send, results)

The __main__ Runner

For development, uvicorn can be run programmatically:

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "asgi_tasks:application",
        host="127.0.0.1",
        port=8000,
        reload=True,  # Auto-reload on file changes
        log_level="info",
    )

Or as a module:

python -m uvicorn asgi_tasks:application --reload

What This Reveals

Looking at the application we just built, notice what’s verbose:

  • The routing is still manual if/elif
  • Reading the body requires the read_body helper
  • Header access requires get_header
  • Every handler needs to await read_body even if it doesn’t use the body

This is exactly what Starlette (and by extension FastAPI) solves. Starlette’s Request object wraps the ASGI scope with request.method, request.url, request.headers, await request.body(), await request.json(). Starlette’s routing matches paths and dispatches to async view functions. Starlette’s Response classes build the correct http.response.start and http.response.body events.

FastAPI adds type-annotated dependency injection on top of Starlette.

But now you know what they’re building on. Every Starlette route handler ultimately calls await send({"type": "http.response.start", ...}). Every FastAPI request processes an ASGI scope dict. The wrappers are real, the underlying reality is what we’ve been working with.
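The wrapper pattern itself is easy to sketch. This is not Starlette's actual code, just a toy Request class showing how a framework turns the raw scope and receive callable into the ergonomic API you're used to:

```python
import urllib.parse
from typing import Dict, List


class Request:
    """Minimal scope wrapper, in the spirit of Starlette's Request."""

    def __init__(self, scope, receive):
        self.scope = scope
        self._receive = receive

    @property
    def method(self) -> str:
        return self.scope["method"]

    @property
    def path(self) -> str:
        return self.scope["path"]

    @property
    def headers(self) -> Dict[str, str]:
        # Decode the ASGI byte-tuple headers into a plain dict
        return {
            name.decode("latin-1"): value.decode("latin-1")
            for name, value in self.scope.get("headers", [])
        }

    @property
    def query_params(self) -> Dict[str, List[str]]:
        qs = self.scope.get("query_string", b"").decode("latin-1")
        return urllib.parse.parse_qs(qs, keep_blank_values=True)

    async def body(self) -> bytes:
        # Drain http.request events into a single bytes object
        chunks = []
        while True:
            event = await self._receive()
            chunks.append(event.get("body", b""))
            if not event.get("more_body", False):
                return b"".join(chunks)
```

Every property here is just a view over the scope dict we have been building by hand.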

Build an ASGI Server from Scratch

Building a WSGI server required sockets and HTTP parsing. Building an ASGI server requires all of that plus asyncio. The concepts are the same; the mechanics shift from blocking I/O to coroutines.

This chapter builds a working async HTTP server that calls ASGI applications correctly. Not production-ready — we’ll leave chunked encoding and HTTP/2 for Uvicorn — but correct enough to understand what Uvicorn is actually doing.

The Architecture

An asyncio-based server has a different shape than a threaded one:

asyncio event loop
    ↓
asyncio.start_server()
    ↓
handle_connection() — called as a coroutine for each connection
    ├── reads from asyncio.StreamReader
    ├── parses HTTP request
    ├── builds scope dict
    ├── creates receive/send coroutines
    ├── awaits app(scope, receive, send)
    └── writes to asyncio.StreamWriter

Instead of spawning a thread per connection, we create a coroutine per connection. The event loop switches between coroutines at await points, so hundreds of concurrent connections can be handled by a single thread.

The HTTP Parser

Same logic as the WSGI server, adapted for async reading:

import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class ParsedRequest:
    method: str
    path: str
    query_string: bytes
    http_version: str
    headers: List[Tuple[bytes, bytes]]
    body: bytes


async def read_http_request(reader: asyncio.StreamReader) -> Optional[bytes]:
    """
    Read a complete HTTP request from an async stream.
    Returns raw bytes or None if the connection closed.
    """
    data = b""

    # Read until we have the complete headers
    try:
        while b"\r\n\r\n" not in data:
            chunk = await asyncio.wait_for(reader.read(4096), timeout=5.0)
            if not chunk:
                return None
            data += chunk
    except asyncio.TimeoutError:
        return None

    # Parse Content-Length to read the body
    header_end = data.find(b"\r\n\r\n") + 4
    content_length = 0

    for line in data[:header_end].decode("latin-1").split("\r\n"):
        if line.lower().startswith("content-length:"):
            try:
                content_length = int(line.split(":", 1)[1].strip())
            except ValueError:
                pass
            break

    # Read body if needed
    body_received = len(data) - header_end
    while body_received < content_length:
        try:
            chunk = await asyncio.wait_for(reader.read(4096), timeout=5.0)
        except asyncio.TimeoutError:
            break
        if not chunk:
            break
        data += chunk
        body_received += len(chunk)

    return data


def parse_request(raw: bytes) -> Optional[ParsedRequest]:
    """Parse raw HTTP bytes into a ParsedRequest."""
    header_end = raw.find(b"\r\n\r\n")
    if header_end == -1:
        return None

    header_section = raw[:header_end].decode("latin-1")
    body = raw[header_end + 4:]

    lines = header_section.split("\r\n")
    try:
        method, raw_path, http_version = lines[0].split(" ", 2)
    except ValueError:
        return None

    # Split path and query string
    raw_path_bytes = raw_path.encode("latin-1")
    if b"?" in raw_path_bytes:
        path_bytes, query_string = raw_path_bytes.split(b"?", 1)
    else:
        path_bytes = raw_path_bytes
        query_string = b""

    # Parse headers as byte tuples (ASGI format)
    headers: List[Tuple[bytes, bytes]] = []
    for line in lines[1:]:
        if ": " in line:
            name, _, value = line.partition(": ")
            headers.append((name.lower().encode("latin-1"),
                            value.encode("latin-1")))

    # Trim body
    content_length = 0
    for name, value in headers:
        if name == b"content-length":
            try:
                content_length = int(value)
            except ValueError:
                pass
            break

    return ParsedRequest(
        method=method.upper(),
        path=path_bytes.decode("latin-1"),
        query_string=query_string,
        http_version=http_version,
        headers=headers,
        body=body[:content_length],
    )

The ASGI Bridge

The key piece: create receive and send coroutines that bridge between the ASGI protocol and the TCP connection.

async def make_receive_send(
    request: ParsedRequest,
    writer: asyncio.StreamWriter,
) -> tuple:
    """
    Create the receive and send callables for an ASGI HTTP connection.
    Returns (receive, send).
    """
    # Track whether we've sent the body yet
    body_sent = False
    request_consumed = False
    response_started = False
    disconnect_event = asyncio.Event()

    async def receive():
        nonlocal request_consumed
        if not request_consumed:
            request_consumed = True
            return {
                "type": "http.request",
                "body": request.body,
                "more_body": False,
            }
        # Wait for disconnect (in a real server, we'd detect this from the socket)
        await disconnect_event.wait()
        return {"type": "http.disconnect"}

    response_headers = []
    response_status = None

    async def send(event):
        nonlocal response_started, body_sent, response_status

        if event["type"] == "http.response.start":
            response_status = event["status"]
            response_headers.extend(event.get("headers", []))
            response_started = True

        elif event["type"] == "http.response.body":
            if not response_started:
                raise RuntimeError("Must send http.response.start before body")

            body = event.get("body", b"")
            more_body = event.get("more_body", False)

            if not body_sent:
                # Write the HTTP response headers first
                status_line = f"HTTP/1.1 {response_status} {get_reason(response_status)}\r\n"
                writer.write(status_line.encode("latin-1"))

                for name, value in response_headers:
                    if isinstance(name, bytes):
                        name = name.decode("latin-1")
                    if isinstance(value, bytes):
                        value = value.decode("latin-1")
                    writer.write(f"{name}: {value}\r\n".encode("latin-1"))
                writer.write(b"\r\n")
                body_sent = True

            writer.write(body)

            if not more_body:
                await writer.drain()
                disconnect_event.set()

    return receive, send


def get_reason(status_code: int) -> str:
    reasons = {
        200: "OK", 201: "Created", 204: "No Content",
        301: "Moved Permanently", 302: "Found", 304: "Not Modified",
        400: "Bad Request", 401: "Unauthorized", 403: "Forbidden",
        404: "Not Found", 405: "Method Not Allowed",
        422: "Unprocessable Entity",
        500: "Internal Server Error",
    }
    return reasons.get(status_code, "Unknown")
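
As an aside, the standard library already ships these reason phrases, so the lookup table could be replaced with http.HTTPStatus:

```python
from http import HTTPStatus


def get_reason(status_code: int) -> str:
    """Look up the standard reason phrase for a status code."""
    try:
        return HTTPStatus(status_code).phrase
    except ValueError:
        return "Unknown"
```

The hand-rolled dict keeps the example dependency-free and explicit; either works.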

The Server Loop

import sys
from typing import Callable


async def handle_connection(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    app: Callable,
    server_host: str,
    server_port: int,
) -> None:
    """Handle one HTTP connection."""
    try:
        # Read the request
        raw = await read_http_request(reader)
        if not raw:
            return

        # Parse it
        request = parse_request(raw)
        if request is None:
            writer.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
            await writer.drain()
            return

        # Build the ASGI scope
        scope = {
            "type": "http",
            "asgi": {"version": "3.0"},
            "http_version": request.http_version.replace("HTTP/", ""),
            "method": request.method,
            "path": request.path,
            "raw_path": request.path.encode("latin-1"),
            "query_string": request.query_string,
            "root_path": "",
            "scheme": "http",
            "headers": request.headers,
            "server": (server_host, server_port),
        }

        # Get client address
        peername = writer.get_extra_info("peername")
        if peername:
            scope["client"] = peername

        # Create receive/send
        receive, send = await make_receive_send(request, writer)

        # Call the ASGI app
        await app(scope, receive, send)

    except Exception as e:
        print(f"Error handling connection: {e}", file=sys.stderr)
        try:
            writer.write(b"HTTP/1.1 500 Internal Server Error\r\n\r\n")
            await writer.drain()
        except Exception:
            pass
    finally:
        try:
            writer.close()
            await writer.wait_closed()
        except Exception:
            pass


async def serve(
    app: Callable,
    host: str = "127.0.0.1",
    port: int = 8000,
) -> None:
    """Start the ASGI server."""
    # Send the lifespan startup event
    await send_lifespan_startup(app)

    server = await asyncio.start_server(
        lambda r, w: handle_connection(r, w, app, host, port),
        host,
        port,
    )

    async with server:
        addr = server.sockets[0].getsockname()
        print(f"Serving on http://{addr[0]}:{addr[1]}", file=sys.stderr)
        try:
            await server.serve_forever()
        except (KeyboardInterrupt, asyncio.CancelledError):
            pass

    await send_lifespan_shutdown(app)


async def send_lifespan_startup(app: Callable) -> None:
    """Send the lifespan.startup event if the app handles it."""
    scope = {"type": "lifespan", "asgi": {"version": "3.0"}}
    startup_complete = asyncio.Event()
    events = asyncio.Queue()

    await events.put({"type": "lifespan.startup"})

    async def receive():
        return await events.get()

    async def send(event):
        if event["type"] == "lifespan.startup.complete":
            startup_complete.set()
        elif event["type"] == "lifespan.startup.failed":
            raise RuntimeError(f"Startup failed: {event.get('message', '')}")

    try:
        task = asyncio.create_task(app(scope, receive, send))
        await asyncio.wait_for(startup_complete.wait(), timeout=10.0)
        # Don't await the task — it runs for the app's lifetime
    except asyncio.TimeoutError:
        print("Warning: lifespan startup timed out", file=sys.stderr)
    except Exception as e:
        print(f"Warning: lifespan startup failed: {e}", file=sys.stderr)


async def send_lifespan_shutdown(app: Callable) -> None:
    """Send the lifespan.shutdown event."""
    # In a full implementation, we'd track the lifespan task
    # and send shutdown. Simplified here.
    pass

Running the Server

# asgi_server.py

# [paste all the code above, then:]

if __name__ == "__main__":
    from asgi_tasks import application  # The app from the previous chapter
    asyncio.run(serve(application))
Run the server in the background and exercise it with curl:

python asgi_server.py &
curl -X POST http://127.0.0.1:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Works on my ASGI server"}'

Testing Concurrency

The real test: can our server handle multiple requests concurrently? Since we’re using asyncio, the answer should be yes — as long as the handlers don’t block.

# concurrent_test.py
import asyncio
import time


async def make_request(session_num: int) -> float:
    """Make an HTTP request and return how long it took."""
    start = time.monotonic()
    reader, writer = await asyncio.open_connection("127.0.0.1", 8000)

    request = (
        f"GET /tasks HTTP/1.1\r\n"
        f"Host: localhost\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    )
    writer.write(request.encode())
    await writer.drain()

    response = b""
    while chunk := await reader.read(4096):
        response += chunk

    writer.close()
    elapsed = time.monotonic() - start
    print(f"Request {session_num}: {elapsed*1000:.1f}ms")
    return elapsed


async def main():
    # Make 10 concurrent requests
    tasks = [make_request(i) for i in range(10)]
    times = await asyncio.gather(*tasks)
    print(f"Total elapsed: {max(times)*1000:.1f}ms")
    print(f"Average: {sum(times)/len(times)*1000:.1f}ms")


asyncio.run(main())

With a synchronous server (our WSGI server from earlier), 10 sequential requests take 10x as long as one. With our async server, 10 concurrent requests should take roughly as long as one — because the event loop handles them all simultaneously.

What Uvicorn Does Differently

Our server is correct but incomplete. Uvicorn adds:

HTTP/1.1 keep-alive: reuse the connection for multiple requests. We close after each one. This matters for performance — TLS handshakes and TCP connection setup are expensive.

HTTP/2: multiplexed streams over a single connection. This is substantially more complex — h2 (the Python HTTP/2 library) handles the framing; Uvicorn maps the streams to ASGI scopes.

Proper streaming: chunked transfer encoding for responses without Content-Length, streaming request body parsing.

SSL/TLS: pass --ssl-keyfile and --ssl-certfile to Uvicorn and it handles TLS. Our server is plain HTTP.

Worker processes: uvicorn --workers 4 forks four worker processes, each running the asyncio event loop. More workers = more CPU cores utilized.

Graceful shutdown: when you send SIGTERM, Uvicorn stops accepting new connections, finishes in-flight requests, then exits. Our KeyboardInterrupt handling is abrupt.

But the core logic — accept connection, read request, build scope, call await app(scope, receive, send), write response — is exactly what we’ve implemented.

The asyncio Mental Model

One thing worth making explicit: asyncio is cooperative multitasking. Tasks run until they explicitly yield control with await. When a task awaits network I/O (reader.read(), writer.drain()), the event loop can run other tasks.

This means:

  • CPU-bound work blocks everything: if a handler does heavy computation without awaiting, no other connection can run. Use asyncio.run_in_executor for CPU-bound work.
  • Blocking I/O blocks everything: time.sleep(1) in a handler blocks the entire server for 1 second. Use await asyncio.sleep(1) instead.
  • One thread, many connections: unlike threaded servers, there’s no race condition between connections (within one process). The event loop is single-threaded.

This model is why async Python is fast for I/O-bound workloads (web servers, API proxies) but not for CPU-bound ones (image processing, ML inference). Know which one you have.
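The escape hatch for CPU-bound work is run_in_executor, which moves the computation onto a pool so the event loop stays responsive. A minimal sketch (the crunch function is a stand-in for real work):

```python
import asyncio


def crunch(n: int) -> int:
    """CPU-bound work that would block the event loop if called directly."""
    return sum(i * i for i in range(n))


async def handler() -> int:
    loop = asyncio.get_running_loop()
    # Runs crunch on the default thread pool; the event loop keeps
    # serving other connections while the worker thread computes
    return await loop.run_in_executor(None, crunch, 1_000_000)
```

One caveat: for pure-Python number crunching the GIL still serializes threads, so passing a concurrent.futures.ProcessPoolExecutor as the first argument is often the better choice for true parallelism.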

Lifespan Events (Startup, Shutdown, and Existential Dread)

Every long-running application has things it needs to do before it starts serving requests, and things it needs to do before it stops. Connect to a database. Load a model into memory. Start a background task. Flush a cache. Close connection pools gracefully rather than dropping them mid-operation.

WSGI has no solution for this. You initialize things at module import time (which works) or you use Gunicorn’s worker hooks (which are server-specific). Neither is portable.

ASGI has first-class support for it: the lifespan protocol.

The Lifespan Scope

When an ASGI server starts your application, before processing any HTTP or WebSocket connections, it sends a lifespan scope:

scope = {
    "type": "lifespan",
    "asgi": {"version": "3.0"},
}

Your application is called with this scope and a receive/send pair. The lifespan coroutine then runs for the entire application lifetime:

Server starts
    → calls app(lifespan_scope, receive, send)
    → app receives "lifespan.startup"
    → app does startup work
    → app sends "lifespan.startup.complete"
    → server starts accepting HTTP/WebSocket connections
    ...
    [Server receives SIGTERM]
    → server stops accepting new connections
    → finishes in-flight requests
    → sends "lifespan.shutdown" to the lifespan coroutine
    → app does cleanup
    → app sends "lifespan.shutdown.complete"
    → server exits

Basic Lifespan Handler

async def application(scope, receive, send):
    if scope["type"] == "lifespan":
        await handle_lifespan(receive, send)
    elif scope["type"] == "http":
        await handle_http(scope, receive, send)


async def handle_lifespan(receive, send):
    while True:
        event = await receive()

        if event["type"] == "lifespan.startup":
            try:
                await startup()
                await send({"type": "lifespan.startup.complete"})
            except Exception as e:
                await send({
                    "type": "lifespan.startup.failed",
                    "message": str(e),
                })
                return

        elif event["type"] == "lifespan.shutdown":
            try:
                await shutdown()
                await send({"type": "lifespan.shutdown.complete"})
            except Exception as e:
                await send({
                    "type": "lifespan.shutdown.failed",
                    "message": str(e),
                })
            return


async def startup():
    print("Application starting: connecting to database, loading caches...")
    # await db.connect()
    # await cache.warmup()


async def shutdown():
    print("Application shutting down: closing connections...")
    # await db.disconnect()
    # await cache.flush()

Sharing State Between Lifespan and Handlers

Here’s the practical problem: you initialize a database connection pool in startup(), but your HTTP handlers need to use it. How do you get it to them?

The common pattern: use a module-level state container.

# state.py
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class AppState:
    db: Optional[Any] = None
    cache: Optional[Any] = None
    config: dict = field(default_factory=dict)


state = AppState()
# app.py
import asyncio
import json
from state import state


async def startup():
    # In a real app, these would be actual async connections
    state.db = await create_db_pool()
    state.cache = await create_redis_client()
    state.config = await load_config()
    print(f"Started. DB: {state.db}, Cache: {state.cache}")


async def shutdown():
    if state.db:
        await state.db.close()
    if state.cache:
        await state.cache.close()
    print("Clean shutdown complete.")


async def create_db_pool():
    """Simulate creating a database connection pool."""
    await asyncio.sleep(0.1)  # Simulate async connection
    return {"pool": "connected", "size": 10}


async def create_redis_client():
    await asyncio.sleep(0.05)
    return {"redis": "connected"}


async def load_config():
    return {"env": "production", "debug": False}


async def application(scope, receive, send):
    if scope["type"] == "lifespan":
        await handle_lifespan(receive, send)
    elif scope["type"] == "http":
        await handle_http(scope, receive, send)


async def handle_lifespan(receive, send):
    while True:
        event = await receive()
        if event["type"] == "lifespan.startup":
            await startup()
            await send({"type": "lifespan.startup.complete"})
        elif event["type"] == "lifespan.shutdown":
            await shutdown()
            await send({"type": "lifespan.shutdown.complete"})
            return


async def handle_http(scope, receive, send):
    # Now we can use state.db, state.cache, etc.
    body = json.dumps({
        "db": str(state.db),
        "cache": str(state.cache),
        "config": state.config,
    }).encode()

    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"content-type", b"application/json"),
            (b"content-length", str(len(body)).encode()),
        ],
    })
    await send({
        "type": "http.response.body",
        "body": body,
        "more_body": False,
    })

Scope-Based State (The Starlette Pattern)

A cleaner pattern: store state in the ASGI scope itself. Starlette does this:

import time


async def application(scope, receive, send):
    if scope["type"] == "lifespan":
        await handle_lifespan(scope, receive, send)
    elif scope["type"] in ("http", "websocket"):
        # State from lifespan is available in scope["state"]
        await handle_http(scope, receive, send)


async def handle_lifespan(scope, receive, send):
    # Use the server-provided state dict; don't replace it, because the
    # server is what carries this dict into each connection scope
    scope.setdefault("state", {})

    while True:
        event = await receive()
        if event["type"] == "lifespan.startup":
            scope["state"]["db"] = await create_db_pool()
            scope["state"]["started_at"] = time.time()
            await send({"type": "lifespan.startup.complete"})
        elif event["type"] == "lifespan.shutdown":
            await scope["state"]["db"].close()
            await send({"type": "lifespan.shutdown.complete"})
            return


async def handle_http(scope, receive, send):
    # Access state from scope
    db = scope.get("state", {}).get("db")
    # ...

Note: the propagation works because the server carries the lifespan state dict into each connection scope. The ASGI spec says servers make a shallow copy of scope["state"] per connection, so objects you put in it during startup are visible to every handler; mutate the dict in place rather than replacing it with a new one.
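Per the ASGI spec, each connection scope receives a shallow copy of the lifespan state dict. A standalone sketch of what that shallow copy implies (simplified; not server code):

```python
# What the server effectively does, per connection
lifespan_state = {"db": {"pool": "connected"}}

conn_state = dict(lifespan_state)  # shallow copy into the connection scope

# Mutating a nested object: visible to every scope that shares it
conn_state["db"]["pool"] = "reconnected"
assert lifespan_state["db"]["pool"] == "reconnected"

# Rebinding a top-level key: local to this connection only
conn_state["db"] = None
assert lifespan_state["db"] is not None
```

In short: put mutable objects (pools, clients) into the state dict at startup and mutate them; don't rebind top-level keys per-request and expect the change to be seen elsewhere.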

Background Tasks

Lifespan is also where you start and stop background tasks — things that run continuously alongside request handling:

import asyncio


background_tasks = set()


async def startup():
    # Start a periodic cleanup task
    task = asyncio.create_task(periodic_cleanup())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    # Start a health check task
    task = asyncio.create_task(report_health())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)


async def shutdown():
    # Cancel all background tasks
    for task in list(background_tasks):
        task.cancel()

    # Wait for them to finish
    if background_tasks:
        await asyncio.gather(*background_tasks, return_exceptions=True)


async def periodic_cleanup():
    while True:
        try:
            await asyncio.sleep(300)  # Every 5 minutes
            await cleanup_expired_sessions()
        except asyncio.CancelledError:
            break  # Graceful shutdown
        except Exception as e:
            print(f"Cleanup error: {e}")


async def report_health():
    while True:
        try:
            await asyncio.sleep(60)  # Every minute
            # Report to monitoring service
        except asyncio.CancelledError:
            break
        except Exception as e:
            print(f"Health report error: {e}")


async def cleanup_expired_sessions():
    pass  # Actual implementation would hit the database

The asyncio.CancelledError handling in the background tasks is important: when you call task.cancel() during shutdown, the task receives a CancelledError. If you don’t catch it, the task exits with an exception. If you do catch it and don’t re-raise, the task exits cleanly.
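A self-contained demonstration of that cancellation behavior (a toy task, not the server code above):

```python
import asyncio


async def worker():
    try:
        await asyncio.sleep(3600)  # Pretend to run forever
    except asyncio.CancelledError:
        print("worker: cleaning up")
        # Not re-raising: the task finishes cleanly instead of "cancelled"


async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # Let the worker start and reach its await
    task.cancel()
    await task  # No exception propagates; the worker swallowed it
    assert task.done() and not task.cancelled()


asyncio.run(main())
```

On recent Pythons, swallowing CancelledError like this can interfere with timeouts and task groups, so only suppress it when you genuinely intend the task to end cleanly, as in the shutdown pattern above.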

What Happens If Lifespan Fails

If your startup() raises an exception and you send lifespan.startup.failed:

await send({
    "type": "lifespan.startup.failed",
    "message": "Database connection refused",
})

Uvicorn will refuse to accept any connections and exit with an error. This is the right behavior — you don’t want to serve requests with a broken database connection.

Lifespan in Frameworks

Starlette (and FastAPI) have a lifespan parameter:

from contextlib import asynccontextmanager
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    db = await create_db_pool()
    app.state.db = db
    yield
    # Shutdown
    await db.close()


app = FastAPI(lifespan=lifespan)

This is the modern FastAPI pattern. The yield separates startup (before yield) from shutdown (after yield). Under the hood, Starlette wraps this context manager in the lifespan protocol we’ve been discussing.

The older pattern used @app.on_event("startup") and @app.on_event("shutdown") decorators. These are now deprecated in favor of the lifespan context manager approach, which is cleaner because startup and shutdown code live together.

Django has AppConfig.ready() for initialization, but it runs synchronously at import time and has no shutdown hook. For ASGI Django, you’d use Starlette’s lifespan middleware or a third-party library.

Testing Lifespan

When testing ASGI apps, you need to send the lifespan events to initialize the app properly:

import asyncio


async def run_lifespan(app) -> asyncio.Queue:
    """
    Trigger the app's startup sequence.
    Returns the lifespan event queue once startup is complete.
    """
    scope = {"type": "lifespan", "asgi": {"version": "3.0"}}
    events = asyncio.Queue()
    startup_complete = asyncio.Event()

    await events.put({"type": "lifespan.startup"})

    async def receive():
        return await events.get()

    async def send(event):
        if event["type"] == "lifespan.startup.complete":
            startup_complete.set()
        elif event["type"] == "lifespan.startup.failed":
            raise RuntimeError(event.get("message", "Startup failed"))

    asyncio.create_task(app(scope, receive, send))
    await startup_complete.wait()
    return events  # Return the queue so caller can send shutdown


async def test_with_lifespan():
    lifespan_queue = await run_lifespan(application)

    # Run your tests here
    # state is now initialized

    # Clean shutdown
    await lifespan_queue.put({"type": "lifespan.shutdown"})

In practice, Starlette’s TestClient runs lifespan events for you when used as a context manager; httpx.AsyncClient with an ASGI transport does not, so you pair it with a lifespan helper like the one above. The patterns chapter covers testing in detail.

The Existential Dread Part

Here’s the thing nobody warns you about: in production, your app will sometimes receive SIGTERM while handling a request. The lifespan shutdown is triggered, your background tasks are cancelled, and then — what happens to the in-flight request?

Uvicorn handles this with a graceful timeout: it stops accepting new connections, waits for in-flight requests to complete (up to a configurable timeout), then triggers the lifespan shutdown.

If you’re doing something expensive in a request handler — a long database query, a slow external API call — you might hit the timeout. The request gets dropped, the client gets a connection error, and your cleanup runs anyway.

There’s no perfect solution here. The best you can do is:

  1. Set a reasonable graceful shutdown timeout (Uvicorn’s --timeout-graceful-shutdown)
  2. Make your handlers fast
  3. Use database connection pools with timeouts
  4. Accept that the occasional in-flight request will be dropped during deploys

Kubernetes rolling deployments, blue-green deploys, and load balancers with connection draining all help, but ultimately distributed systems are adversarial. Lifespan gives you a clean interface to handle it as well as possible.

WebSockets Over ASGI (Finally, a Reason to Care)

We’ve spent several chapters saying “WSGI can’t do WebSockets.” Now let’s actually use them.

WebSockets are a persistent, bidirectional communication channel between client and server. The browser opens a connection that stays open. Either side can send messages at any time. This enables chat applications, real-time dashboards, live collaboration, games — any scenario where you need low-latency communication without the overhead of polling.

ASGI handles WebSockets natively. Let’s see how.

The WebSocket Handshake

A WebSocket connection starts as an HTTP request with special headers:

GET /ws HTTP/1.1
Host: localhost:8000
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

The server responds with:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

After this exchange, the connection is no longer HTTP — it’s a WebSocket stream. The Sec-WebSocket-Accept value is a fixed hash of the client’s key; the browser verifies it to confirm the server actually speaks the WebSocket protocol. (Cross-origin policy is a separate concern, enforced via the Origin header.)
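The accept value is easy to compute: SHA-1 of the client key concatenated with a GUID fixed by RFC 6455, base64-encoded. The key/accept pair shown above is the RFC’s own example:

```python
import base64
import hashlib

# Fixed GUID from RFC 6455
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def websocket_accept(client_key: str) -> str:
    """Compute the Sec-WebSocket-Accept header for a client key."""
    digest = hashlib.sha1((client_key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


print(websocket_accept("dGhlIHNhbXBsZSBub25jZQ=="))
# → s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

Your application never computes this — the ASGI server does — but it demystifies the magic string in the 101 response.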

In ASGI, the server handles this handshake. Your application just receives a websocket scope and sends a websocket.accept event.

The Simplest WebSocket App

async def application(scope, receive, send):
    if scope["type"] == "lifespan":
        await handle_lifespan(receive, send)
    elif scope["type"] == "http":
        await handle_http(scope, receive, send)
    elif scope["type"] == "websocket":
        await handle_websocket(scope, receive, send)


async def handle_websocket(scope, receive, send):
    """Echo server: sends back whatever the client sends."""

    # Wait for the connection event
    event = await receive()
    assert event["type"] == "websocket.connect"

    # Accept the connection
    await send({"type": "websocket.accept"})

    # Echo loop
    while True:
        event = await receive()

        if event["type"] == "websocket.receive":
            # Echo the message back. "text" and "bytes" are mutually
            # exclusive; compare against None so an empty frame still echoes.
            if event.get("text") is not None:
                await send({
                    "type": "websocket.send",
                    "text": f"Echo: {event['text']}",
                })
            elif event.get("bytes") is not None:
                await send({
                    "type": "websocket.send",
                    "bytes": event["bytes"],
                })

        elif event["type"] == "websocket.disconnect":
            # Client disconnected — exit the handler
            break

Run it:

uvicorn ws_app:application

Test with a WebSocket client. Python’s websockets library works well:

# test_ws.py
import asyncio
import websockets


async def test():
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        await ws.send("Hello, WebSocket!")
        response = await ws.recv()
        print(response)  # Echo: Hello, WebSocket!


asyncio.run(test())

Or from the browser console:

const ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = (e) => console.log(e.data);
ws.onopen = () => ws.send("Hello from browser");
// Echo: Hello from browser

A Real Example: Chat Room

An echo server demonstrates the protocol. A chat room demonstrates why you’d actually use it.

import asyncio
import json
from typing import Dict, Set


# Connected clients: room_name → set of send callables
rooms: Dict[str, Set] = {}


async def handle_websocket(scope, receive, send):
    """Multi-room chat server."""
    # Get room from query string
    query = scope.get("query_string", b"").decode()
    room = "general"
    for param in query.split("&"):
        if param.startswith("room="):
            room = param[5:]
            break

    # Accept the WebSocket connection before joining the room, so a
    # broadcast can never reach a client that hasn't been accepted yet
    event = await receive()
    if event["type"] != "websocket.connect":
        return
    await send({"type": "websocket.accept"})

    # Join the room
    rooms.setdefault(room, set()).add(send)

    try:

        # Announce arrival
        await broadcast(room, {
            "type": "system",
            "message": f"A user joined #{room}",
        }, exclude=send)

        # Message loop
        while True:
            event = await receive()

            if event["type"] == "websocket.receive":
                text = event.get("text", "")
                if not text:
                    continue

                try:
                    data = json.loads(text)
                except json.JSONDecodeError:
                    data = {"text": text}

                # Broadcast to everyone in the room
                await broadcast(room, {
                    "type": "message",
                    "text": data.get("text", text),
                    "room": room,
                })

            elif event["type"] == "websocket.disconnect":
                break

    finally:
        # Remove from room on disconnect
        rooms.get(room, set()).discard(send)
        await broadcast(room, {
            "type": "system",
            "message": f"A user left #{room}",
        })


async def broadcast(room: str, data: dict, exclude=None) -> None:
    """Send a message to all clients in a room."""
    message = json.dumps(data)
    dead_clients = set()

    for client_send in list(rooms.get(room, set())):
        if client_send is exclude:
            continue
        try:
            await client_send({
                "type": "websocket.send",
                "text": message,
            })
        except Exception:
            dead_clients.add(client_send)

    # Clean up dead clients
    rooms.get(room, set()).difference_update(dead_clients)

Test with multiple connections:

# multi_client_test.py
import asyncio
import json
import websockets


async def client(name: str, room: str = "general"):
    async with websockets.connect(f"ws://localhost:8000/ws?room={room}") as ws:
        print(f"{name} connected")

        # Send a message
        await ws.send(json.dumps({"text": f"Hello from {name}"}))

        # Receive messages for 2 seconds
        async def receive_loop():
            while True:
                msg = await ws.recv()
                data = json.loads(msg)
                print(f"{name} received: {data}")

        try:
            await asyncio.wait_for(receive_loop(), timeout=2.0)
        except asyncio.TimeoutError:
            pass


async def main():
    # Connect three clients concurrently
    await asyncio.gather(
        client("Alice"),
        client("Bob"),
        client("Carol"),
    )


asyncio.run(main())

Rejecting WebSocket Connections

Sometimes you need to reject a WebSocket connection — bad authentication, rate limit exceeded, room full:

async def handle_websocket(scope, receive, send):
    # Wait for connect event
    event = await receive()
    assert event["type"] == "websocket.connect"

    # Check authentication
    token = get_token_from_scope(scope)
    if not await is_valid_token(token):
        # Reject the connection with a close code
        await send({
            "type": "websocket.close",
            "code": 4001,  # Application-defined code (4000-4999 are custom)
        })
        return

    # Accept and proceed
    await send({"type": "websocket.accept"})
    # ...


def get_token_from_scope(scope) -> str:
    """Extract Bearer token from WebSocket upgrade headers."""
    for name, value in scope.get("headers", []):
        if name == b"authorization":
            auth = value.decode("latin-1")
            if auth.startswith("Bearer "):
                return auth[7:]
    # Fall back to the query string — browsers can't set custom
    # headers on a WebSocket handshake
    query = scope.get("query_string", b"").decode()
    for param in query.split("&"):
        if param.startswith("token="):
            return param[6:]
    return ""


async def is_valid_token(token: str) -> bool:
    # Check against database, JWT validation, etc.
    return token == "valid-token"  # Simplified

WebSocket close codes follow the RFC 6455 spec:

  • 1000 — Normal closure
  • 1001 — Going away (server shutdown)
  • 1002 — Protocol error
  • 1003 — Unsupported data type
  • 4000-4999 — Application-defined (use these for your own status codes)
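
As a sketch of an application-defined code in practice: a handler might close with its own code when a room is over capacity. Both MAX_ROOM_SIZE and the 4003 code here are invented for illustration:

```python
rooms: dict = {}      # room name → set of send callables, as in the chat example
MAX_ROOM_SIZE = 50    # hypothetical capacity limit


async def reject_if_full(room: str, send) -> bool:
    """Close with an application-defined code if the room is at capacity.

    Returns True if the connection was rejected.
    """
    if len(rooms.get(room, set())) >= MAX_ROOM_SIZE:
        # Our own "room full" code from the 4000-4999 private range
        await send({"type": "websocket.close", "code": 4003})
        return True
    return False
```

Call it after receiving websocket.connect and before sending websocket.accept — closing before accepting rejects the handshake, just like the authentication example above.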

Sending Binary Data

WebSockets support both text frames (UTF-8 encoded) and binary frames. Use binary for protocol buffers, binary file transfers, or any non-text data:

async def binary_echo(scope, receive, send):
    event = await receive()
    assert event["type"] == "websocket.connect"
    await send({"type": "websocket.accept"})

    while True:
        event = await receive()
        if event["type"] == "websocket.receive":
            if event.get("bytes") is not None:
                # Echo binary data back
                await send({
                    "type": "websocket.send",
                    "bytes": event["bytes"],
                })
            elif event.get("text") is not None:
                # Convert text to binary (example)
                await send({
                    "type": "websocket.send",
                    "bytes": event["text"].encode("utf-8"),
                })
        elif event["type"] == "websocket.disconnect":
            break

Handling Both HTTP and WebSocket

Real applications serve regular HTTP endpoints and WebSocket endpoints. Wire them together:

async def application(scope, receive, send):
    if scope["type"] == "lifespan":
        await handle_lifespan(receive, send)
    elif scope["type"] == "http":
        await handle_http(scope, receive, send)
    elif scope["type"] == "websocket":
        path = scope["path"]
        if path == "/ws/chat":
            await handle_websocket(scope, receive, send)
        elif path == "/ws/echo":
            await echo_websocket(scope, receive, send)
        else:
            # Reject unknown WebSocket paths
            event = await receive()
            if event["type"] == "websocket.connect":
                await send({"type": "websocket.close", "code": 4004})
    else:
        pass  # Unknown scope type, ignore


async def handle_http(scope, receive, send):
    await read_body(receive)
    path = scope["path"]

    if path == "/":
        # Serve a simple HTML page with a WebSocket client
        html = """
        <!DOCTYPE html>
        <html>
        <head><title>WebSocket Chat</title></head>
        <body>
            <input id="msg" placeholder="Message..." />
            <button onclick="send()">Send</button>
            <ul id="log"></ul>
            <script>
                const ws = new WebSocket("ws://" + location.host + "/ws/chat");
                ws.onmessage = e => {
                    const li = document.createElement("li");
                    li.textContent = e.data;
                    document.getElementById("log").appendChild(li);
                };
                function send() {
                    const input = document.getElementById("msg");
                    ws.send(JSON.stringify({text: input.value}));
                    input.value = "";
                }
            </script>
        </body>
        </html>
        """.encode("utf-8")

        await send({
            "type": "http.response.start",
            "status": 200,
            "headers": [
                (b"content-type", b"text/html; charset=utf-8"),
                (b"content-length", str(len(html)).encode()),
            ],
        })
        await send({
            "type": "http.response.body",
            "body": html,
            "more_body": False,
        })
    else:
        body = b"Not Found"
        await send({
            "type": "http.response.start",
            "status": 404,
            "headers": [(b"content-type", b"text/plain"),
                        (b"content-length", str(len(body)).encode())],
        })
        await send({"type": "http.response.body", "body": body, "more_body": False})

Save, run with uvicorn, open http://localhost:8000 in a browser. Open multiple tabs — they’re all in the same chat room.

The Concurrency Model

When two WebSocket clients are connected, their handler coroutines run concurrently on the event loop:

event loop:
    coroutine A (client 1): waiting at "await receive()"
    coroutine B (client 2): waiting at "await receive()"

Client 1 sends a message:
    → coroutine A wakes up
    → calls broadcast()
    → await client_2_send(...)  # sends to client 2
    → coroutine A goes back to "await receive()"

Client 2 sends a message:
    → coroutine B wakes up
    → calls broadcast()
    → await client_1_send(...)  # sends to client 1
    → coroutine B goes back to "await receive()"

Each handler runs independently, cooperatively yielding at await points. This is why our broadcast works without locks: the event loop is single-threaded, and control only changes hands at an await, so no two coroutines can mutate the rooms dict at the same moment.

This breaks down if you have CPU-bound work in a handler (use run_in_executor) or if you use actual threads (use asyncio.Lock). But for pure async I/O, the cooperative model is both safe and efficient.
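The run_in_executor escape hatch looks like this — a sketch with a made-up CPU-bound function standing in for real work like image processing or password hashing:

```python
import asyncio
import hashlib


def expensive_digest(data: bytes, rounds: int) -> str:
    # CPU-bound: would block the event loop if run inline in a handler
    for _ in range(rounds):
        data = hashlib.sha256(data).digest()
    return data.hex()


async def handler_with_cpu_work() -> str:
    loop = asyncio.get_running_loop()
    # Offload to the default thread pool executor; other coroutines
    # (e.g. the chat handlers) keep running while this computes
    return await loop.run_in_executor(None, expensive_digest, b"payload", 10_000)


result = asyncio.run(handler_with_cpu_work())
print(len(result))  # 64 — a hex-encoded SHA-256 digest
```

The handler still awaits the result as if it were ordinary async I/O; only the computation itself moves off the loop.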

What Frameworks Add

Starlette provides a WebSocket class that wraps the ASGI scope and adds convenience methods:

from starlette.websockets import WebSocket

async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        text = await websocket.receive_text()  # or receive_bytes(), receive_json()
        await websocket.send_text(f"Echo: {text}")

FastAPI builds on Starlette’s WebSocket with dependency injection:

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    await websocket.send_text("Connected!")
    # ...

Under the hood, websocket.accept() sends {"type": "websocket.accept"}, websocket.receive_text() awaits receive() and extracts the text field, and websocket.send_text() sends {"type": "websocket.send", "text": ...}.

The raw events are still there. The framework just provides a nicer interface to them.

ASGI Middleware Deep Dive

ASGI middleware follows the same pattern as WSGI middleware: a callable that takes an app and returns an app. The difference is that everything is async, and the interface is (scope, receive, send) instead of (environ, start_response).

The async nature introduces some subtleties worth understanding.

The Simplest ASGI Middleware

class DoNothingMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        await self.app(scope, receive, send)

Or as a function:

def do_nothing_middleware(app):
    async def wrapper(scope, receive, send):
        await app(scope, receive, send)
    return wrapper

Both are equivalent. Classes are slightly more common in ASGI middleware because they can hold configuration cleanly.
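
Closures hold configuration too. Here is a hypothetical function-style middleware that stamps a header onto every HTTP response (the header name and value are made up for the example):

```python
def add_header_middleware(app, name: bytes = b"x-served-by", value: bytes = b"raw-asgi"):
    # Configuration lives in the closure instead of on an instance
    async def wrapper(scope, receive, send):
        if scope["type"] != "http":
            await app(scope, receive, send)
            return

        async def send_with_header(event):
            if event["type"] == "http.response.start":
                # Copy the event before modifying it
                event = dict(event)
                event["headers"] = list(event.get("headers", [])) + [(name, value)]
            await send(event)

        await app(scope, receive, send_with_header)

    return wrapper
```

Whether you prefer this or the class form is mostly taste; the class form makes the configuration visible as attributes, which helps when debugging a deep stack.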

Intercepting Requests and Responses

In WSGI, you intercept responses by wrapping start_response. In ASGI, you intercept them by wrapping send:

class TimingMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        import time
        started = time.monotonic()
        status_holder = []

        async def send_with_timing(event):
            if event["type"] == "http.response.start":
                status_holder.append(event["status"])
            elif event["type"] == "http.response.body":
                if not event.get("more_body", False):
                    elapsed = (time.monotonic() - started) * 1000
                    method = scope.get("method", "?")
                    path = scope.get("path", "/")
                    status = status_holder[0] if status_holder else "?"
                    print(f"{method} {path} → {status} ({elapsed:.1f}ms)")
            await send(event)

        await self.app(scope, receive, send_with_timing)

The send_with_timing coroutine wraps send just like we wrapped start_response in WSGI. It intercepts the http.response.start event to capture the status code and the final http.response.body event to measure total time, then passes everything through.

Modifying Requests

To modify incoming requests, rewrite the scope before passing it to the inner app (copying it first, so you don’t mutate the server’s dict):

class RequestIDMiddleware:
    """Add a unique request ID to every request."""

    def __init__(self, app, header: str = "X-Request-ID"):
        self.app = app
        self.header = header.lower().encode("latin-1")

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        import uuid

        # Check for existing ID, generate one if missing
        existing_id = None
        for name, value in scope.get("headers", []):
            if name == self.header:
                existing_id = value.decode("latin-1")
                break

        request_id = existing_id or str(uuid.uuid4())

        # Inject into scope
        scope = dict(scope)
        headers = list(scope.get("headers", []))
        # Update or add the header
        new_headers = [(n, v) for n, v in headers if n != self.header]
        new_headers.append((self.header, request_id.encode("latin-1")))
        scope["headers"] = new_headers
        scope["request_id"] = request_id

        # Add request ID to response headers
        async def send_with_id(event):
            if event["type"] == "http.response.start":
                event = dict(event)
                event["headers"] = list(event.get("headers", [])) + [
                    (self.header, request_id.encode("latin-1"))
                ]
            await send(event)

        await self.app(scope, receive, send_with_id)

Authentication Middleware

A complete authentication middleware that short-circuits the request if the token is invalid:

import json
from typing import Optional


class BearerAuthMiddleware:
    """
    Validates Bearer tokens.
    Injects user information into scope["user"] if valid.
    Returns 401 if token is missing or invalid.
    Skips authentication for paths in exclude_paths.
    """

    def __init__(
        self,
        app,
        verify_token,  # async callable: token → user dict or None
        exclude_paths: Optional[list] = None,
    ):
        self.app = app
        self.verify_token = verify_token
        self.exclude_paths = set(exclude_paths or [])

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        path = scope.get("path", "/")

        # Skip auth for excluded paths
        if path in self.exclude_paths:
            await self.app(scope, receive, send)
            return

        # Extract Bearer token
        token = self._extract_token(scope)

        if token is None:
            await self._send_401(send, "Missing Authorization header")
            return

        # Verify the token
        user = await self.verify_token(token)

        if user is None:
            await self._send_401(send, "Invalid or expired token")
            return

        # Inject user into scope
        scope = dict(scope)
        scope["user"] = user

        await self.app(scope, receive, send)

    def _extract_token(self, scope) -> Optional[str]:
        for name, value in scope.get("headers", []):
            if name == b"authorization":
                auth = value.decode("latin-1")
                if auth.startswith("Bearer "):
                    return auth[7:]
        return None

    async def _send_401(self, send, message: str) -> None:
        body = json.dumps({"error": message}).encode("utf-8")
        await send({
            "type": "http.response.start",
            "status": 401,
            "headers": [
                (b"content-type", b"application/json"),
                (b"content-length", str(len(body)).encode()),
                (b"www-authenticate", b'Bearer realm="API"'),
            ],
        })
        await send({
            "type": "http.response.body",
            "body": body,
            "more_body": False,
        })

Usage:

async def verify_token(token: str):
    # Check your database or JWT
    if token == "valid-token":
        return {"id": 1, "username": "alice", "role": "admin"}
    return None


app = BearerAuthMiddleware(
    my_app,
    verify_token=verify_token,
    exclude_paths=["/health", "/login"],
)

The GZIP Compression Middleware

A real-world example with non-trivial response manipulation:

import gzip


class GZipMiddleware:
    """
    Compress responses with gzip when:
    - Client sends Accept-Encoding: gzip
    - Response content type is compressible (text, JSON, etc.)
    - Response body is above minimum size threshold
    """

    COMPRESSIBLE_TYPES = {
        "text/html", "text/plain", "text/css", "text/javascript",
        "application/json", "application/javascript",
        "application/xml", "image/svg+xml",
    }

    def __init__(self, app, minimum_size: int = 500, compresslevel: int = 6):
        self.app = app
        self.minimum_size = minimum_size
        self.compresslevel = compresslevel

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # Check if client accepts gzip
        accepts_gzip = False
        for name, value in scope.get("headers", []):
            if name == b"accept-encoding":
                accepts_gzip = b"gzip" in value
                break

        if not accepts_gzip:
            await self.app(scope, receive, send)
            return

        # We need to collect the full response to compress it
        response_started = []
        body_chunks = []

        async def collecting_send(event):
            if event["type"] == "http.response.start":
                response_started.append(event)
            elif event["type"] == "http.response.body":
                body_chunks.append(event.get("body", b""))
                if not event.get("more_body", False):
                    # All chunks collected — decide whether to compress
                    full_body = b"".join(body_chunks)
                    start_event = response_started[0]

                    content_type = ""
                    for name, value in start_event.get("headers", []):
                        if name == b"content-type":
                            content_type = value.decode("latin-1").split(";")[0].strip()
                            break

                    should_compress = (
                        len(full_body) >= self.minimum_size
                        and content_type in self.COMPRESSIBLE_TYPES
                    )

                    if should_compress:
                        compressed = gzip.compress(full_body,
                                                   compresslevel=self.compresslevel)

                        # Rebuild headers with encoding and new length
                        headers = [
                            (n, v) for n, v in start_event.get("headers", [])
                            if n not in (b"content-length", b"content-encoding")
                        ]
                        headers.append((b"content-encoding", b"gzip"))
                        headers.append((b"content-length",
                                        str(len(compressed)).encode()))

                        await send(dict(start_event, headers=headers))
                        await send({
                            "type": "http.response.body",
                            "body": compressed,
                            "more_body": False,
                        })
                    else:
                        # Send as-is
                        await send(start_event)
                        await send({
                            "type": "http.response.body",
                            "body": full_body,
                            "more_body": False,
                        })

        await self.app(scope, receive, collecting_send)

Note the limitation: we’re collecting the entire response body before deciding whether to compress. For large streaming responses, this defeats the purpose of streaming. A streaming-compatible compressor would use more_body=True to send compressed chunks incrementally — at the cost of significantly more complexity.
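
For reference, the incremental-compression core is available in the standard library via zlib.compressobj — wbits of 16 + MAX_WBITS selects the gzip container rather than raw deflate. A minimal sketch of just the streaming part, outside the middleware:

```python
import gzip
import zlib


def compress_chunks(chunks, level: int = 6):
    """Yield gzip-compressed output incrementally, one input chunk at a time."""
    # 16 + MAX_WBITS tells zlib to emit a gzip (not raw deflate) stream
    co = zlib.compressobj(level, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
    for chunk in chunks:
        out = co.compress(chunk)
        if out:  # the compressor may buffer and emit nothing yet
            yield out
    yield co.flush()


body_chunks = [b"hello " * 200, b"world " * 200]
compressed = b"".join(compress_chunks(body_chunks))
assert gzip.decompress(compressed) == b"".join(body_chunks)
```

Note that a streaming compressor can’t know the final length up front, so it would drop Content-Length and let the server use chunked transfer encoding.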

Building a Middleware Stack

from typing import Callable, List


def build_middleware_stack(app: Callable, middleware: List) -> Callable:
    """
    Build an ASGI middleware stack.
    middleware = [A, B, C] → A(B(C(app)))
    Request order: A → B → C → app
    """
    # Each entry must be a callable that takes an app and returns an
    # app — a middleware class and a factory function both qualify.
    for mw in reversed(middleware):
        app = mw(app)
    return app


# Usage
async def verify_token(token: str):
    return {"id": 1} if token == "secret" else None


application = build_middleware_stack(my_app, [
    TimingMiddleware,
    RequestIDMiddleware,
    GZipMiddleware,
    lambda app: BearerAuthMiddleware(app, verify_token=verify_token,
                                      exclude_paths=["/health"]),
])

Middleware That Handles Lifespan

If your middleware needs its own startup/shutdown (e.g., opening a connection), handle the lifespan scope:

class DatabaseMiddleware:
    """
    Opens a database connection pool at startup,
    injects it into each request's scope.
    """

    def __init__(self, app, database_url: str):
        self.app = app
        self.database_url = database_url
        self.pool = None

    async def __call__(self, scope, receive, send):
        if scope["type"] == "lifespan":
            await self._handle_lifespan(scope, receive, send)
            return

        # Inject pool into scope
        scope = dict(scope)
        scope["db"] = self.pool
        await self.app(scope, receive, send)

    async def _handle_lifespan(self, scope, receive, send):
        # Wrap receive so we see each lifespan event before the inner
        # app does — without consuming it. The inner app still receives
        # every event and stays responsible for the *.complete replies.
        async def wrapped_receive():
            event = await receive()
            if event["type"] == "lifespan.startup":
                self.pool = await self._connect()
            elif event["type"] == "lifespan.shutdown":
                if self.pool is not None:
                    await self._disconnect()
            return event

        await self.app(scope, wrapped_receive, send)

    async def _connect(self):
        # Stand-in for e.g. await asyncpg.create_pool(self.database_url)
        print(f"Connected to {self.database_url}")
        return {"url": self.database_url, "status": "connected"}

    async def _disconnect(self):
        # Stand-in for await self.pool.close()
        print("Pool closed")
        self.pool = None

This pattern — middleware that intercepts lifespan and injects resources into HTTP scopes — is how Starlette’s SessionMiddleware, database integrations, and other resource-managing middleware work.

Testing Middleware

Testing ASGI middleware directly, without a framework:

import asyncio


class MockASGI:
    """A fake ASGI server for testing middleware."""

    def __init__(self):
        self.received_events = []

    async def make_request(
        self,
        app,
        method: str = "GET",
        path: str = "/",
        headers: list = None,
        body: bytes = b"",
    ) -> tuple:
        scope = {
            "type": "http",
            "method": method,
            "path": path,
            "query_string": b"",
            "headers": headers or [],
            "server": ("testserver", 80),
        }

        request_events = [
            {"type": "http.request", "body": body, "more_body": False}
        ]
        event_index = [0]

        async def receive():
            idx = event_index[0]
            event_index[0] += 1
            if idx < len(request_events):
                return request_events[idx]
            return {"type": "http.disconnect"}

        response_events = []

        async def send(event):
            response_events.append(event)

        await app(scope, receive, send)

        return scope, response_events


async def test_timing_middleware():
    async def simple_app(scope, receive, send):
        await receive()
        await send({
            "type": "http.response.start",
            "status": 200,
            "headers": [(b"content-type", b"text/plain")],
        })
        await send({
            "type": "http.response.body",
            "body": b"OK",
            "more_body": False,
        })

    app = TimingMiddleware(simple_app)
    mock = MockASGI()
    scope, events = await mock.make_request(app, "GET", "/test")

    assert events[0]["type"] == "http.response.start"
    assert events[0]["status"] == 200
    assert events[1]["body"] == b"OK"
    print("TimingMiddleware test passed")


asyncio.run(test_timing_middleware())

The Key Difference from WSGI Middleware

In WSGI middleware, you intercept start_response (a synchronous callable) to capture the status and headers before forwarding them. In ASGI middleware, you intercept the async send callable to capture http.response.start events.

The conceptual model is identical — wrap the callable, inspect and possibly modify events passing through — but the async nature means everything must be awaited. This is also why ASGI middleware can do things WSGI middleware can’t: it can await during send, which means it can do async work (database calls, cache writes) as part of processing the response.

That’s either powerful or terrifying, depending on how carefully your middleware is written.
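
To make that concrete, here is a hypothetical audit middleware that awaits an async sink (imagine a queue push or cache write) while the response is in flight — something a WSGI start_response wrapper has no way to do:

```python
class AuditMiddleware:
    """Awaits an async sink call as each response starts."""

    def __init__(self, app, sink):
        self.app = app
        self.sink = sink  # async callable: (path, status) → awaitable

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        async def auditing_send(event):
            if event["type"] == "http.response.start":
                # Real async work mid-response: the sink is awaited
                # before the status line ever reaches the client
                await self.sink(scope.get("path", "/"), event["status"])
            await send(event)

        await self.app(scope, receive, auditing_send)
```

The flip side is latency: every response now waits on the sink, so anything slow belongs in a background task instead.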

Testing WSGI and ASGI Apps Without a Framework

Your application is a callable. Test it like one.

This is the payoff for understanding the interface directly: testing becomes a matter of calling your app with the right arguments and inspecting the result. No special test client required — though they’re convenient, and we’ll look at those too.
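
Before building any helpers, here is the zero-infrastructure version: call the hello-world application from the opening chapter by hand, with the barest environ this particular app needs:

```python
def application(environ, start_response):
    status = "200 OK"
    headers = [("Content-Type", "text/plain")]
    start_response(status, headers)
    return [b"Hello, world"]


captured = []

def start_response(status, headers, exc_info=None):
    # Record what the app reported instead of sending it anywhere
    captured.append((status, headers))


# Minimal environ — a real test helper fills in many more keys
environ = {"REQUEST_METHOD": "GET", "PATH_INFO": "/"}
body = b"".join(application(environ, start_response))

assert captured[0][0] == "200 OK"
assert body == b"Hello, world"
```

Everything that follows is just this pattern with conveniences layered on top.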

Testing WSGI Applications

A WSGI application takes environ and start_response. To test it, provide both:

import io
import json
from typing import Any, Dict, List, Optional, Tuple


def make_environ(
    method: str = "GET",
    path: str = "/",
    query_string: str = "",
    body: bytes = b"",
    content_type: str = "",
    headers: Optional[Dict[str, str]] = None,
    environ_overrides: Optional[Dict] = None,
) -> dict:
    """Build a WSGI environ dict for testing."""
    environ = {
        "REQUEST_METHOD": method.upper(),
        "PATH_INFO": path,
        "QUERY_STRING": query_string,
        "CONTENT_TYPE": content_type,
        "CONTENT_LENGTH": str(len(body)) if body else "",
        "SERVER_NAME": "testserver",
        "SERVER_PORT": "80",
        "HTTP_HOST": "testserver",
        "wsgi.input": io.BytesIO(body),
        "wsgi.errors": io.StringIO(),
        "wsgi.url_scheme": "http",
        "wsgi.version": (1, 0),
        "wsgi.multithread": False,
        "wsgi.multiprocess": False,
        "wsgi.run_once": False,
        "GATEWAY_INTERFACE": "CGI/1.1",
        "SERVER_PROTOCOL": "HTTP/1.1",
    }

    # Add custom headers
    if headers:
        for name, value in headers.items():
            key = "HTTP_" + name.upper().replace("-", "_")
            environ[key] = value

    if environ_overrides:
        environ.update(environ_overrides)

    return environ


class WSGITestResponse:
    """Holds the result of calling a WSGI application."""

    def __init__(self, status: str, headers: List[Tuple[str, str]], body: bytes):
        self.status = status
        self.status_code = int(status.split(" ")[0])
        self.headers = dict(headers)
        self.body = body

    @property
    def text(self) -> str:
        return self.body.decode("utf-8")

    def json(self) -> Any:
        return json.loads(self.body)

    def __repr__(self) -> str:
        return f"<WSGITestResponse {self.status}>"


def call_wsgi(app, environ: dict) -> WSGITestResponse:
    """Call a WSGI app and return a response object."""
    response_parts = []

    def start_response(status, headers, exc_info=None):
        if exc_info:
            raise exc_info[1].with_traceback(exc_info[2])
        response_parts.append((status, headers))

    result = app(environ, start_response)
    try:
        body = b"".join(result)
    finally:
        if hasattr(result, "close"):
            result.close()

    status, headers = response_parts[0]
    return WSGITestResponse(status, headers, body)


class WSGITestClient:
    """A test client for WSGI applications."""

    def __init__(self, app):
        self.app = app

    def get(self, path: str, **kwargs) -> WSGITestResponse:
        return self.request("GET", path, **kwargs)

    def post(self, path: str, **kwargs) -> WSGITestResponse:
        return self.request("POST", path, **kwargs)

    def put(self, path: str, **kwargs) -> WSGITestResponse:
        return self.request("PUT", path, **kwargs)

    def delete(self, path: str, **kwargs) -> WSGITestResponse:
        return self.request("DELETE", path, **kwargs)

    def request(
        self,
        method: str,
        path: str,
        body: bytes = b"",
        json: Any = None,
        headers: Optional[Dict[str, str]] = None,
        query_string: str = "",
    ) -> WSGITestResponse:
        content_type = ""
        if json is not None:
            import json as json_module
            body = json_module.dumps(json).encode("utf-8")
            content_type = "application/json"

        environ = make_environ(
            method=method,
            path=path,
            query_string=query_string,
            body=body,
            content_type=content_type,
            headers=headers,
        )
        return call_wsgi(self.app, environ)

Writing WSGI Tests

# test_wsgi_tasks.py
from tasks_app import application  # The tasks app from chapter 5


client = WSGITestClient(application)


def test_empty_task_list():
    response = client.get("/tasks")
    assert response.status_code == 200
    assert response.json() == []


def create_task(title="Write tests"):
    """Helper: create a task and return its id.

    Tests themselves should not return values (pytest warns on it),
    so the creation logic lives in a plain helper.
    """
    response = client.post("/tasks", json={"title": title})
    assert response.status_code == 201
    data = response.json()
    assert data["title"] == title
    assert data["done"] is False
    assert "id" in data
    return data["id"]


def test_create_task():
    create_task()


def test_get_task():
    # Create a task first
    task_id = create_task()

    response = client.get(f"/tasks/{task_id}")
    assert response.status_code == 200
    assert response.json()["id"] == task_id


def test_task_not_found():
    response = client.get("/tasks/does-not-exist")
    assert response.status_code == 404


def test_delete_task():
    task_id = create_task()

    response = client.delete(f"/tasks/{task_id}")
    assert response.status_code == 200

    response = client.get(f"/tasks/{task_id}")
    assert response.status_code == 404


def test_missing_title():
    response = client.post("/tasks", json={"description": "no title here"})
    assert response.status_code == 400
    assert "title" in response.json()["error"]


def test_wrong_method():
    response = client.request("PATCH", "/tasks")
    assert response.status_code == 405


if __name__ == "__main__":
    test_empty_task_list()
    test_create_task()
    test_get_task()
    test_task_not_found()
    test_delete_task()
    test_missing_title()
    test_wrong_method()
    print("All WSGI tests passed.")
    print("All WSGI tests passed.")

Run with pytest (pip install pytest) or directly:

pytest test_wsgi_tasks.py -v
# or
python test_wsgi_tasks.py

Testing ASGI Applications

ASGI apps are async, so tests need to be async too. Python’s asyncio makes this straightforward:

import asyncio
import json
from typing import Any, Dict, List, Optional


def make_scope(
    method: str = "GET",
    path: str = "/",
    query_string: bytes = b"",
    headers: Optional[List] = None,
    scope_type: str = "http",
) -> dict:
    """Build an ASGI HTTP scope dict for testing."""
    return {
        "type": scope_type,
        "asgi": {"version": "3.0"},
        "http_version": "1.1",
        "method": method.upper(),
        "path": path,
        "raw_path": path.encode("latin-1"),
        "query_string": query_string,
        "root_path": "",
        "scheme": "http",
        "headers": headers or [],
        "server": ("testserver", 80),
        "client": ("127.0.0.1", 12345),
    }


class ASGITestResponse:
    """Holds the result of calling an ASGI application."""

    def __init__(self, status: int, headers: List, body: bytes):
        self.status_code = status
        self._headers = headers
        self.headers = {
            k.decode("latin-1"): v.decode("latin-1")
            for k, v in headers
        }
        self.body = body

    @property
    def text(self) -> str:
        return self.body.decode("utf-8")

    def json(self) -> Any:
        return json.loads(self.body)

    def __repr__(self) -> str:
        return f"<ASGITestResponse {self.status_code}>"


async def call_asgi(
    app,
    scope: dict,
    body: bytes = b"",
) -> ASGITestResponse:
    """Call an ASGI app with an HTTP scope and return a response."""
    request_events = [
        {"type": "http.request", "body": body, "more_body": False}
    ]
    event_index = [0]

    async def receive():
        idx = event_index[0]
        event_index[0] += 1
        if idx < len(request_events):
            return request_events[idx]
        return {"type": "http.disconnect"}

    response_events = []

    async def send(event):
        response_events.append(event)

    await app(scope, receive, send)

    start = next(e for e in response_events if e["type"] == "http.response.start")
    body_chunks = [
        e.get("body", b"")
        for e in response_events
        if e["type"] == "http.response.body"
    ]

    return ASGITestResponse(
        status=start["status"],
        headers=start.get("headers", []),
        body=b"".join(body_chunks),
    )


class ASGITestClient:
    """A test client for ASGI applications."""

    def __init__(self, app):
        self.app = app
        self._started = False

    async def _ensure_started(self):
        """Send lifespan startup if not already done."""
        if self._started:
            return
        self._started = True

        scope = {"type": "lifespan", "asgi": {"version": "3.0"}}
        events = asyncio.Queue()
        startup_done = asyncio.Event()

        await events.put({"type": "lifespan.startup"})

        async def receive():
            return await events.get()

        async def send(event):
            if event["type"] == "lifespan.startup.complete":
                startup_done.set()

        asyncio.create_task(self.app(scope, receive, send))
        try:
            await asyncio.wait_for(startup_done.wait(), timeout=5.0)
        except asyncio.TimeoutError:
            pass  # App may not handle lifespan

    async def get(self, path: str, **kwargs) -> ASGITestResponse:
        return await self.request("GET", path, **kwargs)

    async def post(self, path: str, **kwargs) -> ASGITestResponse:
        return await self.request("POST", path, **kwargs)

    async def delete(self, path: str, **kwargs) -> ASGITestResponse:
        return await self.request("DELETE", path, **kwargs)

    async def patch(self, path: str, **kwargs) -> ASGITestResponse:
        return await self.request("PATCH", path, **kwargs)

    async def request(
        self,
        method: str,
        path: str,
        body: bytes = b"",
        json: Any = None,
        headers: Optional[Dict[str, str]] = None,
        query_string: bytes = b"",
    ) -> ASGITestResponse:
        await self._ensure_started()

        content_type = ""
        if json is not None:
            import json as json_module
            body = json_module.dumps(json).encode("utf-8")
            content_type = "application/json"

        raw_headers = []
        if content_type:
            raw_headers.append((b"content-type", content_type.encode()))
        if body:
            raw_headers.append((b"content-length", str(len(body)).encode()))
        if headers:
            for name, value in headers.items():
                raw_headers.append((
                    name.lower().encode("latin-1"),
                    value.encode("latin-1"),
                ))

        scope = make_scope(
            method=method,
            path=path,
            query_string=query_string,
            headers=raw_headers,
        )

        return await call_asgi(self.app, scope, body)

Writing ASGI Tests

# test_asgi_tasks.py
import asyncio
from asgi_tasks import application  # From ASGI chapter


client = ASGITestClient(application)


async def test_list_tasks_empty():
    response = await client.get("/tasks")
    assert response.status_code == 200
    assert response.json() == []


async def test_create_and_get_task():
    # Create
    response = await client.post("/tasks", json={"title": "Async task"})
    assert response.status_code == 201
    task = response.json()
    assert task["title"] == "Async task"
    task_id = task["id"]

    # Get
    response = await client.get(f"/tasks/{task_id}")
    assert response.status_code == 200
    assert response.json()["id"] == task_id


async def test_update_task():
    # Create
    response = await client.post("/tasks", json={"title": "To update"})
    task_id = response.json()["id"]

    # Update
    response = await client.patch(f"/tasks/{task_id}", json={"done": True})
    assert response.status_code == 200
    assert response.json()["done"] is True


async def test_content_type_required():
    response = await client.request(
        "POST", "/tasks",
        body=b'{"title": "oops"}',
        # No content-type header
    )
    assert response.status_code == 415


async def run_all_tests():
    await test_list_tasks_empty()
    print("✓ list tasks empty")
    await test_create_and_get_task()
    print("✓ create and get task")
    await test_update_task()
    print("✓ update task")
    await test_content_type_required()
    print("✓ content type required")
    print("All ASGI tests passed.")


if __name__ == "__main__":
    asyncio.run(run_all_tests())

Using pytest-asyncio

For proper async test suites, use pytest-asyncio:

pip install pytest pytest-asyncio

# conftest.py
import pytest
from asgi_tasks import application
from test_helpers import ASGITestClient


@pytest.fixture
def client():
    return ASGITestClient(application)

# test_asgi_tasks.py
import pytest


@pytest.mark.asyncio
async def test_create_task(client):
    response = await client.post("/tasks", json={"title": "pytest task"})
    assert response.status_code == 201
    assert response.json()["title"] == "pytest task"


@pytest.mark.asyncio
async def test_delete_task(client):
    create_response = await client.post("/tasks", json={"title": "to delete"})
    task_id = create_response.json()["id"]

    delete_response = await client.delete(f"/tasks/{task_id}")
    assert delete_response.status_code == 200

    get_response = await client.get(f"/tasks/{task_id}")
    assert get_response.status_code == 404

pytest test_asgi_tasks.py -v

Testing WebSocket Handlers

WebSocket testing requires simulating the connect/message/disconnect lifecycle:

async def test_websocket_echo():
    from asgi_websockets import application  # WebSocket echo app

    scope = {
        "type": "websocket",
        "asgi": {"version": "3.0"},
        "path": "/ws",
        "query_string": b"",
        "headers": [],
        "server": ("testserver", 8000),
        "client": ("127.0.0.1", 9999),
        "subprotocols": [],
    }

    # Queue of events the app will receive
    incoming = asyncio.Queue()
    outgoing = []

    async def receive():
        return await incoming.get()

    async def send(event):
        outgoing.append(event)

    # Start the handler
    handler_task = asyncio.create_task(application(scope, receive, send))

    # Simulate WebSocket connect
    await incoming.put({"type": "websocket.connect"})
    await asyncio.sleep(0)  # Let the handler process it

    # Check accept was sent
    assert outgoing[-1]["type"] == "websocket.accept"

    # Send a message
    await incoming.put({"type": "websocket.receive", "text": "hello"})
    await asyncio.sleep(0)

    # Check echo
    echo_event = outgoing[-1]
    assert echo_event["type"] == "websocket.send"
    assert "hello" in echo_event.get("text", "")

    # Disconnect
    await incoming.put({"type": "websocket.disconnect", "code": 1000})
    await handler_task


asyncio.run(test_websocket_echo())
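One caveat: `await asyncio.sleep(0)` yields to the event loop exactly once, which happens to be enough when the handler only awaits the incoming queue, but it's fragile if the app awaits anything else in between. A sturdier pattern is to collect responses through an outgoing queue and wait on it with a timeout. Here is a self-contained sketch using a hypothetical in-line echo app as a stand-in:

```python
import asyncio


# Toy echo app standing in for the real WebSocket handler (hypothetical).
async def echo_app(scope, receive, send):
    event = await receive()
    assert event["type"] == "websocket.connect"
    await send({"type": "websocket.accept"})
    while True:
        event = await receive()
        if event["type"] == "websocket.disconnect":
            break
        await send({"type": "websocket.send", "text": event["text"]})


async def exercise():
    incoming: asyncio.Queue = asyncio.Queue()
    outgoing: asyncio.Queue = asyncio.Queue()

    async def receive():
        return await incoming.get()

    async def send(event):
        await outgoing.put(event)

    scope = {"type": "websocket", "path": "/ws"}
    task = asyncio.create_task(echo_app(scope, receive, send))

    await incoming.put({"type": "websocket.connect"})
    # Waiting on the outgoing queue (with a timeout) works no matter how
    # many times the handler yields internally; no sleep(0) guessing.
    accept = await asyncio.wait_for(outgoing.get(), timeout=1.0)
    assert accept["type"] == "websocket.accept"

    await incoming.put({"type": "websocket.receive", "text": "hello"})
    echo = await asyncio.wait_for(outgoing.get(), timeout=1.0)

    await incoming.put({"type": "websocket.disconnect", "code": 1000})
    await task
    return echo


echo = asyncio.run(exercise())
print(echo)  # {'type': 'websocket.send', 'text': 'hello'}
```

The timeout also turns a hung handler into a clean test failure instead of a test that never finishes.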

Testing Middleware

Test middleware by composing it with a simple app:

async def test_timing_middleware():
    import time

    timing_logs = []

    class CapturingTimingMiddleware(TimingMiddleware):
        async def __call__(self, scope, receive, send):
            # Override to capture instead of print
            started = time.monotonic()
            status_holder = []

            async def capturing_send(event):
                if event["type"] == "http.response.start":
                    status_holder.append(event["status"])
                elif event["type"] == "http.response.body":
                    if not event.get("more_body", False):
                        elapsed = (time.monotonic() - started) * 1000
                        timing_logs.append({
                            "path": scope.get("path"),
                            "status": status_holder[0] if status_holder else None,
                            "elapsed_ms": elapsed,
                        })
                await send(event)

            await self.app(scope, receive, capturing_send)

    async def simple_app(scope, receive, send):
        await receive()
        await send({
            "type": "http.response.start",
            "status": 200,
            "headers": [(b"content-type", b"text/plain")],
        })
        await send({
            "type": "http.response.body",
            "body": b"OK",
            "more_body": False,
        })

    app = CapturingTimingMiddleware(simple_app)
    test_client = ASGITestClient(app)

    await test_client.get("/test-path")

    assert len(timing_logs) == 1
    assert timing_logs[0]["path"] == "/test-path"
    assert timing_logs[0]["status"] == 200
    assert timing_logs[0]["elapsed_ms"] >= 0
    print("Timing middleware test passed")


asyncio.run(test_timing_middleware())

The httpx Transport Approach

The httpx library (an async HTTP client) has a built-in ASGI transport that lets you test ASGI apps with a full HTTP client interface:

pip install httpx

import httpx


async def test_with_httpx():
    from asgi_tasks import application

    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=application),
        base_url="http://testserver",
    ) as client:
        response = await client.post(
            "/tasks",
            json={"title": "httpx task"},
        )
        assert response.status_code == 201
        task = response.json()
        assert task["title"] == "httpx task"

        # httpx handles cookies, redirects, etc. automatically
        response = await client.get(f"/tasks/{task['id']}")
        assert response.status_code == 200


asyncio.run(test_with_httpx())

httpx.ASGITransport calls your ASGI app directly, without a network socket. It’s the same mechanism we built manually — it constructs scopes, calls receive with request body events, and collects the response from send events — but packaged as a transport that the full httpx client can use.

This gives you all of httpx’s conveniences (cookie handling, redirects, JSON serialization) while testing without a server.

The Principle

Testing WSGI/ASGI apps without a framework is fast (no server startup), correct (you’re calling the exact same code path as production), and instructive (you understand exactly what’s happening).

The test client libraries — pytest-django, Starlette’s TestClient, FastAPI’s TestClient, httpx — are all doing exactly what we’ve done here: constructing the right environ/scope, calling your app, and wrapping the result.

When a test fails mysteriously, knowing that your test client is just calling app(scope, receive, send) gives you a concrete place to start debugging.
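To make that concrete: for WSGI you can bypass the client entirely and make the one call everything reduces to. A minimal sketch, using a stand-in app (substitute your own):

```python
import io


# Stand-in WSGI app; swap in your own application here (hypothetical).
def application(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"debug me"]


# Hand-roll the exact call every WSGI test client ultimately makes.
captured = {}


def start_response(status, headers, exc_info=None):
    captured["status"] = status
    captured["headers"] = headers


environ = {
    "REQUEST_METHOD": "GET",
    "PATH_INFO": "/",
    "SERVER_NAME": "testserver",
    "SERVER_PORT": "80",
    "wsgi.input": io.BytesIO(b""),
    "wsgi.url_scheme": "http",
}
body = b"".join(application(environ, start_response))
print(captured["status"], body)  # 200 OK b'debug me'
```

If this direct call behaves correctly but your test client doesn't, the bug is in the environ/scope construction, not in your app.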

Roll Your Own Mini-Framework (For Fun and Understanding)

We have all the pieces. We’ve built a WSGI server, a router, middleware, and request/response objects. We’ve done the same for ASGI. Now let’s assemble them into something coherent: a small, complete framework that you’d actually consider using for a personal project.

This chapter is about synthesis. The goal isn’t to compete with Flask or Starlette — it’s to make the jump from “I’ve built components” to “I understand how a framework is structured.”

What We’re Building

A minimal ASGI framework called bare (fitting) with:

  • Route registration via decorators
  • Request and Response classes
  • Middleware composition
  • Lifespan event handling
  • WebSocket support
  • Zero dependencies beyond Python’s standard library (plus uvicorn to run it)

The finished product will be small enough to fit in a single file and correct enough to run real applications.

The Core Application Class

# bare.py
import asyncio
import json
import re
import urllib.parse
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple


# ── Types ─────────────────────────────────────────────────────────────────────

Headers = List[Tuple[bytes, bytes]]
ASGIApp = Callable  # (scope, receive, send) -> None


# ── Request ───────────────────────────────────────────────────────────────────

class Request:
    def __init__(self, scope: dict, receive: Callable):
        self._scope = scope
        self._receive = receive
        self._body: Optional[bytes] = None

    @property
    def method(self) -> str:
        return self._scope["method"]

    @property
    def path(self) -> str:
        return self._scope["path"]

    @property
    def query_string(self) -> str:
        return self._scope.get("query_string", b"").decode("latin-1")

    @property
    def headers(self) -> Dict[str, str]:
        return {
            k.decode("latin-1"): v.decode("latin-1")
            for k, v in self._scope.get("headers", [])
        }

    def header(self, name: str, default: str = "") -> str:
        return self.headers.get(name.lower(), default)

    @property
    def content_type(self) -> str:
        return self.header("content-type")

    @property
    def path_params(self) -> Dict[str, str]:
        return self._scope.get("path_params", {})

    @property
    def query_params(self) -> Dict[str, List[str]]:
        return urllib.parse.parse_qs(self.query_string, keep_blank_values=True)

    def query(self, name: str, default: str = "") -> str:
        values = self.query_params.get(name, [])
        return values[0] if values else default

    async def body(self) -> bytes:
        if self._body is None:
            chunks = []
            while True:
                event = await self._receive()
                if event["type"] == "http.request":
                    chunks.append(event.get("body", b""))
                    if not event.get("more_body", False):
                        break
                elif event["type"] == "http.disconnect":
                    break
            self._body = b"".join(chunks)
        return self._body

    async def text(self) -> str:
        return (await self.body()).decode("utf-8")

    async def json(self) -> Any:
        return json.loads(await self.body())

    @property
    def app(self) -> "Bare":
        return self._scope["app"]

    def __repr__(self) -> str:
        return f"<Request {self.method} {self.path}>"


# ── Response ──────────────────────────────────────────────────────────────────

class Response:
    def __init__(
        self,
        body: Any = None,
        status: int = 200,
        headers: Optional[Dict[str, str]] = None,
        content_type: str = "text/plain; charset=utf-8",
    ):
        self.status = status
        self._headers = {"content-type": content_type}
        if headers:
            self._headers.update({k.lower(): v for k, v in headers.items()})

        if body is None:
            self._body = b""
        elif isinstance(body, bytes):
            self._body = body
        elif isinstance(body, str):
            self._body = body.encode("utf-8")
        else:
            self._body = str(body).encode("utf-8")

    def set_header(self, name: str, value: str) -> "Response":
        self._headers[name.lower()] = value
        return self

    def set_cookie(self, name: str, value: str, **attrs) -> "Response":
        cookie = f"{name}={value}"
        for attr, attr_val in attrs.items():
            cookie += f"; {attr.replace('_', '-')}={attr_val}"
        # Dict-backed headers allow only one Set-Cookie per response;
        # a real framework stores headers as a list to permit several.
        self._headers["set-cookie"] = cookie
        return self

    async def send(self, send: Callable) -> None:
        headers = list(self._headers.items())
        headers.append(("content-length", str(len(self._body))))

        raw_headers = [
            (k.encode("latin-1"), v.encode("latin-1"))
            for k, v in headers
        ]

        await send({
            "type": "http.response.start",
            "status": self.status,
            "headers": raw_headers,
        })
        await send({
            "type": "http.response.body",
            "body": self._body,
            "more_body": False,
        })


class JSONResponse(Response):
    def __init__(self, data: Any, status: int = 200, **kwargs):
        super().__init__(
            body=json.dumps(data, default=str),
            status=status,
            content_type="application/json",
            **kwargs,
        )


class HTMLResponse(Response):
    def __init__(self, html: str, status: int = 200, **kwargs):
        super().__init__(body=html, status=status,
                         content_type="text/html; charset=utf-8", **kwargs)


class RedirectResponse(Response):
    def __init__(self, location: str, status: int = 302):
        super().__init__(status=status, headers={"location": location})


# ── Routing ───────────────────────────────────────────────────────────────────

@dataclass
class Route:
    method: str
    pattern: re.Pattern
    handler: Callable
    param_names: List[str]


def compile_route(path: str) -> Tuple[re.Pattern, List[str]]:
    """Convert '/users/{id:int}' to a compiled regex and param names."""
    converters = {"str": r"[^/]+", "int": r"[0-9]+", "slug": r"[a-zA-Z0-9-]+"}
    param_names = []

    def replace(m):
        name, _, conv = m.group(1).partition(":")
        param_names.append(name)
        return f"(?P<{name}>{converters.get(conv or 'str', converters['str'])})"

    regex = "^" + re.sub(r"\{([^}]+)\}", replace, path) + "$"
    return re.compile(regex), param_names


# ── WebSocket ─────────────────────────────────────────────────────────────────

class WebSocket:
    def __init__(self, scope: dict, receive: Callable, send: Callable):
        self._scope = scope
        self._receive = receive
        self._send = send
        self.path_params: Dict[str, str] = scope.get("path_params", {})

    async def accept(self, subprotocol: Optional[str] = None) -> None:
        event = await self._receive()
        assert event["type"] == "websocket.connect"
        await self._send({
            "type": "websocket.accept",
            "subprotocol": subprotocol,
        })

    async def receive_text(self) -> Optional[str]:
        event = await self._receive()
        if event["type"] == "websocket.receive":
            return event.get("text")
        return None  # disconnect

    async def receive_bytes(self) -> Optional[bytes]:
        event = await self._receive()
        if event["type"] == "websocket.receive":
            return event.get("bytes")
        return None

    async def receive_json(self) -> Any:
        text = await self.receive_text()
        return json.loads(text) if text is not None else None

    async def send_text(self, text: str) -> None:
        await self._send({"type": "websocket.send", "text": text})

    async def send_bytes(self, data: bytes) -> None:
        await self._send({"type": "websocket.send", "bytes": data})

    async def send_json(self, data: Any) -> None:
        await self.send_text(json.dumps(data, default=str))

    async def close(self, code: int = 1000) -> None:
        await self._send({"type": "websocket.close", "code": code})

    async def __aenter__(self) -> "WebSocket":
        await self.accept()
        return self

    async def __aexit__(self, *exc) -> None:
        await self.close()


# ── The Framework ─────────────────────────────────────────────────────────────

class Bare:
    def __init__(self):
        self._http_routes: List[Route] = []
        self._ws_routes: List[Route] = []
        self._startup_handlers: List[Callable] = []
        self._shutdown_handlers: List[Callable] = []
        self._middleware: List[Callable] = []
        self._built_app: Optional[ASGIApp] = None
        self.state: Dict[str, Any] = {}

    # ── Route registration ────────────────────────────────────────────────

    def route(self, path: str, methods: Optional[List[str]] = None):
        """Decorator to register an HTTP route handler."""
        methods = [m.upper() for m in (methods or ["GET"])]

        def decorator(func: Callable) -> Callable:
            pattern, param_names = compile_route(path)
            for method in methods:
                self._http_routes.append(
                    Route(method, pattern, func, param_names)
                )
            return func
        return decorator

    def get(self, path: str):
        return self.route(path, ["GET"])

    def post(self, path: str):
        return self.route(path, ["POST"])

    def put(self, path: str):
        return self.route(path, ["PUT"])

    def patch(self, path: str):
        return self.route(path, ["PATCH"])

    def delete(self, path: str):
        return self.route(path, ["DELETE"])

    def websocket(self, path: str):
        """Decorator to register a WebSocket handler."""
        def decorator(func: Callable) -> Callable:
            pattern, param_names = compile_route(path)
            self._ws_routes.append(Route("WS", pattern, func, param_names))
            return func
        return decorator

    # ── Lifespan ──────────────────────────────────────────────────────────

    def on_startup(self, func: Callable) -> Callable:
        self._startup_handlers.append(func)
        return func

    def on_shutdown(self, func: Callable) -> Callable:
        self._shutdown_handlers.append(func)
        return func

    # ── Middleware ────────────────────────────────────────────────────────

    def add_middleware(self, middleware_class, **kwargs):
        self._middleware.append((middleware_class, kwargs))
        self._built_app = None  # Invalidate cache

    # ── ASGI interface ────────────────────────────────────────────────────

    async def __call__(self, scope, receive, send):
        if self._built_app is None:
            self._built_app = self._build_app()
        await self._built_app(scope, receive, send)

    def _build_app(self) -> ASGIApp:
        app = self._handle
        for mw_class, kwargs in reversed(self._middleware):
            app = mw_class(app, **kwargs)
        return app

    async def _handle(self, scope, receive, send):
        scope["app"] = self

        if scope["type"] == "lifespan":
            await self._handle_lifespan(receive, send)
        elif scope["type"] == "http":
            await self._handle_http(scope, receive, send)
        elif scope["type"] == "websocket":
            await self._handle_websocket(scope, receive, send)

    async def _handle_lifespan(self, receive, send):
        while True:
            event = await receive()
            if event["type"] == "lifespan.startup":
                try:
                    for handler in self._startup_handlers:
                        if asyncio.iscoroutinefunction(handler):
                            await handler()
                        else:
                            handler()
                    await send({"type": "lifespan.startup.complete"})
                except Exception as e:
                    await send({"type": "lifespan.startup.failed", "message": str(e)})
                    return
            elif event["type"] == "lifespan.shutdown":
                for handler in self._shutdown_handlers:
                    try:
                        if asyncio.iscoroutinefunction(handler):
                            await handler()
                        else:
                            handler()
                    except Exception:
                        pass
                await send({"type": "lifespan.shutdown.complete"})
                return

    async def _handle_http(self, scope, receive, send):
        method = scope["method"]
        path = scope["path"]

        matched = []
        for route in self._http_routes:
            m = route.pattern.match(path)
            if m:
                matched.append((route, m))

        if not matched:
            await JSONResponse({"error": "not found"}, 404).send(send)
            return

        for route, m in matched:
            if route.method == method:
                scope["path_params"] = m.groupdict()
                request = Request(scope, receive)
                try:
                    response = await route.handler(request)
                    if response is None:
                        response = Response()
                    if not isinstance(response, Response):
                        response = JSONResponse(response)
                    await response.send(send)
                except Exception as e:
                    await JSONResponse({"error": str(e)}, 500).send(send)
                return

        allowed = sorted({r.method for r, _ in matched})
        await JSONResponse(
            {"error": "method not allowed", "allowed": allowed},
            405,
            headers={"allow": ", ".join(allowed)},
        ).send(send)

    async def _handle_websocket(self, scope, receive, send):
        path = scope["path"]
        for route in self._ws_routes:
            m = route.pattern.match(path)
            if m:
                scope["path_params"] = m.groupdict()
                ws = WebSocket(scope, receive, send)
                try:
                    await route.handler(ws)
                except Exception:
                    await send({"type": "websocket.close", "code": 1011})
                return

        # No matching WebSocket route — reject
        event = await receive()
        if event["type"] == "websocket.connect":
            await send({"type": "websocket.close", "code": 4004})

    def run(self, host: str = "127.0.0.1", port: int = 8000, **kwargs):
        """Convenience method to run with uvicorn."""
        import uvicorn
        uvicorn.run(self, host=host, port=port, **kwargs)

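Before wiring up an app, it's worth sanity-checking `compile_route` on its own, since the rest of the router relies on the regexes it produces. A quick check (self-contained copy of the function above):

```python
import re
from typing import List, Tuple


def compile_route(path: str) -> Tuple[re.Pattern, List[str]]:
    """Same converter table as bare.py's compile_route."""
    converters = {"str": r"[^/]+", "int": r"[0-9]+", "slug": r"[a-zA-Z0-9-]+"}
    param_names: List[str] = []

    def replace(m):
        name, _, conv = m.group(1).partition(":")
        param_names.append(name)
        return f"(?P<{name}>{converters.get(conv or 'str', converters['str'])})"

    regex = "^" + re.sub(r"\{([^}]+)\}", replace, path) + "$"
    return re.compile(regex), param_names


pattern, names = compile_route("/users/{user_id:int}/posts/{slug:slug}")
match = pattern.match("/users/42/posts/hello-world")
print(names)              # ['user_id', 'slug']
print(match.groupdict())  # {'user_id': '42', 'slug': 'hello-world'}
print(pattern.match("/users/abc/posts/x"))  # None: 'abc' fails the int converter
```

The named groups are what `_handle_http` later pulls out with `m.groupdict()` and stashes in `scope["path_params"]`.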
Using the Framework

# example_app.py
from bare import Bare, JSONResponse, HTMLResponse, WebSocket

app = Bare()

# In-memory store
tasks = {}


@app.on_startup
async def startup():
    print("App started. Ready to serve.")
    # Connect to database, load config, etc.


@app.on_shutdown
async def shutdown():
    print("App stopping. Cleaning up.")


@app.get("/")
async def index(request):
    return HTMLResponse("<h1>Bare Framework</h1><p>It's just callables.</p>")


@app.get("/tasks")
async def list_tasks(request):
    done = request.query("done")
    result = list(tasks.values())
    if done == "true":
        result = [t for t in result if t["done"]]
    elif done == "false":
        result = [t for t in result if not t["done"]]
    return JSONResponse(result)


@app.post("/tasks")
async def create_task(request):
    import uuid  # stdlib; imported locally to keep this snippet self-contained

    data = await request.json()
    if "title" not in data:
        return JSONResponse({"error": "title is required"}, 400)
    task = {
        "id": str(uuid.uuid4()),
        "title": data["title"],
        "done": False,
    }
    tasks[task["id"]] = task
    return JSONResponse(task, 201)


@app.get("/tasks/{task_id}")
async def get_task(request):
    task_id = request.path_params["task_id"]
    task = tasks.get(task_id)
    if not task:
        return JSONResponse({"error": "not found"}, 404)
    return JSONResponse(task)


@app.patch("/tasks/{task_id}")
async def update_task(request):
    task_id = request.path_params["task_id"]
    task = tasks.get(task_id)
    if not task:
        return JSONResponse({"error": "not found"}, 404)
    data = await request.json()
    if "done" in data:
        task["done"] = bool(data["done"])
    if "title" in data:
        task["title"] = str(data["title"])
    return JSONResponse(task)


@app.delete("/tasks/{task_id}")
async def delete_task(request):
    task_id = request.path_params["task_id"]
    if task_id not in tasks:
        return JSONResponse({"error": "not found"}, 404)
    return JSONResponse(tasks.pop(task_id))


@app.websocket("/ws")
async def chat(ws: WebSocket):
    async with ws:  # accept on enter, close on exit
        await ws.send_text("Welcome! Type messages to see them echoed.")
        while True:
            message = await ws.receive_text()
            if message is None:  # disconnect
                break
            await ws.send_json({
                "type": "echo",
                "original": message,
                "upper": message.upper(),
            })


if __name__ == "__main__":
    # uvicorn supports reload=True only with an import string
    # ("example_app:app"), not an app object, so run without it here.
    app.run()

python example_app.py

# Test it
curl http://localhost:8000/
curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Built with bare"}'

Adding Middleware to Our Framework

import time
import sys


class LoggingMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        started = time.monotonic()
        status_code = [None]

        async def capturing_send(event):
            if event["type"] == "http.response.start":
                status_code[0] = event["status"]
            await send(event)

        await self.app(scope, receive, capturing_send)

        elapsed = (time.monotonic() - started) * 1000
        print(
            f"{scope['method']} {scope['path']} → "
            f"{status_code[0]} ({elapsed:.1f}ms)",
            file=sys.stderr,
        )


app.add_middleware(LoggingMiddleware)

Testing Our Framework

import asyncio
import json


async def test_bare_framework():
    from example_app import app

    # Build a minimal test harness
    async def request(method, path, body=b"", headers=None):
        scope = {
            "type": "http",
            "asgi": {"version": "3.0"},
            "method": method,
            "path": path,
            "query_string": b"",
            "headers": headers or (
                [(b"content-type", b"application/json")] if body else []
            ),
            "server": ("testserver", 8000),
        }
        events = [{"type": "http.request", "body": body, "more_body": False}]
        idx = [0]

        async def receive():
            e = events[idx[0]] if idx[0] < len(events) else {"type": "http.disconnect"}
            idx[0] += 1
            return e

        response_events = []
        async def send(event):
            response_events.append(event)

        await app(scope, receive, send)
        start = next(e for e in response_events if e["type"] == "http.response.start")
        body_data = b"".join(
            e.get("body", b"") for e in response_events
            if e["type"] == "http.response.body"
        )
        return start["status"], json.loads(body_data) if body_data else None

    # Test cases
    status, data = await request("GET", "/tasks")
    assert status == 200, f"Expected 200, got {status}"

    status, data = await request(
        "POST", "/tasks",
        body=json.dumps({"title": "Framework test"}).encode()
    )
    assert status == 201
    task_id = data["id"]

    status, data = await request("GET", f"/tasks/{task_id}")
    assert status == 200
    assert data["title"] == "Framework test"

    status, data = await request("GET", "/tasks/nonexistent")
    assert status == 404

    print("Framework tests passed.")


asyncio.run(test_bare_framework())

What We Left Out

bare is small by design. Things a production framework would add:

Static file serving — map a URL prefix to a directory of files. Not hard, but involves MIME type detection and If-Modified-Since handling.

Template rendering — integrate with Jinja2 or similar. The Response class would gain a from_template(name, context) factory.

Form parsing — multipart/form-data for file uploads. The spec is in RFC 7578 and it’s tedious.

Cookie handling — parsing the Cookie header and setting Set-Cookie on responses.

Session middleware — signed cookies or server-side sessions backed by Redis.

Error pages — catch exceptions in handlers and return friendly 500 pages rather than bare JSON.

OpenAPI generation — FastAPI’s big feature: inspect route handlers’ type annotations and build an OpenAPI schema automatically.

Each of these is a well-understood problem with known solutions. The framework doesn’t do anything you couldn’t do yourself — it just packages the common solutions conveniently.
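
As one illustration of how tractable these are, cookie handling is largely covered by Python’s standard library. A minimal sketch (the plumbing that attaches this to a Request or Response is left out):

```python
# Minimal cookie handling using only the standard library:
# http.cookies does the parsing and serialization for us.
from http.cookies import SimpleCookie

def parse_cookies(header_value):
    """Turn a raw Cookie header value into a plain dict."""
    jar = SimpleCookie()
    jar.load(header_value)
    return {key: morsel.value for key, morsel in jar.items()}

def set_cookie_header(name, value, max_age=None):
    """Build a Set-Cookie header value for a response."""
    jar = SimpleCookie()
    jar[name] = value
    if max_age is not None:
        jar[name]["max-age"] = max_age
    return jar[name].OutputString()

cookies = parse_cookies("session=abc123; theme=dark")
header = set_cookie_header("session", "abc123", max_age=3600)
```

Signed cookies (for sessions) add an HMAC over the value, but the parsing layer stays this small.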

The Point

You’ve now built a framework. It’s small, but it’s real — the tasks app runs on it, the WebSocket handler works, middleware composes correctly. You understand every line of it because you wrote every line of it.

When you use FastAPI or Starlette next week, you’ll recognize the patterns: the route decorators are building a route table. The Request object is wrapping the scope. The Response is sending events to send. The lifespan context manager is handling startup/shutdown events.

The framework isn’t doing anything mysterious. It’s doing exactly what you’d do if you wrote it yourself — which you just did.

You Know Too Much Now (What to Do With It)

You started this book believing — or at least accepting on faith — that web frameworks were doing something fundamentally complex. Something that required expertise and abstraction to approach safely. Something you didn’t need to understand to use.

That belief, it turns out, was based on a lie of omission. Not a malicious one. Frameworks genuinely are complex in the engineering sense — they handle thousands of edge cases, they’re battle-tested against adversarial input, they’ve accumulated years of hard-won knowledge about what goes wrong in production. None of that complexity is wasted.

But the interface they expose to the world is just a callable. A function. The whole thing.

What You’ve Built

Over the course of this book, you built:

  • A raw HTTP parser that turns TCP bytes into structured request data
  • A WSGI server that accepts connections, builds environ, and calls applications
  • WSGI middleware for logging, authentication, CORS, timing, and request IDs
  • A URL router with path parameter extraction using compiled regular expressions
  • Request and Response classes that wrap the WSGI interface with a clean API
  • An ASGI server using asyncio.start_server with proper event-based flow
  • ASGI middleware with request/response interception and lifespan support
  • A WebSocket chat server using the full ASGI WebSocket protocol
  • A test harness for both WSGI and ASGI applications
  • A complete mini-framework with routing, lifespan, WebSockets, and middleware

Each of these was built from first principles, consulting the spec rather than copying from an existing library. None of them are production-ready in the sense that Gunicorn or Uvicorn is production-ready — that would take more than a book — but all of them are correct, which is what matters for understanding.

What This Changes

The practical impact of understanding your tools at this level is subtle but real.

Debugging gets easier. When Django throws an error in WSGIHandler.__call__, you know what that is. When Uvicorn logs a warning about a malformed request, you understand the parsing step it’s complaining about. When FastAPI returns a 422 on a request you think is valid, you can trace it through the parameter extraction code you now understand.

Performance becomes legible. WSGI workers handle one request at a time per thread — you know why, because you built a synchronous server. ASGI handles many requests per event loop iteration — you know why, because you built an async server. When someone says “add more Gunicorn workers,” you know what that means at the socket level.

Middleware composition is obvious. You’ll never again be confused about the order your middleware runs in, because you’ve implemented build_middleware_stack yourself and seen how reversed(middleware) produces the right wrapping order.
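
A stripped-down illustration of that wrapping order (synchronous callables here for brevity; the real stack wraps ASGI callables the same way):

```python
# Why reversed(middleware) gives the right order: the first middleware
# registered must end up outermost, so it is applied last.

def make_middleware(name, log):
    class Middleware:
        def __init__(self, app):
            self.app = app
        def __call__(self, request):
            log.append(f"{name}:before")
            result = self.app(request)
            log.append(f"{name}:after")
            return result
    return Middleware

def build_middleware_stack(app, middleware):
    for cls in reversed(middleware):  # wrap inside-out
        app = cls(app)
    return app

log = []
handler = lambda request: "response"
stack = build_middleware_stack(
    handler,
    [make_middleware("auth", log), make_middleware("logging", log)],
)
stack("req")
# auth was registered first, so it runs first and finishes last
```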

Testing is straightforward. Your app is a callable. Call it with a fake environ or scope. Inspect the result. This is all your test client is doing, and now you know it.
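
For WSGI, the whole test-client trick fits in a few lines — hand-build an environ, pass in a fake start_response, collect the iterable:

```python
# Calling a WSGI app directly with a hand-built environ — this is
# all a test client is doing under the hood.
from io import BytesIO

def wsgi_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hello " + environ["PATH_INFO"].encode()]

captured = {}
def start_response(status, headers):
    captured["status"] = status
    captured["headers"] = headers

environ = {
    "REQUEST_METHOD": "GET",
    "PATH_INFO": "/world",
    "SERVER_NAME": "testserver",
    "SERVER_PORT": "80",
    "wsgi.input": BytesIO(b""),
}
body = b"".join(wsgi_app(environ, start_response))
```

The ASGI version is the test harness we just wrote: a scope dict, a fake receive, a list-collecting send.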

Choosing between WSGI and ASGI is rational. You know what WSGI can’t do (hold connections open, do async I/O efficiently) and why ASGI exists to address those limitations. The choice isn’t “what’s the new thing” — it’s a decision based on your actual requirements.

What You Should Still Use Frameworks For

Knowing how something works doesn’t mean you should build it yourself for every project. The wheel is understood; you still don’t build a new one for every car.

Use Django when you want the batteries: ORM, admin, migrations, auth. It’s a well-engineered solution to a set of common problems, and “well-engineered” includes fifteen years of security patches.

Use FastAPI when you want async performance, type-annotated APIs, and automatic OpenAPI docs with minimal boilerplate. The type coercion and documentation generation are genuinely valuable and tedious to implement correctly.

Use Flask when you want something small and explicit, where you add only what you need.

Use the mini-framework from the last chapter when you want something you understand completely and can modify freely — for personal projects, for microservices with unusual requirements, for fun.

The right answer depends on context. Now you have enough context to make the decision rationally.

What the Spec Documents Are For

You now have a reason to read them:

  • PEP 3333 (WSGI): python.org/dev/peps/pep-3333 — the canonical reference for everything environ, start_response, and the response iterable contract
  • ASGI Spec: asgi.readthedocs.io — scope types, event names, and the lifespan protocol
  • RFC 9110 (HTTP Semantics): the definitive reference for HTTP methods, status codes, and header semantics
  • RFC 6455 (WebSocket): the WebSocket protocol spec, including the handshake, frame format, and close codes

These aren’t bedtime reading. They’re reference documents. Now that you have a mental model of what they’re specifying, they become useful rather than impenetrable.

The Deeper Point

There’s a broader principle at work here beyond Python web development.

Most of the complexity in software is accidental — it comes from accumulated decisions, backward compatibility constraints, and the need to handle edge cases that most applications never encounter. The essential complexity (the hard part that can’t be simplified away) is usually much smaller.

The essential complexity of a web server is: read bytes, parse HTTP, call a callable, write bytes. Everything else is handling the cases where that simple description fails.
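
That whole essential core fits in a screenful — a sketch, with error handling and keep-alive deliberately omitted:

```python
# The essential loop, stripped of everything accidental:
# read bytes, parse just enough HTTP, call a callable, write bytes.
import socket
import threading

def handler(method, path):
    return 200, b"you asked for " + path.encode()

def serve_once(sock):
    conn, _ = sock.accept()
    with conn:
        data = conn.recv(65536)                    # read bytes
        request_line = data.split(b"\r\n", 1)[0]   # parse HTTP (barely)
        method, path, _ = request_line.decode().split(" ")
        status, body = handler(method, path)       # call a callable
        conn.sendall(                               # write bytes
            f"HTTP/1.1 {status} OK\r\n"
            f"Content-Length: {len(body)}\r\n"
            "Connection: close\r\n\r\n".encode() + body
        )

sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("127.0.0.1", 0))
sock.listen(1)
port = sock.getsockname()[1]
thread = threading.Thread(target=serve_once, args=(sock,))
thread.start()

client = socket.create_connection(("127.0.0.1", port))
client.sendall(b"GET /demo HTTP/1.1\r\nHost: x\r\n\r\n")
response = b""
while chunk := client.recv(4096):
    response += chunk
client.close()
thread.join()
```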

When you encounter a system that seems impossibly complex — a message broker, a container runtime, a database engine — the same approach applies: find the interface, build something that implements it, and watch the complexity become accidental rather than essential.

This book was about Python web protocols. The method is general.

A Note on bare

The mini-framework we built in the last chapter is a teaching tool, not a production framework. If you found the exercise genuinely useful and want to continue building, consider:

  • Adding proper error handling with custom exception classes
  • Implementing dependency injection (FastAPI’s killer feature is more tractable than it looks)
  • Adding background task scheduling
  • Implementing WebSocket rooms properly with asyncio queues
  • Writing comprehensive tests for the framework itself

Or, more likely, go back to FastAPI or Starlette with a much better understanding of what they’re doing and why.

Thank You

This book exists because Georgiy Treyvus asked the right question at the right time. Building software is easier when someone asks “but what’s actually happening?”

Go build something. You know too much to be mystified by it now.


Back to Bare Metal: WSGI & ASGI for Python Developers Published by CloudStreet — github.com/cloudstreet-dev CC0 1.0 Universal — no rights reserved