Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Aplicando Concorrência com Async

Nesta seção, aplicaremos async a alguns dos mesmos desafios de concorrência que enfrentamos com threads no Capítulo 16. Como já falamos sobre muitas das ideias principais lá, nesta seção vamos nos concentrar no que há de diferente entre threads e futures.

Em muitos casos, as APIs para trabalhar com concorrência usando async são muito parecidas com as APIs para usar threads. Em outros casos, elas acabam sendo bem diferentes. Mesmo quando as APIs parecem semelhantes entre threads e async, elas frequentemente têm comportamento diferente, e quase sempre têm características de desempenho diferentes.

Criando uma Nova Tarefa com spawn_task

A primeira operação que enfrentamos na seção “Criando uma Nova Thread com spawn do Capítulo 16 foi contar em duas threads separadas. Vamos fazer o mesmo usando async. O crate trpl fornece uma função spawn_task que se parece muito com a API thread::spawn, e uma função sleep que é uma versão async da API thread::sleep. Podemos usá-las juntas para implementar o exemplo de contagem, como mostrado na Listagem 17-6.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}
Listing 17-6: Criando uma nova tarefa para imprimir uma coisa enquanto a tarefa principal imprime outra

Como ponto de partida, configuramos nossa função main com trpl::block_on para que nossa função de nível superior possa ser async.

Nota: Deste ponto em diante no capítulo, todos os exemplos incluirão exatamente este mesmo código de envolvimento com trpl::block_on em main, então muitas vezes o omitiremos, assim como fazemos com main. Lembre-se de incluí-lo no seu código!

Então escrevemos dois loops dentro desse bloco, cada um contendo uma chamada a trpl::sleep, que espera meio segundo (500 milissegundos) antes de enviar a próxima mensagem. Colocamos um loop no corpo de trpl::spawn_task e o outro em um loop for de nível superior. Também adicionamos um await depois das chamadas a sleep.

Este código se comporta de forma parecida com a implementação baseada em threads, incluindo o fato de que você pode ver as mensagens aparecerem em uma ordem diferente no seu próprio terminal ao executá-lo:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

Esta versão para assim que o loop for no corpo do bloco async principal termina, porque a tarefa gerada por spawn_task é encerrada quando a função main termina. Se você quiser que ela rode até a conclusão da tarefa, precisará usar um join handle para aguardar a primeira tarefa terminar. Com threads, usamos o método join para “bloquear” até que a thread terminasse de rodar. Na Listagem 17-7, podemos usar await para fazer a mesma coisa, porque o handle da tarefa é ele próprio um future. Seu tipo Output é um Result, então também chamamos unwrap nele depois de aguardá-lo.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}
Listing 17-7: Usando await com um join handle para executar uma tarefa até a conclusão

Esta versão atualizada roda até que ambos os loops terminem:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Até aqui, parece que async e threads nos dão resultados semelhantes, apenas com sintaxe diferente: usamos await em vez de chamar join no join handle, e aguardamos as chamadas a sleep.

A diferença maior é que não precisamos gerar outra thread do sistema operacional para fazer isso. Na verdade, nem precisamos gerar uma tarefa aqui. Como blocos async são compilados em futures anônimos, podemos colocar cada loop em um bloco async e fazer o runtime executar ambos até o fim usando a função trpl::join.

Na seção “Esperando Todas as Threads Terminarem” do Capítulo 16, mostramos como usar o método join no tipo JoinHandle retornado quando você chama std::thread::spawn. A função trpl::join é semelhante, mas para futures. Quando você entrega dois futures a ela, ela produz um único novo future cuja saída é uma tupla contendo a saída de cada future que você passou, depois que ambos terminarem. Assim, na Listagem 17-8, usamos trpl::join para esperar que fut1 e fut2 terminem. Nós não aguardamos fut1 e fut2, mas sim o novo future produzido por trpl::join. Ignoramos a saída, porque ela é apenas uma tupla contendo dois valores unit.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}
Listing 17-8: Usando trpl::join para aguardar dois futures anônimos

Quando executamos isso, vemos ambos os futures rodarem até o fim:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Agora você verá exatamente a mesma ordem todas as vezes, o que é bem diferente do que vimos com threads e com trpl::spawn_task na Listagem 17-7. Isso acontece porque a função trpl::join é justa, o que significa que ela verifica cada future com a mesma frequência, alternando entre eles, e nunca deixa um disparar à frente se o outro estiver pronto. Com threads, o sistema operacional decide qual thread verificar e por quanto tempo deixá-la rodar. Com Rust async, o runtime decide qual tarefa verificar. (Na prática, os detalhes ficam complicados porque um runtime async pode usar threads do sistema operacional por baixo dos panos como parte de como gerencia concorrência, então garantir justiça pode dar mais trabalho para um runtime, mas ainda é possível!) Runtimes não precisam garantir justiça para qualquer operação específica, e frequentemente oferecem APIs diferentes para permitir que você escolha se quer ou não justiça.

