Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

De um servidor single-thread para um servidor multithread

Neste momento, o servidor processará cada solicitação por vez, o que significa que não processar uma segunda conexão até que o processamento da primeira conexão seja concluído. Se o servidor recebesse cada vez mais solicitações, esta execução serial seria cada vez menos ideal. Se o servidor receber uma solicitação que demora muito para processar, as solicitações subsequentes terão que esperar até que a solicitação longa seja concluído, mesmo que as novas solicitações possam ser processadas rapidamente. Precisaremos consertar isso, mas primeiro veremos o problema em ação.

Simulando uma requisição lenta

Veremos como uma solicitação de processamento lento pode afetar outras solicitações feitas para nossa implementação de servidor atual. A Listagem 21-10 implementa o tratamento de uma solicitação para /sleep com uma resposta lenta simulada que fará o servidor dormir por cinco segundos antes de responder.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-10: Simulando uma requisição lenta ao dormir por cinco segundos

Mudamos de if para match agora que temos três casos. Precisamos fazer match explicitamente em um slice de request_line para comparar padrões com valores literais de string; match não faz referência automática e desreferenciação, como faz o método da igualdade.

O primeiro braço é igual ao bloco if da Listagem 21-9. O segundo braço corresponde a uma solicitação para /sleep. Quando essa solicitação for recebida, o servidor irá durma por cinco segundos antes de renderizar a página HTML bem-sucedida. O terceiro braço é igual ao bloco else da Listagem 21-9.

Você pode ver o quão primitivo é o nosso servidor: bibliotecas reais lidariam com o reconhecimento de múltiplas solicitações de uma forma muito menos detalhada!

Inicie o servidor usando cargo run. Em seguida, abra duas janelas do navegador: uma para http://127.0.0.1:7878 e outro para http://127.0.0.1:7878/sleep. Se você insira o URI / algumas vezes, como antes, você verá que ele responde rapidamente. Mas se você digita /sleep e então carrega /, você verá que / espera até sleep dormiu cinco segundos inteiros antes de carregar.

Existem várias técnicas que podemos usar para evitar que uma requisição lenta faça outras requisições se acumularem, incluindo o uso de async, como fizemos no Capítulo 17; a que implementaremos aqui é um thread pool.

Melhorando o rendimento com um pool de threads

Um thread pool é um grupo de threads geradas que estão prontas e aguardando para lidar com uma tarefa. Quando o programa recebe uma nova tarefa, ele atribui uma das threads no pool para a tarefa e que thread processará a tarefa. O threads restantes no pool estão disponíveis para lidar com quaisquer outras tarefas que venham enquanto o primeiro thread está sendo processado. Quando o primeiro thread estiver pronto processando sua tarefa, ele retorna ao pool de threads ocioso, pronto para lidar uma nova tarefa. Um thread pool permite processar conexões simultaneamente, aumentando o rendimento do seu servidor.

Limitaremos o número de threads no pool a um pequeno número para nos proteger de ataques DoS; se nosso programa criasse um novo thread para cada solicitação como quando chegou, alguém fazendo 10 milhões de solicitações ao nosso servidor poderia causar estragos usando todos os recursos do nosso servidor e processando solicitações parar.

Em vez de gerar threads ilimitadas, teremos um número fixo de threads esperando no pool. As solicitações recebidas são enviadas ao pool para processamento. O pool manterá uma fila de solicitações recebidas. Cada um dos threads no pool irá gerar uma solicitação desta fila, tratar a solicitação, e então solicite outra solicitação à fila. Com este design, podemos processar para solicitações _ N _ simultaneamente, onde _ N _ é o número de threads. Se cada thread está respondendo a uma solicitação de longa duração, as solicitações subsequentes ainda podem fazer backup na fila, mas aumentamos o número de solicitações de longa duração podemos lidar antes de chegar a esse ponto.

