La crate futures

L’idée centrale de la programmation asynchrone, c’est qu’une routine d’entrée-sortie de l’OS puisse répondre au thread qui l’appelle “je n’ai pas la réponse tout de suite, va faire autre chose et reviens me voir plus tard” au lieu de bloquer le thread en attendant que l’opération soit terminée.

Il existe plusieurs manières de fournir une interface de plus haut niveau par dessus ce type d’API. Dans ce chapitre, nous allons explorer la notion de future, qui est l’approche plesbicitée en Rust.

Même si vous avez déjà utilisé des futures dans d’autres langages, y compris le type std::future de C++, ne passez pas ce chapitre. Ce que Rust appelle une future est différent de ce que la plupart des autres langages appellent une future, et il est important que ayez une bonne compréhension du fonctionnement des futures de Rust si vous comptez les utiliser.

Futures

De même que les opérations d’entrées/sorties asynchrones de l’OS rendent immédiatement la main à l’appelant, une façon classique d’exposer une interface asynchrone dans une bibliothèque haut niveau est d’avoir une API qui retourne immédiatement un objet “future”.

Cet objet représente la promesse d’un résultat futur. L’appelant peut s’en servir pour différentes choses, notamment déterminer si le résultat est arrivé, programmer d’autres opérations lorsque le résultat arrivera, ou attendre l’arrivée du résultat lorsqu’il n’y a plus rien d’autre à faire.

L’intérêt d’une abstraction future bien conçue, c’est qu’elle donne à l’appelant le contrôle de la stratégie d’attente du résultat de l’entrée/sortie, au lieu d’imposer une certaine stratégie pas toujours adaptée (par exemple devoir attendre que le résultat arrive sans pouvoir rien faire d’autre dans le thread actif, comme les APIs synchrones).

Rust utilise des futures, mais avec une spécificité qui est que l’API qui retourne une future ne fait pas d’entrées/sorties. C’est l’objet future lui-même qui initie le processus, de façon paresseuse, au moment où on commence à attendre que le résultat arrive. Cela a plusieurs intérêts :

  • L’implémentation des futures peut être plus efficace. Avec les futures classiques, on a besoin d’une allocation tas et d’une couche de polymorphisme dynamique par étape d’entrée/sortie d’une tâche asynchrone. Alors qu’avec les futures paresseuses de Rust, on n’a besoin que d’une allocation tas et une couche de polymorphisme dynamique par tâche asynchrone, et dans certains cas on peut même s’en tirer sans allocation tas ni vtable.
  • On peut plus facilement gérer l’annulation des tâches asynchrones en cours ainsi que la backpressure (quand un serveur est surchargé, il accepte moins de tâches du client).
  • Si l’implémentation d’une future a besoin de données auto-référentielles (nous allons voir plus tard pourquoi), elle n’en a pas besoin dès que la future est créée, mais seulement à partir du moment où la tâche asynchrone est lancée. Avant ça, la future peut être déplacée librement.

Le coeur de l’interface des futures est aujourd’hui disponible dans la bibliothèque standard, via le trait Future. Mais comme nous allons le voir dans ce qui suit, cette interface de base est très simple et bas niveau. Toutes les fonctionnalités plus avancées des futures sont implémentées dans la crate futures, au sein de laquelle les futures Rust ont été initialement mises au point.

Par exemple, en important futures::prelude::*, on gagne un accès aux extensions FutureExt qui permettent de programmer davantage de travail à la suite de la tâche asynchrone associée à une Future, ainsi qu’au trait TryFuture et ses extensions TryFutureExt qui facilitent la manipulation de futures représentant des opérations asynchrones faillibles.

Il est probable qu’un jour, davantage de fonctionnalités de futures soient intégrées à la bibliothèque standard, mais à l’heure actuelle on ne peut pas dire quand ça se produira.

Tâches asynchrones

La base de l’interface des futures en Rust est une méthode poll() ayant la signature suivante :

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>

On constate d’abord cette méthode prend son paramètre par référence Pin<&mut Self>. Cela garantit à l’implémentation de la future qu’un appelant ne va pas déplacer la future entre deux appels à poll(), et permet donc à l’implémentation de stocker des données auto-référentielles à l’intérieur de l’objet future à partir du premier appel à poll().