Experimente algumas destas variações ao aguardar os futures e veja o que elas fazem:

  • Remova o bloco async ao redor de um ou de ambos os loops.
  • Aguarde cada bloco async imediatamente depois de defini-lo.
  • Envolva apenas o primeiro loop em um bloco async e aguarde o future resultante depois do corpo do segundo loop.

Como desafio extra, veja se consegue descobrir qual será a saída em cada caso antes de executar o código!

Enviando Dados Entre Duas Tarefas Usando Passagem de Mensagens

Compartilhar dados entre futures também será familiar: usaremos passagem de mensagens novamente, mas desta vez com versões async dos tipos e funções. Vamos seguir um caminho um pouco diferente daquele que seguimos na seção “Transferindo Dados Entre Threads com Passagem de Mensagens” do Capítulo 16 para ilustrar algumas das principais diferenças entre concorrência baseada em threads e concorrência baseada em futures. Na Listagem 17-9, começaremos com apenas um bloco async, sem gerar uma tarefa separada como geramos uma thread separada antes.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("received '{received}'");
    });
}
Listing 17-9: Criando um canal async e atribuindo suas duas metades a tx e rx

Aqui, usamos trpl::channel, uma versão async da API de canal com múltiplos produtores e consumidor único que usamos com threads no Capítulo 16. A versão async da API é só um pouco diferente da versão baseada em threads: ela usa um receptor mutável, em vez de um receptor imutável, rx, e seu método recv produz um future que precisamos aguardar, em vez de produzir o valor diretamente. Agora podemos enviar mensagens do sender para o receiver. Observe que não precisamos gerar uma thread separada nem mesmo uma tarefa; precisamos apenas aguardar a chamada a rx.recv.

O método síncrono Receiver::recv em std::mpsc::channel bloqueia até receber uma mensagem. O método trpl::Receiver::recv não bloqueia, porque é async. Em vez de bloquear, ele devolve o controle ao runtime até que uma mensagem seja recebida ou o lado de envio do canal seja fechado. Em contraste, não aguardamos a chamada a send, porque ela não bloqueia. Ela não precisa bloquear, porque o canal para o qual estamos enviando é ilimitado.

Nota: Como todo esse código async roda em um bloco async dentro de uma chamada a trpl::block_on, tudo dentro dele pode evitar bloqueio. No entanto, o código fora dele bloqueará enquanto espera a função block_on retornar. Esse é o objetivo da função trpl::block_on: ela permite que você escolha onde bloquear em algum conjunto de código async e, portanto, onde fazer a transição entre código sync e async.

Observe duas coisas sobre este exemplo. Primeiro, a mensagem chegará imediatamente. Segundo, embora usemos um future aqui, ainda não há concorrência. Tudo na listagem acontece em sequência, exatamente como aconteceria se não houvesse futures envolvidos.

Vamos abordar a primeira parte enviando uma série de mensagens e dormindo entre elas, como mostrado na Listagem 17-10.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}
Listing 17-10: Enviando e recebendo múltiplas mensagens pelo canal async e dormindo com um await entre cada mensagem

Além de enviar as mensagens, precisamos recebê-las. Neste caso, como sabemos quantas mensagens virão, poderíamos fazer isso manualmente chamando rx.recv().await quatro vezes. No mundo real, porém, geralmente estaremos esperando por algum número desconhecido de mensagens, então precisamos continuar esperando até determinar que não há mais mensagens.

Na Listagem 16-10, usamos um loop for para processar todos os itens recebidos de um canal síncrono. Rust ainda não tem uma forma de usar um loop for com uma série de itens produzida assincronamente, porém, então precisamos usar um loop que ainda não vimos: o loop condicional while let. Esta é a versão em loop da construção if let que vimos na seção “Fluxo de Controle Conciso com if let e let...else do Capítulo 6. O loop continuará executando enquanto o padrão especificado continuar correspondendo ao valor.

A chamada rx.recv produz um future, que aguardamos. O runtime pausará o future até que ele esteja pronto. Assim que uma mensagem chega, o future se resolve para Some(message) tantas vezes quantas mensagens chegarem. Quando o canal fecha, independentemente de qualquer mensagem ter chegado ou não, o future se resolve para None para indicar que não há mais valores e, portanto, devemos parar de fazer polling, isto é, parar de aguardar.

O loop while let junta tudo isso. Se o resultado de chamar rx.recv().await for Some(message), obtemos acesso à mensagem e podemos usá-la no corpo do loop, assim como poderíamos fazer com if let. Se o resultado for None, o loop termina. Toda vez que o loop completa, ele atinge o ponto de await de novo, então o runtime o pausa novamente até que outra mensagem chegue.

O código agora envia e recebe todas as mensagens com sucesso. Infelizmente, ainda há alguns problemas. Primeiro, as mensagens não chegam em intervalos de meio segundo. Elas chegam todas de uma vez, 2 segundos (2.000 milissegundos) depois de iniciarmos o programa. Segundo, este programa também nunca termina! Em vez disso, ele espera para sempre por novas mensagens. Você precisará encerrá-lo usando ctrl-C.

Código Dentro de Um Bloco Async Executa Linearmente

