async/await

Aux premiers temps de la programmation asynchrone en Rust, on faisait tout avec des futures, des combinateurs et des runtimes asynchrones comme dans l’exemple précédent (dernière section exceptée). Mais cette approche n’était pas pleinement satisfaisante car…

  • Le code qui utilise des futures et des combinateurs était trop différent de sa version synchrone. En particulier, le flux de contrôle asynchrone (if/else via select, gestion des erreurs d’entrées/sorties asynchrones…) était trop difficile à gérer.
  • On ne pouvait pas utiliser l’emprunt et les références dans le code asynchrone (ex : allouer un tampon d’octets alloué sur la pile, effectuer un AsyncRead qui écrit des octets dans ce tampon via un &mut [u8], puis utiliser les données fraîchement écrites dans le tampon). Il fallait tout faire avec des données allouées sur le tas, ce qui était inefficace et peu ergonomique.

Pour résoudre ces problèmes d’ergonomie et d’efficacité, Rust a introduit vers 2018 le mécanisme async/await. Ce dernier est très rapidement devenu la façon dominante d’écrire du code asynchrone, au point que le code utilisant directement des futures comme celui du chapitre précédent est aujourd’hui considéré comme un peu has been, exotique et dérangeant. Dans ce chapitre, nous allons donc voir ce que fait async/await et comment l’utiliser pour rendre le code du chapitre précédent presque aussi clair que celui d’une application utilisant des threads.

Enjeux de conception

Toute intégration de la programmation asynchrone à un langage de programmation doit se positionner quelque part entre deux extrêmes :

  • A un extrême, on peut faire comme Go et avoir du code asynchrone qui ressemble exactement au code synchrone dans d’autres langages. Mais le prix à payer est que toutes les opérations d’entrée/sortie de la bibliothèque standard doivent être asynchrones, l’interopérabilité avec les autres langages de programmation est réduite, et le coût d’une tâche asynchrone en vol est relativement élevé (quoiqu’un peu plus faible que celui d’un thread d’OS).
  • A l’autre extrême, on peut faire comme les programmeurs C et utiliser directement les APIs asynchrones de l’OS sans support du langage. Avec cette approche, on a un contrôle total sur l’utilisation des APIs et on peut faire en sorte d’en faire un usage aussi optimal que possible. Mais le code qu’on écrit n’a pas grand chose à voir avec du code synchrone, il tient plus de la machine à états codée à la main. C’est laborieux, et l’erreur est fréquente.

Comme d’autres langages, Rust opte pour le compromis du async/await :

  • Le code asynchrone doit être annoté avec le mot-clé async. Il est soumis à quelques restrictions (ex : pas de récursion directe), retourne une future, et ne peut être appelé directement par une partie synchrone du programme. Les programmeurs Go sont fans.
  • Les opérations potentiellement bloquantes doivent être annotées avec le mot-clé await, ce qui clarifie les points d’attente pour le lecteur et le compilateur. Ayant connaissance de ces points d’attente, le compilateur peut transformer du code async ressemblant furieusement à du code synchrone en une machine à état aussi efficace que celle que l’on aurait codé à la main en C (ou en utilisant des combinateurs de futures en Rust).

Cependant, l’implémentation d’async/await de Rust tire parti des spécificités de l’écosystème asynchrone de ce langage, et notamment du fait que les futures de Rust sont mieux conçues que celles de la plupart des autres langages (cf chapitre précédent), pour être particulièrement efficace.

Ainsi, contrairement à ce qui se passe avec les coroutines C++20, un code Rust utilisant async/await n’est pas forcé de faire une allocation tas par bloc asynchrone utilisé.

Exemple détaillé

Si l’on reprend le bloc de code async introduit dans le chapitre précédent…

async move {
    let sortie = format!(
        "Temps écoulé : {:?}\n\
        Message : {texte}\n",
        debut.elapsed()
    );
    let mut stdout = tokio::io::stdout();
    stdout.write_all(sortie.as_bytes()).await
}

…voici ce qu’on peut en conclure à la lumière des explications précédentes.

D’abord, c’est un bloc async, donc il retourne une future du résultat final correspondant à la dernière expression du bloc. Ici, cette expression est stdout.write_all(/* ... */).await, et elle retourne un io::Result<()> qui indique si l’écriture sur stdout s’est bien passée ou pas.

A partir de ce bloc async, le compilateur va générer une implémentation du trait Future sous forme de machine à états avec…

  • Un état initial où le code asynchrone n’a pas encore commencé à s’exécuter
    • Ici, ça correspond à la capture initiale des variables externes texte et debut.
  • Un état par point d’arrêt await dans le code, dans lequel on peut se retrouver si le code a commencé à effectuer l’opération asynchrone associée à ce point await mais son résultat n’est pas encore disponible et on doit retourner Poll::Pending
    • Ici, ça correspond au cas où l’opération d’écriture asynchrone stdout.write_all() a été lancée, mais elle n’a pas pu être terminée en une seule transaction non-bloquante. L’OS nous a invité à revenir plus tard, et un gestionnaire d’événements a été installé pour savoir quand on pourra poursuivre l’écriture sur stdout. A ce stade, notre future de bloc async délègue le travail asynchrone à la future retournée par stdout.write_all().
  • Un état final qui est atteint après que le résultat final ait été émis par Poll::Ready, où la méthode poll() ne devrait plus être appelée.
    • Ici, ça correspond à la fin du bloc, où le résultat de write_all() a été récupéré et propagé vers l’appelant de la tâche asynchrone.

