[Rust] Tokio stack overview: Runtime

This content originally appeared on DEV Community and was authored by DEV Community

TL;DR: In this first installment of a series giving an overview of the Tokio stack, I talk about its runtime.

Quoting its first announcement, "Tokio is a platform for writing fast networking code in Rust [and] is primarily intended as a foundation for other libraries".

Tokio Stack

In this series, I plan to go through all the libraries (crates) that belong to this stack, explaining the very basics of each, which means trying to answer two simple questions: what problems do they solve, and how?

Today, I start with the first (and, I assume, the best known) of them: the Tokio runtime.

What is an asynchronous runtime?

Core Rust provides the types to build an asynchronous application. However, when building, say, an asynchronous network application, we find ourselves in need of a lot of boilerplate code. We can write it ourselves, or we can use a library that gives it to us ready-made (and probably better made). And that is what an asynchronous runtime such as Tokio does: it provides the building blocks to construct such an application.

Futures

Let us start by taking a look at what core Rust brings to the table, so we can better understand what we would lack if we were to build an asynchronous network application all by ourselves.

P.S. I already wrote an introduction to async Rust, so this will be a condensed explanation.

Asynchronous Rust allows us to create concurrent applications. It does so via the async/.await syntax. Basically, blocks and functions declared with async desugar into a block or function that returns an implementation of a trait called Future. This implementation is a state machine, so it can keep track of the progress made in a certain operation, which means it can stop processing at some point and, when executed again, continue from where it stopped.

On a higher level, we might say that a future is the representation of a value that may or may not be ready, a duality expressed by an enum called Poll<T>, which has two variants: Ready(T) and Pending. The Future trait has a single method, poll(), that tries to make as much progress as possible within the future (thus driving the state machine forward).

Futures are lazy: nothing runs until poll() is called for the first time. When we .await a future inside an async block or function, it becomes part of the enclosing future, and the outermost future is delivered to a scheduler (formerly known as executor), which is the one that calls poll(). If the future runs to completion, Poll::Ready(T) is returned; otherwise, Poll::Pending is returned and the scheduler sets the future aside, waiting for a request to poll() it again. This request comes from the driver (formerly known as reactor), which is an I/O event loop.
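To make the state-machine idea more concrete, here is a minimal sketch that uses only the standard library. Countdown, NoopWaker and poll_to_completion are names I made up for this illustration, and polling in a plain loop is an assumption made for simplicity; it is not how Tokio's scheduler works.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that returns `Pending` `remaining` times before it is ready.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            // Record the progress made so far and ask to be polled again.
            self.remaining -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing; that is enough here because we poll in a plain loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `future` until it is ready; returns the number of polls and the output.
fn poll_to_completion(mut future: Countdown) -> (u32, &'static str) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        // `Countdown` is `Unpin`, so `Pin::new` is sufficient.
        if let Poll::Ready(value) = Pin::new(&mut future).poll(&mut cx) {
            return (polls, value);
        }
    }
}

fn main() {
    let (polls, value) = poll_to_completion(Countdown { remaining: 2 });
    println!("ready after {polls} polls: {value}");
}
```

Each call to poll() either finishes the work or saves its progress and returns Pending, which is exactly what the compiler generates for us when we write async code.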

Rust does not provide these last two. That is why we need a crate that provides them. Besides that, we also need some time-related utilities to handle all this scheduling stuff.

Needless to say, this is precisely what the Tokio runtime provides. Quoting its documentation:

Unlike other Rust programs, asynchronous applications require runtime support. In particular, the following runtime services are necessary:

  • An I/O event loop, called the driver, which drives I/O resources and dispatches I/O events to tasks that depend on them.
  • A scheduler to execute tasks that use these I/O resources.
  • A timer for scheduling work to run after a set period of time.

Scheduler

When you code an async function for the first time, you realize that the place from which you are calling this function also has to be async. And if you go all the way up and try to make your main() function async, Rust will tell you that "main function is not allowed to be async".

Asking Rust to explain this error gives us a hint:

$ rustc --explain E0752

`fn main()` or the specified start function is not allowed to be `async`. Not having a correct async runtime library setup may cause this error.

A quick search on the web is enough to provide the solution: we have to add tokio as a dependency and use this attribute macro:

#[tokio::main]
async fn main() {
    // ...
}

However, even though it certainly works, a question remains…

Why?

Because at some point the futures have to be dealt with, and there is nothing above the main() function in a Rust program, so whoever is handling them has to be below it. Another way to put it is to say that main() is the front door of your program. The operating system running the binary knows nothing about futures, so they have to be managed "inside the house", that is, after we have entered the program. So, main() has to be synchronous.

So how does Tokio manage to make main() async if the top-level function cannot be async? Well, it does not. #[tokio::main] will desugar async fn main() into something like this:

fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            // ...
        })
}

When using the attribute macro #[tokio::main], we are building a runtime below main(), a runtime that will handle the tree of futures. Why am I calling it a tree? Because a future may .await other futures. I will talk more about multiple .await calls later. For now, let us move on with this idea of handling a tree of futures.
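To see why a synchronous main() is enough, here is a rough sketch of what a block_on() could look like, written with the standard library only. It is not Tokio's implementation: there is no thread pool, no driver and no timer, and ThreadWaker and this block_on() are hypothetical names used for illustration.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Runs `future` to completion on the current thread.
fn block_on<F: Future>(future: F) -> F::Output {
    // Pin the future on the stack so that it can be polled.
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Nothing to do until someone calls `wake()`, so put the thread to sleep.
            Poll::Pending => thread::park(),
        }
    }
}

async fn answer() -> u32 {
    42
}

fn main() {
    // A synchronous `main()` drives the root future and everything it awaits.
    let value = block_on(async { answer().await + 1 });
    println!("{value}");
}
```

The point of the sketch is that the only future main() ever touches is the root one; every future awaited inside it is polled as part of that same call.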

Handling the tree of futures

Consider the example below.

use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    // `join!` polls both futures as part of the same task.
    let (foo, bar) = tokio::join!(foo(), bar());
    println!("{}{}", foo, bar);
}

async fn foo() -> &'static str {
    // Tokio's listener is asynchronous: the task can yield at `.await` while it waits.
    let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
    match listener.accept().await {
        Ok(_pair) => {
            println!("`foo()` is finished");
            "foo"
        }
        Err(_error) => "error",
    }
}

async fn bar() -> &'static str {
    let listener = TcpListener::bind("0.0.0.0:8081").await.unwrap();
    match listener.accept().await {
        Ok(_pair) => {
            println!("`bar()` is finished");
            "bar"
        }
        Err(_error) => "error",
    }
}

Tip: run the code above, connect to both ports using your browser (for example, localhost:8080 and localhost:8081) and check the result in the terminal where you ran the program.

When we .await a future such as foo(), either directly or through join!, it ends up in the hands of the runtime scheduler, the one responsible for calling poll() on it. Futures are executed by the scheduler as part of tasks. You might think of a task as a thread that is handled not by the OS scheduler but by the runtime scheduler (a virtual or green thread).

This will run foo() as far as possible towards completion, which means that the scheduler will not preemptively stop it to run something else in its stead (as the OS does with its threads). For the Tokio scheduler, as long as a task is doing relevant work, it may keep working. In more technical jargon, tasks run until they yield. In our example, foo() runs until it has to wait for a connection on port 8080. If you are looking for the part of our code that explicitly yields the task, give up. It is not there. We do not code yields; a task yields at an .await whenever the awaited future returns Pending.

After foo() yields, join! will poll bar(), which will run until it has to wait for a connection on port 8081. At this point, as both functions have yielded, we have two futures waiting to be polled again, and they may be polled in any order. Now, imagine that foo() and/or bar() call async functions inside them, awaiting new futures of their own. In a scenario like this, we have a tree of futures. One important thing to understand here is that we have a "root-future" (the async book calls it “top-level future”, but I will stick with “root”); in this case, it is the future returned by that async block in main (you will find it inside block_on() in the desugared example). And this is important for at least two reasons.

First, a task is responsible for a tree of futures. So, let's say we have an async fn main(). As we saw, under the hood this is a normal main() that will block_on() an async block. If inside this future we .await another future, it will be dealt with by the same task, as it is part of the same tree.

Second, it points to the threshold between concurrency and parallelism. If you just .await or join! futures, they will never run simultaneously because, in the end, our main() is blocking on the root-future, and its node-futures are polled one after the other as part of the same task, hence on a single OS thread at any given moment. In other words, your async program will have concurrency, but not parallelism.

Revisiting our example, even if ports 8080 and 8081 are accessed at the same time, foo() and bar() will be executed one after the other because they are futures of the same tree. Sure, this is no big deal here, but if you remember that we are talking about network applications and extrapolate from this silly example, you will quickly see this cannot be right.
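The interleaving described in this section can be reproduced without any runtime. The sketch below is a hand-rolled stand-in for join!, not Tokio's macro: YieldOnce, step() and run_joined() are hypothetical names, and YieldOnce plays the part of waiting for a connection.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the loop below simply polls again.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical future that returns `Pending` once and `Ready` afterwards.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Logs, yields once (a stand-in for waiting on a connection), then logs again.
async fn step(name: &'static str, log: &RefCell<Vec<String>>) {
    log.borrow_mut().push(format!("{name} starts"));
    YieldOnce(false).await;
    log.borrow_mut().push(format!("{name} finishes"));
}

/// Polls two futures in turn on the current thread, as a single task would.
fn run_joined() -> Vec<String> {
    let log = RefCell::new(Vec::new());
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    {
        let mut foo = pin!(step("foo", &log));
        let mut bar = pin!(step("bar", &log));
        let (mut foo_done, mut bar_done) = (false, false);
        while !(foo_done && bar_done) {
            // A finished future must not be polled again, hence the flags.
            if !foo_done {
                foo_done = foo.as_mut().poll(&mut cx).is_ready();
            }
            if !bar_done {
                bar_done = bar.as_mut().poll(&mut cx).is_ready();
            }
        }
    }
    log.into_inner()
}

fn main() {
    for line in run_joined() {
        println!("{line}");
    }
}
```

Both futures make progress in turns, but never at the same instant: everything happens on the thread that calls run_joined(). That is concurrency without parallelism.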

Scheduling parallel tasks

As mentioned above, main() is the entry point of our program, so everything we are doing is below it. And what we have below (what #[tokio::main] desugars to) is a runtime that was built using new_multi_thread(): a multi-threaded Tokio runtime. So far, everything has run in a single task, the one driven by block_on(), and therefore on a single thread. If we want parallelism, we need to hand our futures to the runtime itself, so they can become "root-futures" and, as such, become new tasks. To achieve this parallelism, that is, to allow the runtime to execute our tasks with a different worker of its thread pool, we have to spawn the tasks.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;

type Db = Arc<Mutex<HashMap<&'static str, &'static str>>>;

#[tokio::main]
async fn main() {
    let db: Db = Default::default();
    // Each spawned future becomes the root of its own task.
    tokio::spawn(foo(db.clone()));
    tokio::spawn(bar(db.clone()));
    handle(db).await;
}

async fn foo(db: Db) {
    // Tokio's listener yields at `.await` instead of blocking a worker thread.
    let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
    match listener.accept().await {
        Ok(_pair) => {
            println!("`foo()` is finished");
            // The lock is held only briefly and never across an `.await`.
            db.lock().unwrap().insert("f", "foo");
        }
        Err(_error) => println!("error"),
    }
}

async fn bar(db: Db) {
    let listener = TcpListener::bind("0.0.0.0:8081").await.unwrap();
    match listener.accept().await {
        Ok(_pair) => {
            println!("`bar()` is finished");
            db.lock().unwrap().insert("b", "bar");
        }
        Err(_error) => println!("error"),
    }
}

async fn handle(db: Db) {
    loop {
        // The inner block drops the lock guard before the task yields.
        {
            let lock = db.lock().unwrap();
            if lock.len() == 2 {
                println!("{}{}", lock.get("f").unwrap(), lock.get("b").unwrap());
                break;
            }
        }
        // Yield so that this loop does not monopolize its thread.
        tokio::task::yield_now().await;
    }
}

In the example above, foo() and bar() become root-futures in their own right, and as handle() is the single future within the block_on() future, we end up with three different trees of futures. That way, if we call all three functions at the “same” time, they can be executed in three different threads (assuming the runtime has these threads).
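The idea that each root-future may run on its own thread can be sketched with the standard library alone. Note the loud assumption: the spawn() below starts one OS thread per future, whereas Tokio distributes tasks over a fixed pool of workers, so this is only a stand-in. ThreadWaker, block_on(), spawn() and run_on_two_threads() are all hypothetical names.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, JoinHandle, Thread, ThreadId};

/// Hypothetical waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Runs `future` to completion on the current thread.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// Stand-in for spawning a task: the future becomes a root future on a new thread.
fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    thread::spawn(move || block_on(future))
}

/// Reports the thread on which the future is being polled.
async fn which_thread() -> ThreadId {
    thread::current().id()
}

/// Spawns two root futures and returns the threads that ran them.
fn run_on_two_threads() -> (ThreadId, ThreadId) {
    let foo = spawn(which_thread());
    let bar = spawn(which_thread());
    (foo.join().unwrap(), bar.join().unwrap())
}

fn main() {
    let (foo, bar) = run_on_two_threads();
    println!("ran on different threads: {}", foo != bar);
}
```

Like Tokio's spawn, this spawn() requires the future to be Send and 'static, because the future has to be moved to another thread and may outlive the caller.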

I might have gone a little over the top by using Arc<Mutex<HashMap>>, since the same thing could be done more easily with JoinHandle. My reasoning was that using the smart pointers made it easier to see the parallelism, as using a JoinHandle looks very similar to how we use .await.

Going beyond

If you want to go above and beyond, a good place to start is to understand how Tokio employs a work-stealing technique to manage its multithreaded scheduler.

Driver

Let us reconsider our previous example. When foo() and bar() both yield, which happens once they are waiting for a connection, they return Poll::Pending. As there is still work to be done, the scheduler will not get rid of them, but it will not poll() them again on its own initiative; it will poll() them again on request.

In this case, what creates the need to poll() them again is an incoming connection on one of the ports. However, if neither foo() nor bar() is actually running, who will pull the trigger? Something has to be running to mediate between the incoming connection and the scheduler. That is the role of the driver, which is what Tokio calls its I/O event loop.

Before diving into the driver, though, let us talk a bit more about what makes it necessary: a pending future.

Pending future

Maybe this topic belongs to the scheduler, but as it is vital for an understanding of the driver, I think it also fits here.

When we poll a future, it receives a Context as an argument. Currently, this Context is just a wrapper for the &Waker. This &Waker is a reference to the Waker found within the task that calls poll(). This &Waker has a method wake() that is called by the driver, so the task (that owns Waker) becomes aware that it should poll() the future once again.

The following flow is an illustrative example of how it works:

  1. A future is .awaited.
  2. As such, it is handed to the scheduler task that was created by block_on() (as we saw, spawn also creates tasks).
  3. This task will poll() the future, which will do some work until it reaches the point where it has to yield; let's say it is listening at some address, as our foo() was.
  4. Before yielding, the future clones the &Waker received as an argument in poll(). That “binds” the future and the task.
  5. It yields, returning Poll::Pending.
  6. When the operating system's I/O receives a connection on that certain address, it will let the driver know.
  7. The driver will call wake() on the &Waker stored by the future, and this will wake up the task.
  8. The awoken task will then poll() the future again. If it returns Pending again, the new Waker that was passed by this last poll() will be cloned and the process restarts.
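The flow above can be sketched with the standard library only. In this sketch a plain thread stands in for the driver and a sleep stands in for the I/O event, which is an assumption made for illustration: the real driver waits on the operating system instead. Shared, EventFuture, ThreadWaker and wait_for_event() are hypothetical names.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// State shared between the future and the stand-in "driver" thread.
#[derive(Default)]
struct Shared {
    ready: bool,
    waker: Option<Waker>,
}

/// Hypothetical future that completes once the "driver" reports an event.
struct EventFuture {
    shared: Arc<Mutex<Shared>>,
}

impl Future for EventFuture {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared = self.shared.lock().unwrap();
        if shared.ready {
            Poll::Ready("event received")
        } else {
            // Step 4: keep a clone of the waker before yielding.
            shared.waker = Some(cx.waker().clone());
            // Step 5: yield.
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread playing the role of the task.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn wait_for_event() -> &'static str {
    let shared = Arc::new(Mutex::new(Shared::default()));

    // Steps 6 and 7: the "driver" learns about the event and calls `wake()`.
    let driver_side = Arc::clone(&shared);
    let driver = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let mut shared = driver_side.lock().unwrap();
        shared.ready = true;
        if let Some(waker) = shared.waker.take() {
            waker.wake();
        }
    });

    // Steps 3 and 8: the "task" polls, sleeps until it is woken, then polls again.
    let mut future = pin!(EventFuture { shared });
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let output = loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => break output,
            Poll::Pending => thread::park(),
        }
    };
    driver.join().unwrap();
    output
}

fn main() {
    println!("{}", wait_for_event());
}
```

Between the first poll() and the call to wake(), the thread that plays the task is parked and consumes no CPU, which is the whole point of having someone else watch for the event.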

The 6th and 7th steps describe the role of the driver as an interface between the OS and the task scheduler. This means that the driver relies on the event notification facilities of the OS, such as kqueue on BSD/macOS, IOCP on Windows or epoll on Linux (and now we are hearing more and more about io_uring, which the Tokio project supports through the separate tokio-uring crate).

It is worth noting that the interaction is actually between the driver and mio, so it is mio that interacts with the OS. That being said, I will abstract from it here, so we can depict a simplified conversation between Tokio's driver and the OS I/O, which makes up the 6th step above.

The driver, being an event loop, keeps polling the OS using one of these system calls. Let us return to our foo() example. If the driver polls the OS and finds out that there was a connection at 0.0.0.0:8080, it will then wake() the task so that it can poll() the future.

Sure, there is a myriad of details left out. For example, how does the communication via channels between the scheduler and the driver actually work? Nevertheless, I will respect the beginners tag with which I marked this post and stop here. (After all, if I write posts for beginners, it is not only because I think we still lack introductory content, but also because of my own current limitations; and here we are teetering on the edge of my knowledge gap.)

Timer

The module tokio::time is part of the runtime and provides utilities for tracking time. I don't have much to say about these, but I will, for the sake of completeness, quote the part of the documentation that explains what this module provides:

  • Sleep is a future that does no work and completes at a specific Instant in time.
  • Interval is a stream yielding a value at a fixed period. It is initialized with a Duration and repeatedly yields each time the duration elapses.
  • Timeout: Wraps a future or stream, setting an upper bound to the amount of time it is allowed to execute. If the future or stream does not complete in time, then it is canceled and an error is returned.

I will stop here for today. I feel there is a lot missing, but this post is already longer than I wanted. Hopefully, we will be able to revisit some topics as we move on to talk about the other crates.

As always, if you spot something

See you there!

Cover photo by Pawel Nolbert

DEV Community | Sciencx (2021-09-12T13:39:39+00:00) [Rust] Tokio stack overview: Runtime. Retrieved from https://www.scien.cx/2021/09/12/rust-tokio-stack-overview-runtime/