Vamos começar examinando por que as mensagens chegam todas de uma vez depois do atraso completo, em vez de chegarem com atrasos entre elas. Dentro de um determinado bloco async, a ordem em que as palavras-chave await aparecem no código também é a ordem em que elas são executadas quando o programa roda.

Há apenas um bloco async na Listagem 17-10, então tudo nele roda linearmente. Ainda não há concorrência. Todas as chamadas a tx.send acontecem, intercaladas com todas as chamadas a trpl::sleep e seus pontos de await associados. Só então o loop while let chega a passar por algum dos pontos de await nas chamadas a recv.

Para obter o comportamento que queremos, em que o atraso de sleep acontece entre cada mensagem, precisamos colocar as operações tx e rx em seus próprios blocos async, como mostrado na Listagem 17-11. Então o runtime pode executar cada uma separadamente usando trpl::join, assim como na Listagem 17-8. Mais uma vez, aguardamos o resultado de chamar trpl::join, não os futures individuais. Se aguardássemos os futures individuais em sequência, acabaríamos voltando a um fluxo sequencial, exatamente o que estamos tentando não fazer.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-11: Separando send e recv em seus próprios blocos async e aguardando os futures desses blocos

Com o código atualizado da Listagem 17-11, as mensagens são impressas em intervalos de 500 milissegundos, em vez de todas de uma vez depois de 2 segundos.

Movendo Ownership Para Dentro de Um Bloco Async

O programa ainda nunca termina, porém, por causa da forma como o loop while let interage com trpl::join:

  • O future retornado por trpl::join só completa quando ambos os futures passados a ele completarem.
  • O future tx_fut completa depois que termina de dormir após enviar a última mensagem em vals.
  • O future rx_fut não completará até que o loop while let termine.
  • O loop while let não terminará até que aguardar rx.recv produza None.
  • Aguardar rx.recv retornará None somente depois que a outra extremidade do canal for fechada.
  • O canal fechará somente se chamarmos rx.close ou quando o lado de envio, tx, for descartado.
  • Não chamamos rx.close em lugar nenhum, e tx não será descartado até que o bloco async mais externo passado a trpl::block_on termine.
  • O bloco não consegue terminar porque está bloqueado em trpl::join completando, o que nos leva de volta ao topo desta lista.

Neste momento, o bloco async em que enviamos as mensagens apenas pega emprestado tx, porque enviar uma mensagem não exige ownership. Mas, se pudéssemos mover tx para dentro desse bloco async, ele seria descartado assim que o bloco terminasse. Na seção “Capturando Referências ou Movendo Ownership” do Capítulo 13, você aprendeu a usar a palavra-chave move com closures e, como discutido na seção “Usando Closures move com Threads” do Capítulo 16, muitas vezes precisamos mover dados para dentro de closures ao trabalhar com threads. A mesma dinâmica básica se aplica a blocos async, então a palavra-chave move funciona com blocos async assim como funciona com closures.

Na Listagem 17-12, alteramos o bloco usado para enviar mensagens de async para async move.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-12: Uma revisão do código da Listagem 17-11 que encerra corretamente ao terminar

Quando executamos esta versão do código, ela encerra de forma adequada depois que a última mensagem é enviada e recebida. A seguir, vamos ver o que precisaria mudar para enviar dados a partir de mais de um future.

Juntando Vários Futures com a Macro join!

Esse canal async também é um canal de múltiplos produtores, então podemos chamar clone em tx se quisermos enviar mensagens a partir de múltiplos futures, como mostrado na Listagem 17-13.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}
Listing 17-13: Usando múltiplos produtores com blocos async

Primeiro, clonamos tx, criando tx1 fora do primeiro bloco async. Movemos tx1 para dentro desse bloco exatamente como fizemos antes com tx. Então, mais tarde, movemos o tx original para dentro de um novo bloco async, onde enviamos mais mensagens com um atraso um pouco mais lento. Acontece que colocamos esse novo bloco async depois do bloco async que recebe mensagens, mas ele poderia vir antes também. O que importa é a ordem em que os futures são aguardados, não a ordem em que são criados.

Ambos os blocos async para enviar mensagens precisam ser blocos async move para que tanto tx quanto tx1 sejam descartados quando esses blocos terminarem. Caso contrário, voltaríamos ao mesmo loop infinito em que começamos.

Por fim, trocamos trpl::join por trpl::join! para lidar com o future adicional: a macro join! aguarda um número arbitrário de futures quando sabemos o número de futures em tempo de compilação. Discutiremos como aguardar uma coleção com um número desconhecido de futures mais adiante neste capítulo.

Agora vemos todas as mensagens dos dois futures de envio e, como os futures de envio usam atrasos ligeiramente diferentes depois de enviar, as mensagens também são recebidas nesses intervalos diferentes:

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

Exploramos como usar passagem de mensagens para enviar dados entre futures, como o código dentro de um bloco async roda sequencialmente, como mover ownership para dentro de um bloco async e como juntar múltiplos futures. A seguir, vamos discutir como e por que informar ao runtime que ele pode alternar para outra tarefa.