Esta técnica é apenas uma das muitas maneiras de melhorar o rendimento de uma web servidor. Outras opções que você pode explorar são o modelo fork/join, o modelo de E/S async de thread único e o modelo de E/S async multithread. Se você está interessado neste tópico, você pode ler mais sobre outras soluções e tente implementá-los; com uma linguagem de baixo nível como Rust, todos esses opções são possíveis.

Antes de começarmos a implementar um thread pool, vamos falar sobre como seu uso deve se parecer. Quando você está tentando projetar código, escrever primeiro a interface do cliente pode ajudar a orientar o design. Escreva a API do código de modo que ela fique estruturada da maneira como você deseja chamá-la; então, implemente a funcionalidade dentro dessa estrutura, em vez de implementar a funcionalidade e só depois projetar a API pública.

Semelhante à forma como usamos o desenvolvimento orientado a testes no projeto no Capítulo 12, usaremos desenvolvimento orientado a compilador aqui. Escreveremos o código que chama o funções que queremos e, em seguida, examinaremos os erros do compilador para determinar o que devemos mudar a seguir para fazer o código funcionar. Antes de fazermos isso, no entanto, exploraremos a técnica que não usaremos como ponto de partida.

Gerando uma thread para cada solicitação

Primeiro, vamos explorar como nosso código ficaria se ele criasse um novo thread para cada conexão. Como mencionado anteriormente, este não é o nosso plano final devido ao problemas com a geração potencial de um número ilimitado de threads, mas é um ponto de partida para obter primeiro um servidor multithread funcional. Então, adicionaremos o thread pool como uma melhoria, e contrastar as duas soluções será mais fácil.

A Listagem 21-11 mostra as alterações a serem feitas em main para gerar um novo thread para lidar com cada stream dentro do loop for.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-11: Criando uma nova thread para cada stream

Como você aprendeu no Capítulo 16, thread::spawn criará um novo thread e então execute o código no closure no novo thread. Se você executar este código e carregar /sleep no seu navegador, então / em mais duas guias do navegador, você realmente verá que as solicitações para / não precisam esperar que /sleep termine. No entanto, como mencionamos, isso acabará sobrecarregando o sistema porque você estaria fazendo novo threads sem qualquer limite.

Você também deve se lembrar do Capítulo 17 que este é exatamente o tipo de situação onde async e await realmente brilham! Tenha isso em mente enquanto construímos o thread pool e pense em como as coisas seriam diferentes ou iguais com async.

Criando um número finito de threads

Queremos que nosso thread pool funcione de maneira semelhante e familiar, para que trocar threads por um thread pool não exija grandes alterações no código que usa nossa API. A Listagem 21-12 mostra a interface hipotética da struct ThreadPool que queremos usar no lugar de thread::spawn.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-12: Nossa interface ideal para ThreadPool

Usamos ThreadPool::new para criar um novo thread pool com um número configurável de threads, neste caso quatro. Então, no loop for, pool.executetem um interface semelhante ao thread::spawn, pois é necessário um closure que o pool deve ser executado para cada stream. Precisamos implementar pool.executepara que pega o closure e o entrega a um thread no pool para execução. Este código não ainda compilar, mas tentaremos para que o compilador possa nos orientar sobre como corrigi-lo.

Construindo ThreadPool usando desenvolvimento orientado a compilador

Faça as alterações na Listagem 21-12 em src/main.rs e então vamos usar o erros do compilador do cargo check para impulsionar nosso desenvolvimento. Aqui está o primeiro erro que obtemos:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Ótimo! Esse erro nos diz que precisamos de um tipo ou módulo ThreadPool, então vamos construí-lo agora. Nossa implementação de ThreadPool será independente do tipo de trabalho que nosso web server está realizando. Então, vamos transformar o crate hello de um crate binário em um crate de biblioteca para armazenar nossa implementação de ThreadPool. Depois de mudarmos para um crate de biblioteca, também poderíamos usar essa biblioteca de thread pool separadamente para qualquer trabalho que quiséssemos fazer, não apenas para atender requisições web.

