Premier exemple

Mise en situation

Supposons qu’on veuille afficher des messages sur la console, mais en attendant un certain temps. Avec un message unique, c’est facile à faire de façon synchrone…

#![allow(unused)]
fn main() {
use std::time::Duration;

// Définition du message
const DELAI: Duration = Duration::from_millis(100);
const MESSAGE: &str = "Bonjour à tous";

// Attente
std::thread::sleep(DELAI);

// Affichage
println!("Message : {MESSAGE}");
}

…mais à partir du moment où on veut le faire avec plusieurs messages, il faut réfléchir un peu plus. Dans ce cas particulier, on peut s’en tirer en triant les messages par durée d’attente croissante, et en raisonnant en termes de deadline plutôt que de durée…

#![allow(unused)]
fn main() {
use std::time::{Duration, Instant};

// Définition des messages
let messages = [(Duration::from_millis(100), "...à tous"),
                (Duration::from_millis(50), "Bonjour...")];

// Transformation des délais en deadlines
let debut = Instant::now();
let mut messages = messages.map(|(duree, texte)| (debut + duree, texte));

// Tri par deadline croissante
messages.sort_by_key(|(deadline, _)| *deadline);

// Attentes et affichages
for (deadline, texte) in messages {
    std::thread::sleep(deadline.saturating_duration_since(Instant::now()));
    println!("Temps écoulé : {:?}", debut.elapsed());
    println!("Message : {texte}");
}
}

…mais ce n’est pas une approche générisable à toute tâche asynchrone : elle ne fonctionne que quand on connaît le délai d’attente à l’avance. Dans le cas général, si nous voulons gérer plusieurs tâches concurrentes, il nous faut un moyen de garder de côté l’état des différentes tâches en vol, et de basculer entre les différentes tâches au fur et à mesure que les différentes attentes se terminent.

Version avec des threads

On peut résoudre le problème de façon générale et relativement simple avec des threads

#![allow(unused)]
fn main() {
use std::io::Write;
use std::time::{Duration, Instant};

// Définition des messages
let messages = [(Duration::from_millis(100), "...à tous"),
                (Duration::from_millis(50), "Bonjour...")];

// Transformation des délais en deadlines
let debut = Instant::now();
let messages = messages.map(|(duree, texte)| (debut + duree, texte));

// Affichage temporisé d'un message
// ATTENTION : println!() ne synchroniserait pas à la bonne granularité
let afficher_message = move |(deadline, texte): (Instant, &str)| {
    std::thread::sleep(deadline.saturating_duration_since(Instant::now()));
    let mut stdout = std::io::stdout().lock();
    writeln!(&mut stdout, "Temps écoulé : {:?}", debut.elapsed())?;
    writeln!(&mut stdout, "Message : {texte}")
};

// Lancement des threads
std::thread::scope(|s| {
    for message in messages {
        s.spawn(move || afficher_message(message).expect("Echec de l'affichage"));
    }
});
}

…et pour un nombre faible de tâches concurrentes, c’est une bonne solution : c’est simple, ça fait le travail demandé, et le système d’exploitation se charge de tout le sale boulot de gérer les différentes tâches en vol, les mettre en attente quand il faut, et les réveiller quand l’attente est terminée.

Malheureusement, cette approche ne passe pas bien à l’échelle quand le nombre de threads augmente, pour plusieurs raisons :

  • D’abord, à chaque thread est associé à un état interne relativement imposant, incluant notamment l’intégralité des données stockées sur sa pile. Tout ça prend de la place en RAM, souvent bien plus de place que la tâche en cours n’en a vraiment besoin.
  • Ensuite, au niveau du système d’exploitation, ça peut être coûteux au niveau CPU de créer, gérer et détruire un grand nombre de threads. Certains systèmes d’exploitation comme Linux font leur possible pour rendre ces opérations aussi efficaces que possible, d’autres comme Windows… y dépensent moins d’énergie. Mais dans tous les cas, ça va finir par représenter un coût conséquent quand on a un grand nombre de tâches concurrentes à traiter, chacune ne faisant qu’un travail relativement simple.
  • Enfin, l’OS tente d’exécuter les threads en parallèle même quand ça n’a pas d’intérêt pour les performances, ce qui nous force à payer le coût en complexité et temps d’exécution d’une synchronisation entre threads (ici verouiller stdout à la bonne granularité) alors qu’on n’y gagnera rien au niveau des performances d’exécution.

Nous allons donc maintenant voir comment on peut gagner en efficacité en gérant la concurrence nous-même plutôt que de déléguer ce travail au mécanisme de threads de l’OS.

Première version asynchrone

Ajoutons maintenant la crate utilitaire futures et le runtime tokio comme dépendances…

cargo add futures
cargo add tokio --features full

…et modifions un peu notre programme pour utiliser des tâches asynchrones tokio à la place des threads du système d’exploitation :

