La crate futures
L’idée centrale de la programmation asynchrone, c’est qu’une routine d’entrée-sortie de l’OS puisse répondre au thread qui l’appelle “je n’ai pas la réponse tout de suite, va faire autre chose et reviens me voir plus tard” au lieu de bloquer le thread en attendant que l’opération soit terminée.
Il existe plusieurs manières de fournir une interface de plus haut niveau par dessus ce type d’API. Dans ce chapitre, nous allons explorer la notion de future, qui est l’approche plesbicitée en Rust.
Même si vous avez déjà utilisé des futures dans d’autres langages, y compris le
type std::future
de
C++, ne passez pas ce chapitre. Ce que Rust appelle une future est
différent de ce que la plupart des autres langages appellent une future, et il
est important que ayez une bonne compréhension du fonctionnement des futures de
Rust si vous comptez les utiliser.
Futures
De même que les opérations d’entrées/sorties asynchrones de l’OS rendent immédiatement la main à l’appelant, une façon classique d’exposer une interface asynchrone dans une bibliothèque haut niveau est d’avoir une API qui retourne immédiatement un objet “future”.
Cet objet représente la promesse d’un résultat futur. L’appelant peut s’en servir pour différentes choses, notamment déterminer si le résultat est arrivé, programmer d’autres opérations lorsque le résultat arrivera, ou attendre l’arrivée du résultat lorsqu’il n’y a plus rien d’autre à faire.
L’intérêt d’une abstraction future bien conçue, c’est qu’elle donne à l’appelant le contrôle de la stratégie d’attente du résultat de l’entrée/sortie, au lieu d’imposer une certaine stratégie pas toujours adaptée (par exemple devoir attendre que le résultat arrive sans pouvoir rien faire d’autre dans le thread actif, comme les APIs synchrones).
Rust utilise des futures, mais avec une spécificité qui est que l’API qui retourne une future ne fait pas d’entrées/sorties. C’est l’objet future lui-même qui initie le processus, de façon paresseuse, au moment où on commence à attendre que le résultat arrive. Cela a plusieurs intérêts :
- L’implémentation des futures peut être plus efficace. Avec les futures classiques, on a besoin d’une allocation tas et d’une couche de polymorphisme dynamique par étape d’entrée/sortie d’une tâche asynchrone. Alors qu’avec les futures paresseuses de Rust, on n’a besoin que d’une allocation tas et une couche de polymorphisme dynamique par tâche asynchrone, et dans certains cas on peut même s’en tirer sans allocation tas ni vtable.
- On peut plus facilement gérer l’annulation des tâches asynchrones en cours ainsi que la backpressure (quand un serveur est surchargé, il accepte moins de tâches du client).
- Si l’implémentation d’une future a besoin de données auto-référentielles (nous allons voir plus tard pourquoi), elle n’en a pas besoin dès que la future est créée, mais seulement à partir du moment où la tâche asynchrone est lancée. Avant ça, la future peut être déplacée librement.
Le coeur de l’interface des futures est aujourd’hui disponible dans la
bibliothèque standard, via le trait
Future
. Mais comme
nous allons le voir dans ce qui suit, cette interface de base est très simple
et bas niveau. Toutes les fonctionnalités plus avancées des futures
sont implémentées dans la crate
futures
, au sein de laquelle les
futures Rust ont été initialement mises au point.
Par exemple, en important futures::prelude::*
, on gagne un accès aux
extensions
FutureExt
qui permettent de programmer davantage de travail à la suite de la tâche
asynchrone associée à une Future
, ainsi qu’au trait
TryFuture
et ses extensions
TryFutureExt
qui facilitent la manipulation de futures représentant des opérations
asynchrones faillibles.
Il est probable qu’un jour, davantage de fonctionnalités de futures
soient
intégrées à la bibliothèque standard, mais à l’heure actuelle on ne peut pas
dire quand ça se produira.
Tâches asynchrones
La base de l’interface des futures en Rust est une méthode
poll()
ayant la signature suivante :
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
On constate d’abord cette méthode prend son paramètre par référence Pin<&mut Self>
. Cela garantit à l’implémentation de la future qu’un appelant ne va pas
déplacer la future entre deux appels à poll()
, et permet donc à
l’implémentation de stocker des données auto-référentielles à l’intérieur de
l’objet future à partir du premier appel à poll()
.
Lorsque la méthode poll()
est appelée, elle retourne un objet de type
énuméré
Poll<Self::Output>
, où
Self::Output
est le type du résultat final de la tâche asynchrone. Le type
énuméré Poll
est défini de la façon suivante :
#![allow(unused)] fn main() { pub enum Poll<T> { Ready(T), Pending, } }
Ces deux variantes représentent deux scénarios possibles pour l’appelant de
poll()
:
- Si
poll()
retournePoll::Ready
, la tâche asynchrone est terminée et son résultat est retourné à l’appelant. A partir de ce moment, la méthodepoll()
ne doit plus être utilisée. - Si
poll()
retournePoll::Pending
, la tâche asynchrone a atteint un point bloquant (entrée/sortie). On doit attendre que ce point bloquant soit passé, puis rappelerpoll()
.
Le mécanisme qui permet d’attendre que le point bloquant soit passé est
l’argument Context
de poll()
. Ce Context
donne accès à un
Waker
qui permet à
l’implémentation de la tâche asynchrone de signaler quand le résultat de
l’entrée/sortie asynchrone est disponible, afin que la méthode poll()
de la
future associée soit rappelée pour continuer la tâche asynchrone.
On le voit, tout ça est plutôt bas niveau, ce n’est pas le style d’interface qu’on a envie d’utiliser directement au quotidien. La façon normale d’utiliser des futures en Rust est de passer par une bibliothèque fournissant un ensemble d’opérations d’entrées/sorties retournant des futures ainsi qu’un moyen d’ordonnancer l’exécution de tâches asynchrones. On parle de runtime asynchrone.
Il existe plusieurs runtimes asynchrones. Certains sont généralistes, d’autres
sont spécialisés pour des domaines précis d’applications (par exemple, les
systèmes résilients aux pannes, ou
les systèmes embarqués sans OS). Dans
les chapitres suivants, nous ferons appel à
tokio
, le runtime asynchrone le plus
utilisé aujourd’hui.
Mais avant de nous focaliser davantage sur tokio
, nous allons d’abord terminer
notre tour des primitives de bases de l’asynchronisme en Rust, en parlant un peu
des autres fonctionnalités communes fournies par la crate futures
.
Flux de données typés
Souvent, quand on fait des opérations d’entrées/sorties, on ne lit pas qu’une seule donnée. On lit plutôt un flux continu de données du même type : paquets UDP, blocs d’octets, requêtes HTTP, etc. Et de même, on n’écrit pas qu’une seule données, on écrit un flux continu de données similaires.
On pourrait représenter chaque opération de lecture/écriture comme une opération asynchrone complètement indépendante, à laquelle on associe une future dédiée. Mais il est à la fois plus ergonomique et plus efficace au niveau de l’implémentation d’avoir une abstraction dédiée pour cette situation, analogue aux itérateurs de la bibliothèque standard.
Cette abstraction, c’est
Stream
pour
les données d’entrée, et
Sink
pour les
données de sortie.
Stream
est une généralisation asynchrone de Iterator
. Un Stream
de T
se
comporte comme une future réutilisable de Option<T>
: tant que l’opération
poll_next()
produit des Poll::Ready(Some(x))
, on peut continuer de l’appeler. C’est
seulement quand on reçoit un Poll::Ready(None)
que le flux d’entrée est tari
et qu’on doit arrêter d’appeler poll_next()
.
A l’inverse, un Sink
de T
accepte des valeurs de type T
en entrée via le
protocole suivant :
- D’abord, on doit utiliser l’opération asynchrone
poll_ready()
pour attendre que leSink
soit prêt à accepter une nouvelle valeur en entrée. - Quand
poll_ready()
indique que leSink
est prêt, on peut lui soumettre une nouvelle valeur à envoyer avecstart_send()
. - Le cycle
poll_ready()
/start_send()
peut être répété autant de fois que nécessaire pour envoyer toutes les valeurs souhaitées - Un
Sink
est autorisé à accumuler des données en interne avant envoi. Lorsqu’on veut s’assurer que toutes les données ont bien été envoyées, on utilisepoll_flush()
si on veut envoyer d’autres données par la suite, oupoll_close()
si on a terminé.
De la même façon que la crate futures
complète le trait bas niveau Future
par des extensions
FutureExt
et
TryFutureExt
qui l’enrichissent avec des opérations plus haut niveau, les traits bas niveau
Stream
et Sink
sont complétés par des extensions
StreamExt
,
TryStreamExt
et
SinkExt
qui
les enrichissent avec des opérations plus haut niveau.
Par exemple
SinkExt::send_all()
permet de récupérer l’ensemble des valeurs d’un Stream
, les envoyer à un
Sink
, et récupérer une Future
qui sera résolue lorsque ce travail sera
terminé.
Entrées/sorties asynchrones
Parmi les flux de données, le cas particulier des flux d’octets faillibles est important, car c’est en ces termes que sont exprimées les APIs d’entrées/sorties asynchrones de tous les systèmes d’exploitation couramment utilisés.
Ce ne serait pas efficace de représenter ces flux d’octets par un
TryStream
ou un Sink
de u8
, car on devrait initier une transaction d’entrée/sortie
asynchrone au niveau du système pour chaque octet envoyé ou reçu, ce qui ferait
bien trop de transactions.
A la place, la crate futures
fournit donc les traits
AsyncRead
,
AsyncBufRead
,
AsyncWrite
et
AsyncSeek
,
qui reprennent l’interface des traits Read
, BufRead
, Write
et
Seek
de la bibliothèque standard mais sous une forme qui se prête aux
entrées/sorties asynchrone.
Selon une logique désormais habituelle, ces traits sont complétés par des
extensions
AsyncReadExt
,
AsyncBufReadExt
,
AsyncWriteExt
et
AsyncSeekExt
,
qui donnent des fonctionnalités plus haut niveau aux implémentations des
traits AsyncXyz
.
Composition d’opérations asynchrones
Il est courant de vouloir combiner plusieurs opérations asynchrones. Par exemple, on peut vouloir construire une opération asynchrone qui attend que les opérations asynchrones représentées par trois futures a, b et c soient toutes terminées (opération join). Ou bien on peut attendre qu’une de ces opérations soit terminée (opération select).
futures
fournit plusieurs implémentations de join et select qui répondent
à différents besoins : nombre de futures connu à l’avance ou pas, besoin de
prioriser certaines opérations par rapport à d’autres ou pas, etc. Une partie de
ces opérations est implémentée sous forme de
macros, le reste dans
des modules dédiés (ex : module
future
pour la
composition de futures).
Pour donner un exemple d’application, une attente d’entrée/sortie asynchrone avec timeout est typiquement implémentée par un select entre une futures d’entrée/sortie asynchrone et une future de timeout. Si le timeout se termine en premier, la future d’entrée/sortie asynchrone est jetée, ce qui déclenche l’annulation de l’opération sous-jacente.
Il est important de bien comprendre que ces combinateurs n’introduisent pas de
parallélisme dans l’exécution de code. Si plusieurs futures dans un join ou
un select sont prêtes, leurs méthodes poll()
seront appelées
séquentiellement, les unes après les autres, sur un seul coeur CPU. Nous allons
voir un peu plus loin comment faire quand ce n’est pas ce qu’on veut.
Ce que join et select permettent, en revanche, c’est d’introduire de la concurrence, donc d’attendre plusieurs événements simultanément. Par exemple en utilisant join, on peut lancer plusieurs entrées/sorties asynchrones au niveau du système d’exploitation et attendre que l’ensemble des résultats de ces entrées/sorties soient disponibles avant de continuer.
Exécuteurs et synchronisation
Introduction
On l’a vu précédemment, pour exécuter une tâche asynchrone, on appele à
plusieurs reprises la méthode poll()
de la future associée, jusqu’à ce
qu’elle retourne Poll::Ready
, en attendant entre chaque Poll::Pending
le
signal d’un Waker
.
Les autres abstractions de la crate futures
dont nous avons discuté
(Stream
, Sink
, AsyncRead
, etc.) utilisent un principe similaire, donc
dans la suite nous allons nous concentrer sur le cas des futures, mais la
plupart de ce qui va être dit sera applicable au reste.
Le composant d’un runtime asynchrone qui se charge d’exécuter des tâches
asynchrones est appelé un exécuteur, et la crate futures
fournit des
exécuteurs simples qui peuvent être utilisé quand on n’a pas besoin
d’un runtime asynchrone plus complet. Nous allons maintenant étudier les
stratégies d’exécution proposées par la crate futures
, par ordre de
complexité croissante.
Exécution bloquante
Avec futures::executor::block_on()
,
on exécute une future de façon bloquante. A chaque fois que la méthode
poll()
de la future retourne Poll::Pending
, le thread actif s’arrête en
attendant que la situation se débloque. Et lorsque poll()
finit par émettre le
résultat final via Poll::Ready
, l’exécution de block_on()
se termine en
retournant ce résultat à l’appelant.
A quoi cela peut-il bien servir de s’embêter à écrire du code asynchrone pour
finalement le rendre synchrone ? Typiquement lorsqu’on a besoin d’un résultat
tout de suite pour “nourrir” une API synchrone, ou bien à la fin de la fonction
main()
pour s’assurer que l’ensemble des tâches asynchrones qu’on a préparé
précédemment soit bien exécuté.
Ainsi, un programme asynchrone typique va souvent commencer par créer une grosse
future représentant l’ensemble des tâches qu’il a à traiter, puis effectuer un
appel block_on()
sur cette future pour exécuter toutes ces tâches avant de
s’arrêter.
Exécution séquentielle
L’utilisation efficace de block_on()
nous impose une gymnastique désagréable
avec les combinateurs join et select, pour nous assurer que l’ensemble des
futures que nous voulons exécuter soit bien couvert par l’attente que nous nous
préparons à effectuer.
Une approche plus élégante est d’utiliser
LocalPool
,
qui nous permet de programmer l’exécution d’un certain nombre de futures au sein
du thread actif via un objet de type
LocalSpawner
, obtenu via la méthode
pool.spawner()
et qui implémente les traits
Spawn
et
LocalSpawn
,
complétés par les extensions plus haut niveau
SpawnExt
et
LocalSpawnExt
.
On peut utiliser des méthodes de ces traits, comme
LocalSpawnExt::spawn_local()
,
pour programmer l’exécution de tâches asynchrones au sein de la LocalPool
.
Une fois qu’on a soumis des tâches via le mécanisme du LocalSpawner
,
LocalPool
permet de les exécuter de différentes façons :
- Avec
run()
, on peut exécuter complètement l’ensemble des futures soumises précédemment, comme si on avait appeléblock_on()
sur le join de ces futures. - Avec
run_until()
, on peut exécuter l’ensemble des futures soumises précédmment jusqu’à ce qu’une future nouvellement soumise se soit complètement exécutée, ce qui nécessiterait une combinaison complexe de join, select etblock_on()
si on tentait de le faire sansLocalPool
. - Avec
try_run_one()
etrun_until_stall()
, on peut exécuter l’ensemble des futures soumises précédemment jusqu’à ce qu’une tâche se termine (pourtry_run_one()
) ou que l’ensemble des tâches atteigne un point où il n’y a plus rien d’autre à faire qu’attendre.
Comme les combinateurs, LocalPool
n’introduit pas de parallélisme, juste de la
concurrence : les méthodes poll()
des futures qui sont prêtes à s’exécuter
sont appelées les unes après les autres, sur un seul coeur CPU. Tout ce que
fait LocalPool
, on pourrait le faire avec des combinateurs et block_on()
,
c’est juste (beaucoup) plus simple avec l’aide de LocalPool
.
Exécution parallèle
Nous l’avons dit plusieurs fois, quand on utilise block_on()
avec des
combinateurs ou LocalPool
on n’a que de la concurrence (attendre plusieurs
choses en même temps) et pas du parallélisme (exécuter plusieurs morceaux du
code de notre programme en même temps).
Pour exécuter nos tâches asynchrones de façon parallèle, on peut utiliser un
autre exécuteur fourni par la crate futures
, appelé
ThreadPool
.
Comme son nom l’indique, cet exécuteur gère un groupe de threads, par défaut
un par hyperthread CPU. On peut lui soumettre des tâches via la méthode
pool.spawn_ok()
,
et ces tâches s’exécuteront en parallèle sur les threads de l’exécuteur.
Si on compare ThreadPool
à LocalPool
…
ThreadPool
n’accepte que des futuresSend
pouvant être transmises à un autre thread pour exécution. C’est tout à fait logique, mais ça signifie qu’on ne peut plus utiliser des abstractions non thread-safe commeRc
, ce qui peut nécessiter quelques modifications du code.ThreadPool
ne fournit aucun moyen de se synchroniser avec la tâche en cours d’exécution, pas même de savoir quand est-ce qu’elle se termine. C’est à nous d’implémenter la synchronisation nécessaire.
Dans l’ensemble, la ThreadPool
de futures
est bâtie pour la simplicité
(son implémentation tient en quelques centaines de ligne de code), pas pour
l’ergonomie ni pour la performance. Cet exécuteur suffit pour des besoins
simples, mais toute application un tant soit peu complexe gagnera fortement à
utiliser l’exécuteur parallèle plus sophistiqué fourni par un runtime complet
comme tokio
, qui sera plus facile à utiliser, aura moins d’overhead, et
passera mieux à l’échelle sur des systèmes avec beaucoup de coeurs CPU.
Synchronisation
Dès lors qu’on a de l’exécution parallèle de tâches sur plusieurs threads, on
a besoin de synchronisation entre threads. On l’a vu, dans le cas de la
ThreadPool
simple de futures
ce besoin émerge ne serait-ce que pour attendre
que nos tâches qui s’exécutent en parallèle se terminent.
Dans un contexte de programmation asynchrone, c’est plus complexe que d’habitude de se synchroniser, car on doit éviter d’utiliser des primitives de synchronisation bloquantes basées sur le fait d’attendre au sein du thread actif qu’un autre thread fasse quelque chose. En effet, toute l’idée de la programmation asynchrone parallèle, c’est de travailler avec un nombre minimum de threads (un par coeur ou hyperthread CPU), donc si on commence à bloquer lesdits threads…
- Au mieux, on va arrêter d’utiliser certains de nos coeurs CPU, donc perdre en efficacité.
- Au pire, on va se retrouver dans une situation où tous nos threads attendent qu’une autre tâche asynchrone s’exécute pour continuer, mais plus aucune tâche ne peut s’exécuter car tous les threads sont bloqués, et donc l’exécution asynchrone sera définitivement bloquée.
On privilégiera donc l’utilisation de primitives de synchronisation adaptées à
la programmation asynchrone, où les opérations qui seraient normalement
bloquantes deviennent des opérations asynchrones. La crate futures
en
fournit quelques unes :
- Dans le module
channel
, on retrouve une variante asynchrone des files d’attentempsc
de la bibliothèque standard, ainsi qu’une variante spécialiséeoneshot
qui sert à envoyer une seule valeur, une seule fois, d’une tâches asynchrone à une autre.- La primitive
oneshot
est exactement ce dont on a besoin pour récupérer les résultats de tâches créées par des APIs de typeThreadPool::spawn_ok()
.
- La primitive
- Dans le module
lock
, on retrouve une variante asynchrone du typeMutex
de la bibliothèque standard, ainsi qu’un type plus spécialiséBiLock
optimisé pour le cas où il n’y a que deux tâches qui partagent une donnée.
Runtimes et interopérabilité
Nous avons maintenant terminé notre tour de la crate futures
, qui pose les
bases communes de l’asynchronisme en Rust sur lesquelles tout le monde s’accorde.
Mais pour faire quoi que ce soit d’utile avec de l’asynchronisme, il va nous falloir aussi des implémentations concrètes d’opérations asynchrones, par exemple…
- Des primitives pour faire des entrées/sorties asynchrones sur le disque, en réseau…
- Un système de minuteur permettant d’attendre des deadlines, de fixer des timeouts…
- Des interfaces asynchrones vers d’autres fonctionnalités systèmes importantes : création et attente de processus enfants, gestion des signaux Unix, logging dans la console…
De plus, nous avons aussi mentionné que le support de l’exécution parallèle
directement fourni par futures
est minimal, et qu’on gagnera souvent à le
remplacer par d’autres exécuteurs plus ergonomiques, plus économes en CPU, et
passant mieux à l’échelle.
Un runtime asynchrone est une bibliothèque qui répond à tous ces besoins. On
l’a évoqué précédemment, il en existe plusieurs, plus ou moins spécialisés dans
un domaine d’application. Dans la suite de ce cours, nous allons faire appel
au runtime généraliste le plus utilisé,
tokio
.
Dans un monde idéal, le runtime que vous utilisez n’aurait pas d’importance, et vous pourriez librement envoyer les futures d’un runtime à l’exécuteur d’un autre. C’est l’objectif de long terme, mais malheureusement à l’heure où ces lignes sont écrites nous n’en sommes pas là. Il faut donc éviter de mélanger plusieurs runtimes dans votre code sous peine de rencontrer des problèmes bizarres (ex : tâches d’entrée/sortie qui se bloquent indéfiniment car la routine d’attente groupée de leur runtime n’est pas appelée) ou des problèmes d’oversubscription (ex : les exécuteurs parallèles de deux runtimes se battent pour le temps CPU d’un même coeur CPU).
Avant de vous lancer dans un projet utilisant l’asynchronisme, je vous
recommande donc de commencer par étudier les bibliothèques asynchrones utiles à
votre projet, les runtimes que chacune supporte, en choisir un qui met tout
le monde d’accord, et essayer de vous y tenir. En cas de doute, privilégiez les
runtimes les plus populaires comme tokio
: c’est ceux que vos bibliothèques
métier auront le plus de chances de supporter.