Threads

Dans les chapitres sur les pointeurs et la mutabilité interne, nous avons beaucoup parlé de threads. Il est maintenant temps d’aborder comment ceux-ci sont gérés en Rust.

Parallélisme structuré

La programmation parallèle est difficile car on doit concevoir des programmes qui sont corrects quel que soit l’ordre dans lequel les différentes opérations sont effectuées par les différents threads.

Mais il y a des degrés dans la difficulté :

  • A un extrême, on a le parallélisme de données, dont vous verrez dans la section “parallélisation” qu’il peut être complètement géré par une bibliothèque sans aucun effort de votre part. Je vous recommande d’utiliser cette option chaque fois que c’est possible.
  • A l’autre extrême, il y a des programmes qui lancent des threads qui tournent en tâche de fond pendant toutes la durée d’exécution de l’application, complètement indépendants du thread principal à quelques interactions peu visibles dans le code près.
  • Entre ces deux extrêmes, il y a une manière de structurer les programmes parallèles qui est relativement facile à comprendre, tout en permettant de bonnes performances sur de nombreux problèmes. C’est de définir au sein du programme des régions parallèles, avec un début et une fin bien déterminée, et un objectif relativement clair.
    • Au début de la région parallèle, des threads sont lancés.
    • Au sein de la région parallèle, les threads s’exécutent ensemble pour atteindre l’objectif donné, avec une synchronisation aussi simple et rare que possible.
    • A la fin, on attend que tous les threads aient terminé, on gère les erreurs éventuelles, et on reprend l’exécution en séquentiel jusqu’à la région parallèle suivante.

Cette dernière approche est appelée “parallélisme structuré”, et elle est appropriée quand les régions parallèles ont une charge de travail suffisante pour que les coûts de création/arrêt/synchronisation de threads soient bien amortis et que le poids relatif des régions séquentielles du code soit négligeable par rapport à celui des régions parallèles. Je vous recommande de la privilégier par rapport au parallélisme non structuré, lorsque le parallélisme de données n’est pas applicable.

En Rust, le parallélisme structuré est supporté via la construction std::thread::scope(). En voici un exemple relativement simple que vous pouvez vous amuser à modifier comme vous voulez :

use std::sync::mpsc;

fn main() {
    // Les variables définies hors de la région parallèle sont accessibles par
    // référence aux threads de traitement : on a la garantie que les threads
    // auront terminé avant que cet état soit libéré.
    let base = 42;

    // Début de la région parallèle
    std::thread::scope(move |scope| {
        // Chaque thread a une file pour recevoir du travail à faire
        let num_threads = 5;
        let inputs = 
            std::iter::repeat_with(|| mpsc::channel())
                .take(num_threads)
                .collect::<Vec<_>>();

        // Création de quelques threads, avec extraction des interfaces
        // d'entrée des files pour leur soumettre du travail
        let inputs =
            inputs.into_iter()
                .enumerate()
                .map(|(thread_idx, (input_send, input_recv))| {
                    // Création d'un thread associé à la région parallèle
                    scope.spawn(move || {
                        // Traitement des demandes jusqu'à ce que le thread
                        // principal ait terminé
                        for entree in input_recv {
                            // Traitement d'une demande utilisant l'état partagé
                            println!("Thread {thread_idx} : Reçu {entree}");
                            let resultat = entree + base;
                            println!("Thread {thread_idx} : Emis {resultat}");
                        }
                    });

                    // On expose l'interface d'entrée au thread principal
                    input_send
                })
                .collect::<Vec<_>>();

        // Soumission d'un peu de travail aux threads
        for i in 0..30 {
            let thread_idx = i % num_threads;
            let input = &inputs[thread_idx];
            input.send(i).expect("Un thread de travail est mort");
        }

        // Ici, les destructeurs de "inputs" sont appelés, ce qui signale aux
        // threads de travail que le thread principal à terminé.
        // L'implémentation de std::thread::scope attend ensuite que les threads
        // de travail aient terminé, en propageant les paniques éventuelles.
    });
}

Vous noterez que l’exécution de ce programme d’exemple n’est pas très parallèle. Cela tient au fait que le calcul est très simple et l’accès à la sortie texte de println!() est soumis à synchronisation.

Parallélisme non structuré

Parfois, le parallélisme structuré ne convient pas et on est forcé de créer des threads secondaires vraiment indépendants du thread principal. On parlera alors de parallélisme non structuré.