Lorsque la méthode poll() est appelée, elle retourne un objet de type énuméré Poll<Self::Output>, où Self::Output est le type du résultat final de la tâche asynchrone. Le type énuméré Poll est défini de la façon suivante :

#![allow(unused)]
fn main() {
pub enum Poll<T> {
    Ready(T),
    Pending,
}
}

Ces deux variantes représentent deux scénarios possibles pour l’appelant de poll() :

  • Si poll() retourne Poll::Ready, la tâche asynchrone est terminée et son résultat est retourné à l’appelant. A partir de ce moment, la méthode poll() ne doit plus être utilisée.
  • Si poll() retourne Poll::Pending, la tâche asynchrone a atteint un point bloquant (entrée/sortie). On doit attendre que ce point bloquant soit passé, puis rappeler poll().

Le mécanisme qui permet d’attendre que le point bloquant soit passé est l’argument Context de poll(). Ce Context donne accès à un Waker qui permet à l’implémentation de la tâche asynchrone de signaler quand le résultat de l’entrée/sortie asynchrone est disponible, afin que la méthode poll() de la future associée soit rappelée pour continuer la tâche asynchrone.

On le voit, tout ça est plutôt bas niveau, ce n’est pas le style d’interface qu’on a envie d’utiliser directement au quotidien. La façon normale d’utiliser des futures en Rust est de passer par une bibliothèque fournissant un ensemble d’opérations d’entrées/sorties retournant des futures ainsi qu’un moyen d’ordonnancer l’exécution de tâches asynchrones. On parle de runtime asynchrone.

Il existe plusieurs runtimes asynchrones. Certains sont généralistes, d’autres sont spécialisés pour des domaines précis d’applications (par exemple, les systèmes résilients aux pannes, ou les systèmes embarqués sans OS). Dans les chapitres suivants, nous ferons appel à tokio, le runtime asynchrone le plus utilisé aujourd’hui.

Mais avant de nous focaliser davantage sur tokio, nous allons d’abord terminer notre tour des primitives de bases de l’asynchronisme en Rust, en parlant un peu des autres fonctionnalités communes fournies par la crate futures.

Flux de données typés

Souvent, quand on fait des opérations d’entrées/sorties, on ne lit pas qu’une seule donnée. On lit plutôt un flux continu de données du même type : paquets UDP, blocs d’octets, requêtes HTTP, etc. Et de même, on n’écrit pas qu’une seule données, on écrit un flux continu de données similaires.

On pourrait représenter chaque opération de lecture/écriture comme une opération asynchrone complètement indépendante, à laquelle on associe une future dédiée. Mais il est à la fois plus ergonomique et plus efficace au niveau de l’implémentation d’avoir une abstraction dédiée pour cette situation, analogue aux itérateurs de la bibliothèque standard.

Cette abstraction, c’est Stream pour les données d’entrée, et Sink pour les données de sortie.

Stream est une généralisation asynchrone de Iterator. Un Stream de T se comporte comme une future réutilisable de Option<T> : tant que l’opération poll_next() produit des Poll::Ready(Some(x)), on peut continuer de l’appeler. C’est seulement quand on reçoit un Poll::Ready(None) que le flux d’entrée est tari et qu’on doit arrêter d’appeler poll_next().

A l’inverse, un Sink de T accepte des valeurs de type T en entrée via le protocole suivant :

  • D’abord, on doit utiliser l’opération asynchrone poll_ready() pour attendre que le Sink soit prêt à accepter une nouvelle valeur en entrée.
  • Quand poll_ready() indique que le Sink est prêt, on peut lui soumettre une nouvelle valeur à envoyer avec start_send().
  • Le cycle poll_ready()/start_send() peut être répété autant de fois que nécessaire pour envoyer toutes les valeurs souhaitées
  • Un Sink est autorisé à accumuler des données en interne avant envoi. Lorsqu’on veut s’assurer que toutes les données ont bien été envoyées, on utilise poll_flush() si on veut envoyer d’autres données par la suite, ou poll_close() si on a terminé.

De la même façon que la crate futures complète le trait bas niveau Future par des extensions FutureExt et TryFutureExt qui l’enrichissent avec des opérations plus haut niveau, les traits bas niveau Stream et Sink sont complétés par des extensions StreamExt, TryStreamExt et SinkExt qui les enrichissent avec des opérations plus haut niveau.