Quand on appelle la méthode poll() de la future résultante, elle passera d’un état à l’autre de la machine à états sous-jacente en exécutant le code situé entre le point d’arrêt de départ et le point d’arrêt d’arrivée, puis elle émet le résultat Poll::Pending si l’exécution n’est pas terminée ou Poll::Ready si l’exécution est terminée.

Etat auto-référentiel

Tout ceci est bien amusant, mais à ce stade vous vous demandez peut-être pourquoi je vous ai embêté avec Pin et les types auto-référentiels au début de ce chapitre.

Cela tient au fait que tout bloc async a tendance à générer des structures de données auto-référentielles en présence de code idiomatique ayant recours à l’emprunt de références.

Dans le cas présent, notre bloc async d’exemple…

  • Commence par construire une chaîne de caractères appelée sortie.
  • Emprunte un &[u8] issu de cette chaîne de caractères avec l’expression sortie.as_bytes().
  • Transmet ce &[u8] à la méthode stdout.write_all().
  • …et peut se retrouver à se bloquer là et sauvegarder son état interne, comprenant à la fois la chaîne de caractères sortie et un &[u8] capturé par stdout.write_all() qui pointe vers ladite chaîne de caractères.

…donc si notre chaîne de caractères était stockée sur la pile (ce qui est le cas de certaines implémentations de chaînes de caractères utilisant la “small string optimization”), la structure de données associée à l’état sauvegardé pendant l’opération asynchrone stdout.write_all() serait auto-référentielle et ne pourrait pas être déplacée en toute sécurité.

En l’occurence, la structure n’est pas réellement auto-référentielle car la chaîne de caractères est allouée sur le tas et on a donc juste affaire à deux pointeurs ciblant une même allocation tas. Mais le système de typage de Rust traite ces deux situations de façon identique pour que les propriétés du code ne dépendent pas des détails d’implémentation du type chaîne de caractères utilisé.

Fonctions async

Si vous avez compris les blocs async, les fonctions async ne devraient pas vous poser beaucoup de problèmes. Une fonction comme ceci…

#![allow(unused)]
fn main() {
type T = usize;

async fn identique(x: T) -> T {
    x
}
}

…retourne immédiatement une future qui capture le paramètre d’entrée x. Le premier appel à la méthode poll() de cette future retournera immédiatement Poll::Ready(x).

Et un cas plus complexe comme ceci…

#![allow(unused)]
fn main() {
type T = usize;

async fn enfant1() -> T {
    /* ... */
  42
}

async fn enfant2() -> T {
    /* ... */
  24
}

async fn parent() -> T {
    let x = enfant1().await;
    let y = enfant2().await;
    x + y
}
}

…s’interprète de la façon suivante :

  • Lorsqu’on appelle enfant1() ou enfant2(), cela retourne immédiatement une implémentation de Future<Output=T> qui produira, à terme, la valeur retournée par le code asynchrone de la fonction.
  • Lorsqu’on appelle parent(), cela retourne immédiatement une implémentation de Future<Output=T> dont la méthode poll() se comporte comme suit :
    • Au premier appel à poll(), on appelle enfant1() pour construire une future et on tente d’appeler la méthode poll() de cette future. Si le résultat est Poll::Pending, on sauvegarde cette future “enfant”, et tous les appels suivants au poll() de la future “parente” délèguent à la future retournée par enfant1() jusqu’à ce que sa méthode poll() retourne un résultat final Poll::Ready(x).
    • A partir de ce moment là, on met de côté la valeur x, jette la future enfant retournée par enfant1(), puis on appelle enfant2(). Cela construit une nouvelle future, et on répète le cycle précédent jusqu’à ce que la méthode poll() de la 2e future enfant retourne un résultat final Poll::Ready(y).
    • Lorsque ça arrive, on récupère cette valeur y, on la somme avec la valeur x stockée précédemment, et on retourne le résultat final Poll::Ready(x + y). La future parente rentre dans son état final, l’exécuteur ne doit plus appeler sa méthode poll().

Notez que les tâches asynchrones retournées par enfant1() et enfant2() ne sont pas exécutées de façon concurrente, ce qui ne semble pas optimal puisque ce sont deux tâches indépendantes. async/await nous permet de créer des futures dont le comportement reproduit fidèlement celui d’un code séquentiel synchrone, mais ce comportement n’est pas forcément optimal pour une tâche donnée. Donc async/await ne rend pas les combinateurs de futures obsolètes.