Crie um arquivo src/lib.rs que contenha o seguinte, que é o mais simples definição de uma estrutura ThreadPool que podemos ter por enquanto:

Filename: src/lib.rs
pub struct ThreadPool;

Em seguida, edite o arquivo main.rs para trazer ThreadPool para o escopo da biblioteca crate adicionando o seguinte código ao topo de src/main.rs:

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Este código ainda não funcionará, mas vamos verificá-lo novamente para obter o próximo erro que precisamos abordar:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Este erro indica que a seguir precisamos criar uma função associada chamada new paraThreadPool . Também sabemos quenewprecisa ter um parâmetro que pode aceitar4como argumento e deve retornar uma instânciaThreadPool. Vamos implementar a função newmais simples que terá esses características:

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

Escolhemos usize como o tipo do parâmetro size porque sabemos que um número negativo de threads não faz sentido. Também sabemos que usaremos isso 4 como o número de elementos em uma coleção de threads, que é o que o O tipousize é para, conforme discutido na seção “Tipos inteiros” no Capítulo 3.

Vamos verificar o código novamente:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Agora o erro ocorre porque não temos um método execute em ThreadPool. Lembre-se do artigo “Criando um Número Finito de Threads” seção que decidimos que nosso thread pool deveria ter uma interface semelhante à de thread::spawn. Além disso, implementaremos a função execute para que ela receba a closure fornecida e a entregue a uma thread ociosa no pool para ser executada.

Definiremos o método execute em ThreadPool para tomar um closure como parâmetro. Lembre-se do artigo “Movendo valores capturados para fora Fechamentos” no Capítulo 13 que podemos tome closures como parâmetros com três traits diferentes: Fn, FnMute FnOnce. Precisamos decidir que tipo de closure usar aqui. Nós sabemos que vamos acabar fazendo algo semelhante à biblioteca padrãothread::spawn implementação, para que possamos ver o que limita a assinatura de thread::spawn tem em seu parâmetro. A documentação nos mostra o seguinte:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

O parâmetro de tipo F é o que nos preocupa aqui; o tipo T parâmetro está relacionado ao valor de retorno e não estamos preocupados com isso. Nós podemos ver que spawnusa FnOncecomo trait vinculado a F. Isto é provavelmente o que queremos também, porque eventualmente passaremos no argumento que entramos executeparaspawn. Podemos ter ainda mais certeza de queFnOnceé o trait que deseja usar porque o thread para executar uma solicitação executará apenas aquela closure da solicitação uma vez, que corresponde aoOnceem FnOnce.

O parâmetro de tipo F também possui o trait vinculado ao Send e o lifetime vinculado 'static , que são úteis em nossa situação: Precisamos deSendpara transferir o closure de um thread para outro e'staticporque não sabemos quanto tempo o thread levará para ser executado. Vamos criar um métodoexecuteem ThreadPoolque receberá um parâmetro genérico do tipo Fcom estes limites:

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Ainda usamos () depois de FnOnce porque este FnOnce representa um closure que não aceita parâmetros e retorna o tipo de unidade (). Assim como a função definições, o tipo de retorno pode ser omitido da assinatura, mas mesmo se não temos parâmetros, ainda precisamos dos parênteses.

Novamente, esta é a implementação mais simples do método execute: ela faz nada, mas estamos apenas tentando compilar nosso código. Vamos verificar novamente:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

Ele compila! Mas observe que se você tentar cargo run e fizer uma solicitação no navegador, você verá os erros no navegador que vimos no início de o capítulo. Nossa biblioteca não está realmente chamando o closure passado para execute ainda!

Nota: Um ditado que você pode ouvir sobre linguagens com compiladores estritos, como Haskell e Rust, é “Se o código compilar, ele funciona”. Mas este ditado não é universalmente verdadeiro. Nosso projeto compila, mas não faz absolutamente nada! Se nós estamos construindo um projeto real e completo, este seria um bom momento para começar escrever testes unitários para verificar se o código compila e tem o comportamento que quero.