Par exemple SinkExt::send_all() permet de récupérer l’ensemble des valeurs d’un Stream, les envoyer à un Sink, et récupérer une Future qui sera résolue lorsque ce travail sera terminé.

Entrées/sorties asynchrones

Parmi les flux de données, le cas particulier des flux d’octets faillibles est important, car c’est en ces termes que sont exprimées les APIs d’entrées/sorties asynchrones de tous les systèmes d’exploitation couramment utilisés.

Ce ne serait pas efficace de représenter ces flux d’octets par un TryStream ou un Sink de u8, car on devrait initier une transaction d’entrée/sortie asynchrone au niveau du système pour chaque octet envoyé ou reçu, ce qui ferait bien trop de transactions.

A la place, la crate futures fournit donc les traits AsyncRead, AsyncBufRead, AsyncWrite et AsyncSeek, qui reprennent l’interface des traits Read, BufRead, Write et Seek de la bibliothèque standard mais sous une forme qui se prête aux entrées/sorties asynchrone.

Selon une logique désormais habituelle, ces traits sont complétés par des extensions AsyncReadExt, AsyncBufReadExt, AsyncWriteExt et AsyncSeekExt, qui donnent des fonctionnalités plus haut niveau aux implémentations des traits AsyncXyz.

Composition d’opérations asynchrones

Il est courant de vouloir combiner plusieurs opérations asynchrones. Par exemple, on peut vouloir construire une opération asynchrone qui attend que les opérations asynchrones représentées par trois futures a, b et c soient toutes terminées (opération join). Ou bien on peut attendre qu’une de ces opérations soit terminée (opération select).

futures fournit plusieurs implémentations de join et select qui répondent à différents besoins : nombre de futures connu à l’avance ou pas, besoin de prioriser certaines opérations par rapport à d’autres ou pas, etc. Une partie de ces opérations est implémentée sous forme de macros, le reste dans des modules dédiés (ex : module future pour la composition de futures).

Pour donner un exemple d’application, une attente d’entrée/sortie asynchrone avec timeout est typiquement implémentée par un select entre une futures d’entrée/sortie asynchrone et une future de timeout. Si le timeout se termine en premier, la future d’entrée/sortie asynchrone est jetée, ce qui déclenche l’annulation de l’opération sous-jacente.

Il est important de bien comprendre que ces combinateurs n’introduisent pas de parallélisme dans l’exécution de code. Si plusieurs futures dans un join ou un select sont prêtes, leurs méthodes poll() seront appelées séquentiellement, les unes après les autres, sur un seul coeur CPU. Nous allons voir un peu plus loin comment faire quand ce n’est pas ce qu’on veut.

Ce que join et select permettent, en revanche, c’est d’introduire de la concurrence, donc d’attendre plusieurs événements simultanément. Par exemple en utilisant join, on peut lancer plusieurs entrées/sorties asynchrones au niveau du système d’exploitation et attendre que l’ensemble des résultats de ces entrées/sorties soient disponibles avant de continuer.

Exécuteurs et synchronisation

Introduction

On l’a vu précédemment, pour exécuter une tâche asynchrone, on appele à plusieurs reprises la méthode poll() de la future associée, jusqu’à ce qu’elle retourne Poll::Ready, en attendant entre chaque Poll::Pending le signal d’un Waker.

Les autres abstractions de la crate futures dont nous avons discuté (Stream, Sink, AsyncRead, etc.) utilisent un principe similaire, donc dans la suite nous allons nous concentrer sur le cas des futures, mais la plupart de ce qui va être dit sera applicable au reste.

Le composant d’un runtime asynchrone qui se charge d’exécuter des tâches asynchrones est appelé un exécuteur, et la crate futures fournit des exécuteurs simples qui peuvent être utilisé quand on n’a pas besoin d’un runtime asynchrone plus complet. Nous allons maintenant étudier les stratégies d’exécution proposées par la crate futures, par ordre de complexité croissante.

Exécution bloquante

Avec futures::executor::block_on(), on exécute une future de façon bloquante. A chaque fois que la méthode poll() de la future retourne Poll::Pending, le thread actif s’arrête en attendant que la situation se débloque. Et lorsque poll() finit par émettre le résultat final via Poll::Ready, l’exécution de block_on() se termine en retournant ce résultat à l’appelant.