// NOUVEAU : On active les extensions de Future fournies par la crate futures
use futures::prelude::*;
use std::time::Duration;
use tokio::{runtime::Runtime, time::Instant};

// NOUVEAU : On initialise et active le runtime tokio
let runtime = Runtime::new().expect("Echec d'initialisation du runtime");
let _garde = runtime.enter();

// Définition des messages
let messages = [
    (Duration::from_millis(100), "...à tous"),
    (Duration::from_millis(50), "Bonjour..."),
];

// Transformation des délais en deadlines
let debut = Instant::now();
let messages = messages.map(|(duree, texte)| (debut + duree, texte));

// Création des futures d'attentes et d'affichage
let mut futures = Vec::new();
for (deadline, texte) in messages {
    // NOUVEAU : On crée des futures et on les garde de côté
    futures.push(
        // NOUVEAU : On utilise le sleep_until() de tokio pour obtenir une
        //           future qui attend jusqu'à une deadline.
        tokio::time::sleep_until(deadline)
            // NOUVEAU : On utilise la méthode map() fournie par la crate
            //           futures pour programmer du travail après l'attente.
            .map(move |()| {
                // ATTENTION : Il y a un problème ici, on va revenir dessus
                println!("Temps écoulé : {:?}", debut.elapsed());
                println!("Message : {texte}");
            }),
    );
}

// NOUVEAU : On combine toutes les futures d'attente en une seule future
let attente = future::join_all(futures);

// NOUVEAU : On exécute cette future combinée de façon synchrone
runtime.block_on(attente);

Quoi de neuf dans cet exemple ?

  • Avec l’import de futures::prelude::*, on active l’ensemble des utilitaires communs fournis par la crate futures pour manipuler des futures.
  • On initialise le runtime tokio pendant la phase d’initialisation de notre programme. Comme tokio utilise le thread-local storage pour les accès au runtime, on doit aussi installer le runtime au sein du thread actif pour qu’il soit disponible par la suite lorsqu’on appellera l’API tokio.
  • On utilise la fonction sleep_until() de tokio pour obtenir une future qui représente une tâche asynchrone dont l’exécution se terminera lorsqu’une certaine deadline sera dépassée.
  • On utilise la méthode future.map() fournie par la crate futures pour programmer l’affichage de texte après que la deadline soit atteinte, et obtenir une nouvelle future dont l’exécution sera considérée comme terminée après attente ET affichage du message.
  • On utilise la fonction join_all() fournie par la crate futures pour créer une future combinée qui représente l’exécution concurrente de toutes nos tâches asynchrones.
  • On utilise le runtime tokio pour exécuter la future combinée retournée par join_all() de façon synchrone, ce qui déclenche l’exécution concurrente de toutes nos tâches.

Avec cette approche, quel que soit le nombre de messages que l’on veuille afficher, le runtime tokio travaillera à nombre de threads constant. Toute la concurrence entre les tâches sera purement gérée via les APIs du système d’exploitation, qui permettent d’attendre plusieurs deadlines ou plusieurs opérations d’entrées/sorties avec un seul appel système. Le risque d’explosion de la consommation mémoire et de l’overhead CPU quand le nombre de tâches en vol augmente est donc beaucoup plus faible que quand on utilise un thread par tâche.

Si vous avez compris cet exemple, félicitations, vous savez maintenant comment on faisait de la programmation asynchrone en 2016, lorsque les futures Rust venaient de sortir. Dans les chapitres suivants, nous allons voir comment le Rust moderne simplifie grandement l’écriture de code asynchrone, grâce à la syntaxe async/await introduite en 2018.

Mais avant ça, nous devons résoudre un petit problème du code ci-dessus.

Opérations bloquantes

Dans l’exemple ci-dessus, notre utilisation de future.map() n’est pas correcte, car nous détournons cette fonction pour faire quelque chose que l’on doit éviter à tout prix en programmation asynchrone : appeler une opération bloquante, en l’occurence println!().

tokio::time::sleep_until(deadline)
    .map(move |()| {
        // PROBLEME : On ne devrait pas utiliser println!() ici
        println!("Temps écoulé : {:?}", debut.elapsed());
        println!("Message : {texte}");
    })

L’idée centrale de l’asynchronisme, c’est d’utiliser le nombre minimal de threads possibles pour tirer pleinement parti du matériel. Mais cette stratégie ne fonctionne que si les threads n’exécutent que des opérations non bloquantes. Si ils commencent à exécuter des opérations bloquantes, alors…

  • Au mieux, la performance va se dégrader car on n’utilise plus tous les coeurs CPUs.
  • Au pire, le programme va se bloquer car tous les threads du runtime sont bloqués en attendant que d’autres tâches asynchrones progressent, tandis que ces tâches ne peuvent pas progresser car il n’y a plus de threads disponibles pour les exécuter.