Pense nisto: o que seria diferente aqui se fôssemos executar um future em vez de uma closure?

Validando o número de threads em new

Não estamos fazendo nada com os parâmetros new e execute. Vamos implementar o corpo dessas funções com o comportamento que desejamos. Para começar, pensemos em new. Antes escolhemos um tipo não assinado para o parâmetro size porque um pool com um número negativo de threads não faz sentido. No entanto, um pool com zero threads também não faz sentido, e zero é um valor perfeitamente válido para usize. Vamos adicionar código para verificar se size é maior que zero antes de retornar uma instância de ThreadPool, e faremos o programa entrar em panic se receber zero, usando a macro assert!, como mostra a Listagem 21-13.

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-13: Implementando ThreadPool::new para gerar panic se size for zero

Também adicionamos alguma documentação para nosso ThreadPool com comentários de documentos. Observe que seguimos boas práticas de documentação adicionando uma seção que chama as situações em que nossa função pode panic, conforme discutido em Capítulo 14. Tente executar cargo doc --open e clicar na estrutura ThreadPool para ver como são os documentos gerados para new!

Em vez de adicionar a macro assert! como fizemos aqui, poderíamos transformar new em build e retornar um Result, como fizemos com Config::build no projeto de I/O da Listagem 12-9. Mas decidimos que, neste caso, tentar criar um thread pool sem nenhuma thread deve ser um erro irrecuperável. Se você estiver se sentindo ambicioso, tente escrever uma função chamada build com a seguinte assinatura para compará-la com a função new:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

Criando espaço para armazenar os threads

Agora que temos uma forma de garantir que há um número válido de threads para armazenar no pool, podemos criá-las e guardá-las na struct ThreadPool antes de retorná-la. Mas como “armazenamos” uma thread? Vamos olhar novamente para a assinatura de thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

A função spawn retorna um JoinHandle<T>, em que T é o tipo retornado pela closure. Vamos tentar usar JoinHandle também e ver o que acontece. No nosso caso, a closure que estamos passando para o thread pool cuidará da conexão e não retornará nada, então T será o tipo unitário ().

O código da Listagem 21-14 compilará, mas ainda não criará nenhuma thread. Mudamos a definição de ThreadPool para conter um vetor de instâncias thread::JoinHandle<()>, inicializamos esse vetor com capacidade size, configuramos um loop for que executará algum código para criar as threads e, por fim, retornamos uma instância de ThreadPool contendo-as.

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-14: Criando um vetor para ThreadPool armazenar as threads

Colocamos std::thread no escopo da biblioteca crate porque estamos usando thread::JoinHandle como o tipo dos itens no vetor em ThreadPool.

Assim que recebe um tamanho válido, nosso ThreadPool cria um novo vetor capaz de armazenar size itens. A função with_capacity realiza a mesma tarefa que Vec::new, mas com uma diferença importante: ela pré-aloca espaço no vetor. Como sabemos que precisaremos armazenar size elementos, fazer essa alocação inicial é um pouco mais eficiente do que usar Vec::new, que se redimensiona à medida que elementos são inseridos.

Quando você executar cargo check novamente, ele deverá ter sucesso.

Enviando código do ThreadPool para um thread

Deixamos um comentário no loop for da Listagem 21-14 sobre a criação das threads. Aqui veremos como realmente criá-las. A biblioteca padrão fornece thread::spawn como uma forma de criar threads, e thread::spawn espera receber algum código que a thread deve executar assim que for criada. No nosso caso, porém, queremos criar as threads e fazer com que elas esperem pelo código que enviaremos mais tarde. A implementação de threads da biblioteca padrão não inclui uma forma de fazer isso; teremos que implementá-la manualmente.