A quoi cela peut-il bien servir de s’embêter à écrire du code asynchrone pour finalement le rendre synchrone ? Typiquement lorsqu’on a besoin d’un résultat tout de suite pour “nourrir” une API synchrone, ou bien à la fin de la fonction main() pour s’assurer que l’ensemble des tâches asynchrones qu’on a préparé précédemment soit bien exécuté.

Ainsi, un programme asynchrone typique va souvent commencer par créer une grosse future représentant l’ensemble des tâches qu’il a à traiter, puis effectuer un appel block_on() sur cette future pour exécuter toutes ces tâches avant de s’arrêter.

Exécution séquentielle

L’utilisation efficace de block_on() nous impose une gymnastique désagréable avec les combinateurs join et select, pour nous assurer que l’ensemble des futures que nous voulons exécuter soit bien couvert par l’attente que nous nous préparons à effectuer.

Une approche plus élégante est d’utiliser LocalPool, qui nous permet de programmer l’exécution d’un certain nombre de futures au sein du thread actif via un objet de type LocalSpawner, obtenu via la méthode pool.spawner() et qui implémente les traits Spawn et LocalSpawn, complétés par les extensions plus haut niveau SpawnExt et LocalSpawnExt.

On peut utiliser des méthodes de ces traits, comme LocalSpawnExt::spawn_local(), pour programmer l’exécution de tâches asynchrones au sein de la LocalPool.

Une fois qu’on a soumis des tâches via le mécanisme du LocalSpawner, LocalPool permet de les exécuter de différentes façons :

  • Avec run(), on peut exécuter complètement l’ensemble des futures soumises précédemment, comme si on avait appelé block_on() sur le join de ces futures.
  • Avec run_until(), on peut exécuter l’ensemble des futures soumises précédmment jusqu’à ce qu’une future nouvellement soumise se soit complètement exécutée, ce qui nécessiterait une combinaison complexe de join, select et block_on() si on tentait de le faire sans LocalPool.
  • Avec try_run_one() et run_until_stall(), on peut exécuter l’ensemble des futures soumises précédemment jusqu’à ce qu’une tâche se termine (pour try_run_one()) ou que l’ensemble des tâches atteigne un point où il n’y a plus rien d’autre à faire qu’attendre.

Comme les combinateurs, LocalPool n’introduit pas de parallélisme, juste de la concurrence : les méthodes poll() des futures qui sont prêtes à s’exécuter sont appelées les unes après les autres, sur un seul coeur CPU. Tout ce que fait LocalPool, on pourrait le faire avec des combinateurs et block_on(), c’est juste (beaucoup) plus simple avec l’aide de LocalPool.

Exécution parallèle

Nous l’avons dit plusieurs fois, quand on utilise block_on() avec des combinateurs ou LocalPool on n’a que de la concurrence (attendre plusieurs choses en même temps) et pas du parallélisme (exécuter plusieurs morceaux du code de notre programme en même temps).

Pour exécuter nos tâches asynchrones de façon parallèle, on peut utiliser un autre exécuteur fourni par la crate futures, appelé ThreadPool. Comme son nom l’indique, cet exécuteur gère un groupe de threads, par défaut un par hyperthread CPU. On peut lui soumettre des tâches via la méthode pool.spawn_ok(), et ces tâches s’exécuteront en parallèle sur les threads de l’exécuteur.

Si on compare ThreadPool à LocalPool

  • ThreadPool n’accepte que des futures Send pouvant être transmises à un autre thread pour exécution. C’est tout à fait logique, mais ça signifie qu’on ne peut plus utiliser des abstractions non thread-safe comme Rc, ce qui peut nécessiter quelques modifications du code.
  • ThreadPool ne fournit aucun moyen de se synchroniser avec la tâche en cours d’exécution, pas même de savoir quand est-ce qu’elle se termine. C’est à nous d’implémenter la synchronisation nécessaire.

Dans l’ensemble, la ThreadPool de futures est bâtie pour la simplicité (son implémentation tient en quelques centaines de ligne de code), pas pour l’ergonomie ni pour la performance. Cet exécuteur suffit pour des besoins simples, mais toute application un tant soit peu complexe gagnera fortement à utiliser l’exécuteur parallèle plus sophistiqué fourni par un runtime complet comme tokio, qui sera plus facile à utiliser, aura moins d’overhead, et passera mieux à l’échelle sur des systèmes avec beaucoup de coeurs CPU.

Synchronisation