Dans ce cas, ça devient votre responsabilité d’assurer les bonnes propriétés que le parallélisme structuré garantissait pour vous :

  • L’état partagé ne doit pas être libéré avant que l’ensemble des threads n’aient fini de l’utiliser (on utilise généralement Arc pour ça, au prix de nombreuses allocations mémoire).
  • Lorsqu’une erreur survient au sein d’un thread, les autres threads qui travaillent avec ce thread doivent en être informés et le gérer (si vous utilisez mpsc, c’est en partie fait pour vous, avec les autres formes de synchronisation vous devez l’implémenter vous-même).
  • Le thread principal ne doit pas s’arrêter avant que l’ensemble des threads secondaires n’aient terminé leur travail (en termes pthread, ce sont des threads détachés).

Le langage vous fournit quelques aides à la synchronisation même dans ce cas :

  • On l’a vu ci-dessus, l’utilisation de Arc et mpsc permet de récupérer une partie des bonnes propriétés du parallélisme structuré.
  • La primitive de synchronisation Barrier vous permet d’attendre qu’un groupe de threads ait terminé un travail avant que l’ensemble de ces threads ne soient autorisés à continuer.
  • La création d’un thread vous retourne un JoinHandle, que vous pouvez utiliser pour attendre que le thread ait fini de s’exécuter et propager les paniques éventuelles.
  • Et puis il y a les autres outils mentionnés dans le chapitre précédent : Mutex, Condvar, …

Pour créer des threads de façon non structurée, vous pouvez utiliser std::thread::spawn(). Voici une variante de l’exemple précédent qui utilise du parallélisme non structuré :

use std::sync::{mpsc, Arc};

fn main() {
    // L'état partagé doit être géré d'une façon qui ne permet pas sa libération
    // précoce avant que les threads n'aient fini de l'utiliser. Le compilateur
    // ne peut pas le prouver dans le cas du parallélisme non structuré, donc
    // on doit utiliser Arc.
    let base = Arc::new(42usize);

    // Création des files de travail entrant, comme avant
    let num_threads = 5;
    let inputs = 
        std::iter::repeat_with(|| mpsc::channel())
            .take(num_threads)
            .collect::<Vec<_>>();

    // Création des threads. Cette fois, on doit récupérer le JoinHandle, au
    // lieu de laisser la région parallèle le gérer comme avant.
    let inputs_and_handles =
        inputs.into_iter()
            .enumerate()
            .map(|(thread_idx, (input_send, input_recv))| {
                // Création d'une copie du Arc associé à l'état partagé, 
                // qui sera spécifique à ce thread :
                let base = base.clone();

                // Création d'un thread associé à la région parallèle
                let join_handle = std::thread::spawn(move || {
                    // Traitement identique au code précédent, sauf qu'on doit
                    // penser à déréférencer le Arc.
                    for entree in input_recv {
                        println!("Thread {thread_idx} : Reçu {entree}");
                        let resultat = entree + *base;
                        println!("Thread {thread_idx} : Emis {resultat}");
                    }
                });

                // On expose l'interface d'entrée au thread principal comme
                // avant, mais on y ajoute le JoinHandle pour attendre le thread
                (input_send, join_handle)
            })
            .collect::<Vec<_>>();

    // Soumission de travail presque identique au code précédent, à part que
    // maintenant on a aussi des JoinHandles dans la liste des files entrantes
    // et ça nécessite un peu de pattern matching.
    for i in 0..30 {
        let thread_idx = i % num_threads;
        let (input, _handle) = &inputs_and_handles[thread_idx];
        input.send(i).expect("Un thread de travail est mort");
    }

    // Et enfin, on attend les threads de travail
    for (input, handle) in inputs_and_handles {
        // Pour les prévenir qu'on a fini, on doit déclencher précocément le
        // destructeur de l'interface d'entrée de la file d'attente...
        std::mem::drop(input);

        // ...après quoi on peut attendre les thread en toute sécurité
        handle.join().expect("Un thread de travail est mort");
    }
}

J’espère que cet exemple simple suffira à vous convaincre que le parallélisme structuré est d’une ergonomie très supérieure à celle du parallélisme non structuré, et devrait donc être utilisé chaque fois qu’il est applicable au problème qu’on veut traiter.