Implementaremos esse comportamento introduzindo uma nova estrutura de dados entre ThreadPool e as threads, que gerenciará esse novo comportamento. Chamaremos essa estrutura de dados de Worker, um termo comum em implementações de pools. Worker recebe o código que precisa ser executado e o executa em sua thread.

Pense nas pessoas que trabalham na cozinha de um restaurante: os trabalhadores esperam até que os pedidos cheguem dos clientes e são responsáveis por recebê-los e prepará-los.

Em vez de armazenar um vetor de instâncias JoinHandle<()> no thread pool, armazenaremos instâncias da struct Worker. Cada Worker armazenará uma única instância de JoinHandle<()>. Então, implementaremos um método em Worker que receberá uma closure de código para executar e a enviará à thread já em execução para ser executada. Também daremos a cada Worker um id, para que possamos distinguir entre as diferentes instâncias de Worker no pool ao registrar logs ou depurar.

Aqui está o novo processo que acontecerá quando criarmos um ThreadPool. Implementaremos o código que envia a closure para a thread depois de termos configurado Worker desta forma:

  1. Defina uma estrutura Worker que contenha um id e um JoinHandle<()>.
  2. Altere ThreadPool para conter um vetor de instâncias Worker.
  3. Defina uma função Worker::new que receba um número id e retorne uma instância de Worker contendo esse id e uma thread criada com uma closure vazia.
  4. Em ThreadPool::new, use o contador do loop for para gerar um id, crie um novo Worker com esse id e armazene-o no vetor.

Se você estiver pronto para um desafio, tente implementar essas mudanças sozinho antes olhando o código na Listagem 21-15.

Preparar? Aqui está a Listagem 21-15 com uma maneira de fazer as modificações anteriores.

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-15: Modificando ThreadPool para armazenar instâncias de Worker em vez de armazenar threads diretamente

Alteramos o nome do campo em ThreadPool de threads para workers porque agora ele contém instâncias de Worker, e não instâncias de JoinHandle<()>. Usamos o contador do loop for como argumento para Worker::new e armazenamos cada novo Worker no vetor chamado workers.

Código externo, como nosso servidor em src/main.rs, não precisa conhecer os detalhes de implementação sobre o uso da struct Worker dentro de ThreadPool, portanto tornamos a struct Worker e sua função new privadas. A função Worker::new usa o id fornecido e armazena uma instância de JoinHandle<()> criada ao iniciar uma nova thread com uma closure vazia.

Nota: Se o sistema operacional não puder criar um thread porque não há recursos de sistema suficientes, thread::spawn será panic. Isso fará com que o nosso servidor inteiro para panic, mesmo que a criação de alguns threads possa ter sucesso. Para simplificar, esse comportamento é bom, mas em uma produção implementação de thread pool, você provavelmente desejaria usar std::thread::Builder e seus spawnMétodo que retorna Result.

Este código irá compilar e armazenar o número de instâncias Worker que especificado como um argumento para ThreadPool::new. Mas ainda não estamos processando o closure que obtemos em execute. Vejamos como fazer isso a seguir.

Enviando requisições para threads por meio de canais

O próximo problema que abordaremos é que a closure passada a thread::spawn não faz absolutamente nada. Atualmente, obtemos em execute a closure que queremos executar. Mas precisamos fornecer a thread::spawn uma closure para ser executada quando criamos cada Worker durante a construção de ThreadPool.

Queremos que as estruturas Worker que acabamos de criar busquem o código a ser executado em uma fila mantida por ThreadPool e enviem esse código para suas threads executarem.

Os canais que aprendemos no Capítulo 16, uma forma simples de comunicação entre duas threads, são perfeitos para este caso de uso. Usaremos um canal como fila de trabalhos, e execute enviará um trabalho de ThreadPool para as instâncias Worker, que repassarão o trabalho para suas threads. Eis o plano:

  1. O ThreadPool criará um canal e manterá o remetente.
  2. Cada Worker manterá o receptor.
  3. Criaremos uma nova estrutura Job que conterá as closures que queremos enviar pelo canal.
  4. O método execute enviará o trabalho que deseja executar através do remetente.
  5. Em sua thread, Worker fará um loop sobre o receptor e executará as closures de quaisquer trabalhos recebidos.