Dès lors qu’on a de l’exécution parallèle de tâches sur plusieurs threads, on a besoin de synchronisation entre threads. On l’a vu, dans le cas de la ThreadPool simple de futures ce besoin émerge ne serait-ce que pour attendre que nos tâches qui s’exécutent en parallèle se terminent.

Dans un contexte de programmation asynchrone, c’est plus complexe que d’habitude de se synchroniser, car on doit éviter d’utiliser des primitives de synchronisation bloquantes basées sur le fait d’attendre au sein du thread actif qu’un autre thread fasse quelque chose. En effet, toute l’idée de la programmation asynchrone parallèle, c’est de travailler avec un nombre minimum de threads (un par coeur ou hyperthread CPU), donc si on commence à bloquer lesdits threads

  • Au mieux, on va arrêter d’utiliser certains de nos coeurs CPU, donc perdre en efficacité.
  • Au pire, on va se retrouver dans une situation où tous nos threads attendent qu’une autre tâche asynchrone s’exécute pour continuer, mais plus aucune tâche ne peut s’exécuter car tous les threads sont bloqués, et donc l’exécution asynchrone sera définitivement bloquée.

On privilégiera donc l’utilisation de primitives de synchronisation adaptées à la programmation asynchrone, où les opérations qui seraient normalement bloquantes deviennent des opérations asynchrones. La crate futures en fournit quelques unes :

  • Dans le module channel, on retrouve une variante asynchrone des files d’attente mpsc de la bibliothèque standard, ainsi qu’une variante spécialisée oneshot qui sert à envoyer une seule valeur, une seule fois, d’une tâches asynchrone à une autre.
    • La primitive oneshot est exactement ce dont on a besoin pour récupérer les résultats de tâches créées par des APIs de type ThreadPool::spawn_ok().
  • Dans le module lock, on retrouve une variante asynchrone du type Mutex de la bibliothèque standard, ainsi qu’un type plus spécialisé BiLock optimisé pour le cas où il n’y a que deux tâches qui partagent une donnée.

Runtimes et interopérabilité

Nous avons maintenant terminé notre tour de la crate futures, qui pose les bases communes de l’asynchronisme en Rust sur lesquelles tout le monde s’accorde.

Mais pour faire quoi que ce soit d’utile avec de l’asynchronisme, il va nous falloir aussi des implémentations concrètes d’opérations asynchrones, par exemple…

  • Des primitives pour faire des entrées/sorties asynchrones sur le disque, en réseau…
  • Un système de minuteur permettant d’attendre des deadlines, de fixer des timeouts
  • Des interfaces asynchrones vers d’autres fonctionnalités systèmes importantes : création et attente de processus enfants, gestion des signaux Unix, logging dans la console…

De plus, nous avons aussi mentionné que le support de l’exécution parallèle directement fourni par futures est minimal, et qu’on gagnera souvent à le remplacer par d’autres exécuteurs plus ergonomiques, plus économes en CPU, et passant mieux à l’échelle.

Un runtime asynchrone est une bibliothèque qui répond à tous ces besoins. On l’a évoqué précédemment, il en existe plusieurs, plus ou moins spécialisés dans un domaine d’application. Dans la suite de ce cours, nous allons faire appel au runtime généraliste le plus utilisé, tokio.

Dans un monde idéal, le runtime que vous utilisez n’aurait pas d’importance, et vous pourriez librement envoyer les futures d’un runtime à l’exécuteur d’un autre. C’est l’objectif de long terme, mais malheureusement à l’heure où ces lignes sont écrites nous n’en sommes pas là. Il faut donc éviter de mélanger plusieurs runtimes dans votre code sous peine de rencontrer des problèmes bizarres (ex : tâches d’entrée/sortie qui se bloquent indéfiniment car la routine d’attente groupée de leur runtime n’est pas appelée) ou des problèmes d’oversubscription (ex : les exécuteurs parallèles de deux runtimes se battent pour le temps CPU d’un même coeur CPU).

Avant de vous lancer dans un projet utilisant l’asynchronisme, je vous recommande donc de commencer par étudier les bibliothèques asynchrones utiles à votre projet, les runtimes que chacune supporte, en choisir un qui met tout le monde d’accord, et essayer de vous y tenir. En cas de doute, privilégiez les runtimes les plus populaires comme tokio : c’est ceux que vos bibliothèques métier auront le plus de chances de supporter.