Ici, une version de parent() qui s’exécute de façon aussi concurrente que possible serait…

async fn parent() -> T {
    let (x, y) = futures::join!(enfant1(), enfant2());
    x + y
}

Retour à notre exemple

Depuis la sortie de async/await, les runtimes comme tokio ont mis au point toutes sortes d’utilitaires qui en tirent parti pour améliorer l’ergonomie. Voici notre exemple précédent d’affichage temporisé de messages, réécrit de façon idiomatique avec les outils tokio modernes :

use futures::prelude::*;
use std::time::Duration;
use tokio::{io::AsyncWriteExt, time::Instant};

// NOUVEAU : On délègue l'initialisation du runtime à la macro tokio::main
#[tokio::main]
async fn main() {
    // Définition des messages
    let messages = [
        (Duration::from_millis(100), "...à tous"),
        (Duration::from_millis(50), "Bonjour..."),
    ];

    // Transformation des délais en deadlines
    let debut = Instant::now();
    let messages = messages.map(|(duree, texte)| (debut + duree, texte));

    // Création des futures d'attentes et d'affichage
    let mut futures = Vec::new();
    for (deadline, texte) in messages {
        // NOUVEAU : On réécrit la tâche asynchrone en un seul bloc async
        futures.push(async move {
            tokio::time::sleep_until(deadline).await;
            let sortie = format!(
                "Temps écoulé : {:?}\n\
                Message : {texte}\n",
                debut.elapsed()
            );
            let mut stdout = tokio::io::stdout();
            stdout.write_all(sortie.as_bytes()).await
        });
    }

    // On combine nos futures de résultats en une future de résultat combiné
    let attente = future::try_join_all(futures);

    // NOUVEAU : On attend la fin avec await
    attente.await.expect("Erreur d'entrée/sortie");
}

On voit que même si tout ceci n’atteint pas encore la simplicité de la version utilisant des threads

use std::io::Write;
use std::time::{Duration, Instant};

fn main() {
    // Définition des messages
    let messages = [(Duration::from_millis(100), "...à tous"),
                    (Duration::from_millis(50), "Bonjour...")];

    // Transformation des délais en deadlines
    let debut = Instant::now();
    let messages = messages.map(|(duree, texte)| (debut + duree, texte));

    // Affichage temporisé d'un message
    let afficher_message = move |(deadline, texte): (Instant, &str)| {
        std::thread::sleep(deadline.saturating_duration_since(Instant::now()));
        let mut stdout = std::io::stdout().lock();
        writeln!(&mut stdout, "Temps écoulé : {:?}", debut.elapsed())?;
        writeln!(&mut stdout, "Message : {texte}")
    };

    // Lancement des threads
    std::thread::scope(|s| {
        for message in messages {
            s.spawn(move || afficher_message(message).expect("Echec de l'affichage"));
        }
    });
}

…l’écart ergonomique s’est quand même beaucoup resserré. Il devient moins ridicule qu’avant d’imaginer que dans quelques années, lorsque l’infrastructure asynchrone aura gagné en maturité avec notamment des équivalents asynchrones de std::thread::scope() et println!(), ces deux programmes pourraient devenir presque identiques à quelques async et await bien placés près.

Le reste du hibou

L’introduction de async/await a bien amélioré l’ergonomie de la programmation asynchrone en Rust, mais ça ne signifie pas que l’équipe de conception du langage peut se reposer sur ses lauriers.

L’infrastructure asynchrone qui a été intégrée au niveau du langage reste très préliminaire, et on y regrettera notamment l’absence…

  • De lambdas asynchrones async move || { /* ... */ } (oui, on peut les approximer avec move || async move { /* ... */ }, mais ça devient vite lourdingue).
  • De méthodes asynchrones dans les traits.
  • De nombreux traits fondamentaux de futures dans la bibliothèque standard.
  • D’un meilleur support des entrées/sorties asynchrones dans la bibliothèque standard.
  • D’une solution au problème de l’interopérabilité des runtimes, qui passera peut-être par l’intégration de la notion de runtime au langage et à la bibliothèque standard.
  • D’une généralisation de la notion de coroutine, introduite par la porte de derrière dans l’implémentation des blocs et fonctions async, vers un modèle plus général où on peut déclarer des itérateurs et streams qui retournent plusieurs valeurs.
  • D’une alternative au paradigme du parallélisme structuré pour le code asynchrone, qui supporte le compromis parfois intéressant de la concurrence structurée (de l’attente parallèle, mais pas d’exécution de code parallèle).

Un objectif pour les prochaines années est donc de transformer l’essai pour qu’à terme, le code asynchrone soit aussi facile à écrire que le code synchrone. Voire idéalement encore plus facile à écrire, pour que son utilisation soit privilégiée chaque fois que c’est possible, et que ça devienne le cas exceptionnel de bloquer bêtement des threads quand on a juste besoin d’attendre que le système d’exploitation finisse quelques tâches…