Vamos começar criando um canal em ThreadPool::new e mantendo o remetente na instância de ThreadPool, como mostrado na Listagem 21-16. A struct Job não contém nada por enquanto, mas será o tipo de item que estaremos enviando pelo canal.

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-16: Modificando ThreadPool para armazenar o transmissor de um canal que transmite instâncias de Job

No ThreadPool::new, criamos nosso novo canal e fazemos com que o pool mantenha o remetente. Isso será compilado com sucesso.

Vamos tentar passar um receptor do canal em cada Worker como o thread pool cria o canal. Sabemos que queremos usar o receptor no thread que as instâncias Worker são geradas, então faremos referência ao parâmetro receiver no closure. O código na Listagem 21-17 ainda não será compilado.

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-17: Passando o receptor para cada Worker

Fizemos algumas mudanças pequenas e diretas: passamos o receptor para Worker::new, e então usamos dentro do closure.

Quando tentamos verificar este código, obtemos este erro:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

O código está tentando passar receiver para várias instâncias de Worker. Isso não funcionará, como você deve se lembrar do Capítulo 16: a implementação de canais fornecida pelo Rust suporta múltiplos produtores e um único consumidor. Isso significa que não podemos simplesmente clonar a extremidade consumidora do canal para corrigir esse código. Também não queremos enviar uma mensagem várias vezes para vários consumidores; queremos uma fila de mensagens compartilhada entre várias instâncias de Worker, de modo que cada mensagem seja processada uma única vez.

Além disso, retirar um trabalho da fila do canal envolve modificar receiver, então as threads precisam de uma forma segura de compartilhar e modificar esse receiver; caso contrário, poderíamos ter condições de corrida, como vimos no Capítulo 16.

Lembre-se dos smart pointers thread-safe discutidos no Capítulo 16: para compartilhar ownership entre várias threads e permitir que essas threads alterem o valor, precisamos usar Arc<Mutex<T>>. O tipo Arc permitirá que múltiplas instâncias de Worker possuam o receptor, e Mutex garantirá que apenas um Worker por vez obtenha um trabalho do receptor. A Listagem 21-18 mostra as mudanças que precisamos fazer.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-18: Compartilhando o receptor entre as instâncias de Worker usando Arc e Mutex

Em ThreadPool::new, colocamos o receptor dentro de um Arc e de um Mutex. Para cada novo Worker, clonamos o Arc para aumentar a contagem de referências, de modo que as instâncias de Worker possam compartilhar a ownership do receptor.

Com essas mudanças, o código compila! Estamos chegando lá!

Implementando o Método execute

Vamos finalmente implementar o método execute em ThreadPool. Também vamos transformar Job de uma struct em um alias de tipo para um objeto trait que contém o tipo de closure recebido por execute. Como discutimos na seção “Sinônimos de tipo e aliases” no Capítulo 20, aliases de tipo nos permitem encurtar tipos longos para facilitar o uso. Veja a Listagem 21-19.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-19: Criando um alias de tipo Job para um Box que guarda cada closure e enviando então o job pelo canal

Depois de criar uma nova instância de Job usando a closure que obtemos em execute, enviamos esse trabalho pela extremidade de envio do canal. Chamamos unwrap em send para o caso de falha no envio. Isso pode acontecer se, por exemplo, todas as nossas threads tiverem parado de executar, o que significaria que a extremidade receptora deixou de receber novas mensagens. No momento, porém, não temos como interromper a execução das threads: elas continuam executando enquanto o pool existir. A razão pela qual usamos unwrap é que sabemos que esse caso de falha não acontecerá, mas o compilador não sabe disso.

