
A First asyncio Application

From Python Concurrency with asyncio by Matthew Fowler

Manning Publications

--

This article shows how you might make your first application that leverages asyncio.

Take 40% off Python Concurrency with asyncio by entering fccfowler into the discount code box at checkout at manning.com.

An echo server on the asyncio event loop

Working with select is a bit too low level for most applications. We may want code to run in the background while we wait for socket data to come in, or we may want background tasks to run on a schedule. If we did this with only selectors, we’d wind up building our own event loop, when asyncio has a nicely implemented one ready for us to use. In addition, coroutines and tasks provide abstractions on top of selectors that make our code easier to implement and maintain, as we don’t need to think about selectors at all.

Now that we have a deeper understanding of how the asyncio event loop works, let’s take the echo server that we built in the last section and build it again using coroutines and tasks. We’ll still use lower-level sockets to accomplish this, but we’ll use asyncio-based APIs that return coroutines to manage them. We’ll also add some more functionality to our echo server to demonstrate a few key concepts of how asyncio works.

Event loop coroutines for sockets

Given that sockets are a relatively low-level concept, the methods for dealing with them live on asyncio’s event loop. We’ll want to work with three main coroutines: sock_accept, sock_recv, and sock_sendall. These are analogous to the socket methods we used earlier, except that they take in a socket as an argument and return coroutines that we can await until we have data to act on.

Let’s start with sock_accept. This coroutine is analogous to the socket.accept method that we saw in our first implementation, and it returns a tuple of a socket connection and a client address. We pass in the socket we’re interested in and can then await the resulting coroutine. Once that coroutine completes, we’ll have our connection and address. The socket must be non-blocking and should already be bound to a port.

connection, address = await loop.sock_accept(socket)
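
To make this prerequisite concrete, here’s a minimal sketch of the setup sock_accept expects: a server socket that’s non-blocking, bound to a port, and listening. It mirrors what the main coroutine in listing 1 below does; the address here is just an example.

import socket

# A non-blocking server socket, bound to a port and listening,
# which is the kind of socket loop.sock_accept expects.
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setblocking(False)
server_socket.bind(('localhost', 8000))
server_socket.listen()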

sock_recv and sock_sendall are called similarly to sock_accept. They take in a socket, and we can then await a result. sock_recv waits until a socket has bytes we can process. sock_sendall takes in both a socket and the data we want to send, waits until all of that data has been sent, and returns None on success.

data = await loop.sock_recv(socket)
success = await loop.sock_sendall(socket, data)

With these building blocks, we’ll be able to translate our previous approaches into one using coroutines and tasks.

Designing an asyncio echo server

When should we use only a coroutine and when should we wrap a coroutine in a task for our echo server? Let’s examine how we want our application to behave to make this determination.

We’ll start with how we want to listen for connections in our application. When we listen for connections, we can only process one connection at a time, as socket.accept only gives us one client connection. Behind the scenes, if multiple connections come in at the same time, they are stored in a queue known as the backlog, but for this article we won’t get into how that works. Because we don’t need to process multiple connections concurrently, a single coroutine that loops forever makes sense. This allows other code to run concurrently while we pause waiting for a connection. We’ll define a coroutine called listen_for_connection that loops forever and listens for any incoming connections.

async def listen_for_connection(server_socket: socket.socket,
                                loop: AbstractEventLoop):
    while True:
        connection, address = await loop.sock_accept(server_socket)
        connection.setblocking(False)
        print(f"Got a connection from {address}")

Now that we have a coroutine for listening for connections, what about reading and writing data to the clients who are connected? Should that be a coroutine, or a coroutine we wrap in a task? In this case we have multiple connections, each of which could send data to us at any time. We don’t want waiting for data from one connection to block another, and we need to read and write data from multiple clients concurrently. Because we need to handle multiple connections at the same time, creating a task for each connection to read and write data makes sense. On every connection we get, we’ll create a task to both read data from and write data to that connection.

We’ll create a coroutine named echo that is responsible for handling data from a connection. This coroutine loops forever listening for data from our client. Once it receives data, it sends it back to the client.

Then in listen_for_connection we’ll create a new task that wraps our echo coroutine for each connection that we get. With these two coroutines defined, we now have all we need to build an asyncio echo server.

Listing 1 An asyncio echo server

import asyncio
import socket
from asyncio import AbstractEventLoop


async def echo(connection: socket.socket,
               loop: AbstractEventLoop) -> None:
    while data := await loop.sock_recv(connection, 1024): #A
        await loop.sock_sendall(connection, data) #B


async def listen_for_connection(server_socket: socket.socket,
                                loop: AbstractEventLoop):
    while True:
        connection, address = await loop.sock_accept(server_socket)
        connection.setblocking(False)
        print(f"Got a connection from {address}")
        asyncio.create_task(echo(connection, loop)) #C


async def main():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_address = ('localhost', 8000)
    server_socket.setblocking(False)
    server_socket.bind(server_address)
    server_socket.listen()

    await listen_for_connection(server_socket, asyncio.get_event_loop()) #D


asyncio.run(main())

#A Loop forever waiting for data from a client connection

#B Once we have data, send it back to that client

#C Whenever we get a connection, create an echo task to listen for client data.

#D Start the coroutine to listen for connections

The architecture of listing 1 looks like the following: we have one coroutine, listen_for_connection, listening for connections. Once a client connects, our coroutine spawns an echo task for that client, which then listens for data and writes it back out to the client.

The coroutine listening for connections spawns one task for each connection it gets.

When we run this application, we’ll be able to connect multiple clients concurrently and send data to them concurrently. Under the hood, this uses selectors and our CPU utilization remains low.
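
To try this out, one option is a small test client. The sketch below is not part of the listing; it’s an assumed example that uses asyncio’s streams API to open a few concurrent connections to the server on localhost port 8000, send a message on each, and print what comes back.

import asyncio

async def send_and_receive(message: bytes) -> bytes:
    # Connect to the echo server (assumed to be running on localhost:8000).
    reader, writer = await asyncio.open_connection('localhost', 8000)
    writer.write(message)
    await writer.drain()
    echoed = await reader.read(1024)  # read back whatever the server echoed
    writer.close()
    await writer.wait_closed()
    return echoed

async def main():
    # Run three clients concurrently against the same server.
    messages = [f'hello {i}\n'.encode() for i in range(3)]
    results = await asyncio.gather(*[send_and_receive(m) for m in messages])
    print(results)

asyncio.run(main())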

We’ve now built a fully functioning echo server entirely using asyncio! Is our implementation error free? It turns out that the way we’ve designed it causes an issue when an echo task fails, which we’ll need to handle.

Handling errors in tasks

Network connections are often unreliable, and we may get exceptions we don’t expect in our application code. How does our application behave if reading or writing to a client fails and throws an exception? To test this out, let’s change our implementation of echo to throw an exception when a client passes us a specific keyword.

async def echo(connection: socket.socket,
               loop: AbstractEventLoop) -> None:
    while data := await loop.sock_recv(connection, 1024):
        if data == b'boom\r\n':
            raise Exception("Unexpected network error")
        await loop.sock_sendall(connection, data)

Now, whenever a client sends “boom” to us, we raise an exception and our task crashes. What happens when we connect a client to our server and send this message? We see a traceback with a warning like this:

Task exception was never retrieved
future: <Task finished name='Task-2' coro=<echo() done, defined at asyncio_echo.py:5> exception=Exception('Unexpected network error')>
Traceback (most recent call last):
File "asyncio_echo.py", line 9, in echo
raise Exception("Unexpected network error")
Exception: Unexpected network error

The important part here is Task exception was never retrieved. What does this mean? When an exception is thrown inside a task, the task is considered done, with the exception as its result. No exception is thrown up the call stack, and we get no cleanup here: when this exception is thrown, we can’t react to the task failing, because we never retrieved the exception.

In order to have the exception bubble up, we must use the task in an await expression. When we await a task that has failed, the exception is raised where we perform the await, and the traceback reflects that. If we never await a task at some point in our application, we run the risk of never seeing the exception it raised. Although we saw the exception output in the example, which may lead us to think this isn’t a big deal, there are subtle ways we could change our application so that we never see this message at all.
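
As a small illustration of this point (a standalone sketch, not part of the echo server), awaiting a task that has already failed re-raises its exception at the await, where we can catch it:

import asyncio

async def fail() -> None:
    raise Exception("Unexpected network error")

async def main():
    task = asyncio.create_task(fail())
    try:
        await task  # the task's exception is re-raised here, where we await it
    except Exception as ex:
        print(f'Retrieved the exception from the task: {ex}')

asyncio.run(main())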

To demonstrate how that can happen, let’s say that instead of ignoring the echo tasks we create in listen_for_connection, we keep track of them in a list like this:

tasks = []

async def listen_for_connection(server_socket: socket.socket,
                                loop: AbstractEventLoop):
    while True:
        connection, address = await loop.sock_accept(server_socket)
        connection.setblocking(False)
        print(f"Got a connection from {address}")
        tasks.append(asyncio.create_task(echo(connection, loop)))

One might expect this to behave the same way as before: if we send the boom message, we’ll see the exception printed along with the warning that we never retrieved the task exception. But this isn’t the case; we’ll see nothing printed until we forcefully terminate our application!

This is because we’ve kept a reference to the task. asyncio can only print this message and the traceback for a failed task when that task is garbage collected, because it has no way to tell whether the task will be awaited at some other point in the application, which would raise the exception there. Because of these complexities, we either need to await our tasks at some point or handle any exceptions our tasks could throw. But how do we do this in our echo server?
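
For completeness, one standard asyncio pattern for retrieving exceptions from tasks that we keep references to but never await (not the approach we take next in the echo server) is to attach a done callback that checks the task’s exception. A minimal sketch:

import asyncio
import logging

def log_task_exception(task: asyncio.Task) -> None:
    # Runs when the task finishes; reading task.exception() counts as retrieving it.
    if not task.cancelled() and task.exception() is not None:
        logging.error('Task failed', exc_info=task.exception())

async def fail() -> None:
    raise Exception('Unexpected network error')

async def main():
    task = asyncio.create_task(fail())
    task.add_done_callback(log_task_exception)  # logged even though we never await the task
    await asyncio.sleep(1)  # give the task time to run and fail

asyncio.run(main())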

The first thing we can do to fix this in our echo server is to wrap the code in our echo coroutine in a try/except statement, log the exception, and close the connection:

import logging

async def echo(connection: socket.socket,
               loop: AbstractEventLoop) -> None:
    try:
        while data := await loop.sock_recv(connection, 1024):
            print('got data!')
            if data == b'boom\r\n':
                raise Exception("Unexpected network error")
            await loop.sock_sendall(connection, data)
    except Exception as ex:
        logging.exception(ex)
    finally:
        connection.close()

This resolves the immediate issue of an exception causing our server to complain that a task exception was never retrieved, because we handle it in the coroutine itself. It also properly shuts down the socket in the finally block, so we won’t be left with a dangling unclosed connection in the event of a failure.

It’s important to note that this implementation also properly closes any client connections we have open when the application shuts down. Why is this? asyncio.run cancels any tasks we have remaining when our application shuts down. When we cancel a task, a CancelledError is raised inside it at whichever await expression it is currently suspended on. The important thing here is noting where that exception is raised. If our task is waiting on a statement such as await loop.sock_recv and we cancel the task, a CancelledError is thrown from the await loop.sock_recv line. This means that in the above case our finally block is executed, because an exception was raised at an await expression when we cancelled the task. If we change the except block to catch and log these exceptions, we’ll see one CancelledError for each task that was created.
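
To see this cancellation behavior in isolation, here’s a small standalone sketch, with asyncio.sleep standing in for await loop.sock_recv: cancelling the task raises CancelledError at the await inside the coroutine, and the finally block still runs.

import asyncio

async def wait_for_data() -> None:
    try:
        await asyncio.sleep(3600)  # stands in for await loop.sock_recv(...)
    except asyncio.CancelledError:
        print('CancelledError raised at the await expression')
        raise
    finally:
        print('finally block still runs on cancellation')

async def main():
    task = asyncio.create_task(wait_for_data())
    await asyncio.sleep(0)  # let the task start and reach its await
    task.cancel()           # throws CancelledError into the task at its await point
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())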

We’ve now handled the immediate issue of errors when our echo tasks fail. What if we want to clean up any errors or leftover tasks when our application shuts down? We can do this with asyncio’s signal handlers.

That’s all for this article.

If you want to learn more about the book, you can check it out on our browser-based liveBook platform here.

--