On dont donc s’assurer que le code voué à s’exécuter de façon asynchrone évite au maximum l’utilisation d’opérations bloquantes. Ni entrées-sorties bloquantes, ni opérations bloquantes (ex : attendre N secondes via std::thread::sleep()), ni primitives bloquantes de synchronisation entre threads, autant que faire se peut. C’est mauvais à petite dose, et c’est mortel à haute dose.

Par quoi remplacer ces opérations bloquantes ? Tout dépend de l’opération en question :

  • Parfois, ça a du sens de les exécuter en parallèle via une réserve de threads dédiés aux tâches bloquantes, avec des outils comme le spawn_blocking() de tokio.
  • Parfois, les opérations sont bloquantes parce qu’elles sont sérialisées entre les threads. Dans ce cas, ça n’a pas de sens de les exécuter en parallèle, il est préférable d’avoir un thread unique dédié à ces opérations et de lui soumettre du travail via une file d’attente.
  • Et parfois, il existe une alternative non-bloquante aux opérations bloquantes qu’on essaie d’utiliser, et on peut tout simplement utiliser cette alternative. C’est souvent la voie royale au niveau des performances, mais hélas ce n’est pas toujours la voie la plus simple.

Asynchronisme complet

Dans le cas d’un accès à stdout comme println!(), nous sommes dans le cas où on peut tout faire de façon asynchrone, moyennant quelques précautions :

  • tokio nous fournit un moyen non-bloquant d’écrire dans stdout. Mais nous sommes avertis que cette alternative à println!() n’est pas synchronisée, donc si plusieurs tâches asynchrones tentent d’écrire sur stdout en même temps, leurs sorties seront mélangées.
  • tokio nous fournit également des alternatives asynchrones aux primitives de synchronisation entre threads de la bibliothèque standard, que nous pouvons utiliser pour rétablir la synchronisation des accès à stdout si besoin.

De façon surprenante, pour l’exemple qui nous occupe ici, nous n’avons pas besoin de cette seconde précaution. En effet, si l’on y regarde de plus prêt, à la fin du programme nous ne confions au runtime tokio qu’une seule tâche à exécuter :

// Construction d'une future composite
let attente = future::join_all(futures);

// Attente de la future composite
runtime.block_on(attente);

Certes, cette tâche attend plusieurs opérations asynchrones de façon concurrente. Mais tel que le programme est écrit actuellement, le code associé à ces opérations n’est pas exécuté de façon concurrente. Seule l’attente est concurrente, les println!() sont séquentiels (ce qui n’est pas un problème ici car ça n’a pas beaucoup d’intérêt d’exécuter println!() en parallèle).

Et donc il suffit de modifier le code comme ceci pour respecter les règles de l’asynchronisme :

use tokio::io::AsyncWriteExt;

// ... le début est comme avant ...

// Création des futures d'attentes et d'affichage
let mut futures = Vec::new();
for (deadline, texte) in messages {
    futures.push(
        tokio::time::sleep_until(deadline)
            // NOUVEAU : then permet de programmer l'exécution d'une opération
            //           asynchrone à la suite d'une autre opération asynchrone,
            //           et async/await permet l'emprunt de "sortie".
            .then(move |()| async move {
                let sortie = format!(
                    "Temps écoulé : {:?}\n\
                    Message : {texte}\n",
                    debut.elapsed()
                );
                let mut stdout = tokio::io::stdout();
                stdout.write_all(sortie.as_bytes()).await
            }),
    );
}

// NOUVEAU : try_join_all part de plusieurs futures de `Result<T, E>` et
//           produit un `Result<Vec<T>, E>`.
let attente = future::try_join_all(futures);

// Comme avant, mais avec une gestion des erreurs
runtime.block_on(attente).expect("Erreur d'entrée/sortie");

Quoi de neuf dans cette version modifiée ?

  • On utilise future.then() au lieu de future.map(), car l’écriture sur stdout est désormais une opération asynchrone qui retourne une future.
  • A l’intérieur de future.then(), on utilise async/await pour permettre l’emprunt du tampon sortie durant l’écriture sur stdout. Nous reviendrons prochainement sur cette possibilité.
  • Ni tokio ni futures ne fournissent d’alternative à println!(), donc nous devons pré-calculer le texte avec format!() avant de l’envoyer sur stdout.
  • Les entrées/sorties sur stdout sont traitées comme faillibles par tokio, donc nous devons introduire une gestion des erreurs d’entrées/sorties.

On le voit, il est possible d’éviter des opérations bloquantes, mais ce n’est pas forcément simple vu l’omniprésence de ces opérations est l’immaturité relative des alternatives. Je ne vous cacherai pas qu’en pratique, c’est souvent l’aspect le plus rébarbatif de la programmation asynchrone.