Mas ainda não terminamos! Em Worker, a closure passada para thread::spawn ainda apenas faz referência à extremidade receptora do canal. Em vez disso, precisamos que essa closure faça um loop infinito, pedindo um trabalho à extremidade receptora do canal e executando-o quando conseguir um. Vamos fazer a mudança mostrada na Listagem 21-20 para Worker::new.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-20: Recebendo e executando os jobs na thread da instância Worker

Aqui, primeiro chamamos lock em receiver para adquirir o mutex e depois chamamos unwrap para entrar em panic em caso de erro. A aquisição de um bloqueio pode falhar se o mutex estiver em um estado envenenado, o que pode acontecer se alguma outra thread entrar em pânico enquanto mantém o bloqueio em vez de liberá-lo. Nessa situação, chamar unwrap para fazer essa thread entrar em panic é a ação correta. Se quiser, você pode trocar esse unwrap por um expect com uma mensagem de erro que faça sentido para você.

Se conseguirmos adquirir o bloqueio do mutex, chamamos recv para receber um Job do canal. Um unwrap final também lida com quaisquer erros aqui, que podem ocorrer se a thread que contém o remetente tiver sido desligada, de modo semelhante ao fato de o método send retornar Err se o receptor for desligado.

A chamada a recv é bloqueante; portanto, se ainda não houver trabalho, a thread atual esperará até que um job esteja disponível. O Mutex<T> garante que apenas a thread de um Worker por vez esteja tentando solicitar um trabalho.

Nosso thread pool agora está funcionando! Execute cargo run e faça algumas requisições:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

Sucesso! Agora temos um thread pool que executa conexões de forma assíncrona. Nunca são criadas mais de quatro threads, então nosso sistema não será sobrecarregado se o servidor receber muitas requisições. Se fizermos um pedido para /sleep, o servidor poderá atender outras requisições fazendo com que outra thread as execute.

Nota: Se você abrir /sleep em várias janelas do navegador simultaneamente, elas pode carregar um de cada vez em intervalos de cinco segundos. Alguns navegadores da web executam múltiplas instâncias da mesma solicitação sequencialmente por motivos de armazenamento em cache. Isto a limitação não é causada pelo nosso web server.

Este é um bom momento para fazer uma pausa e considerar como o código nas Listagens 21-18, 21-19, e 21-20 seriam diferentes se estivéssemos usando futures em vez de closure para o trabalho a ser feito. Que tipos mudariam? Como seriam as assinaturas dos métodos diferente, se é que existe? Quais partes do código permaneceriam iguais?

Depois de aprender sobre o loop while let no Capítulo 17 e Capítulo 19, você pode estar se perguntando por que não escrevemos o código Worker thread conforme mostrado em Listagem 21-21.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-21: Uma implementação alternativa de Worker::new usando while let

Este código é compilado e executado, mas não resulta no threading desejado comportamento: uma solicitação lenta ainda fará com que outras solicitações esperem para serem processado. A razão é um tanto sutil: a estrutura Mutex não tem público Método unlock porque o ownership do bloqueio é baseado no lifetime do o MutexGuard<T> dentro do LockResult<MutexGuard<T>> que o lock método retorna. Em tempo de compilação, o borrow checker pode então impor a regra que um recurso protegido por um Mutexnão pode ser acessado a menos que tenhamos o bloqueio. No entanto, esta implementação também pode resultar na retenção do bloqueio mais do que o pretendido se não estivermos atentos ao lifetime do MutexGuard<T>.

O código na Listagem 21-20 que usa let job = receiver.lock().unwrap().recv().unwrap(); funciona porque com let, qualquer valores temporários usados na expressão do lado direito da igualdade O sinal é eliminado imediatamente quando a instrução lettermina. No entanto, while let(e if lete match) não descarta valores temporários até o final do o bloco associado. Na Listagem 21-21, o bloqueio permanece mantido durante da chamada para job(), o que significa que outras instâncias Workernão podem receber trabalhos.