De um servidor single-thread para um servidor multithread
Neste momento, o servidor processará cada solicitação por vez, o que significa que não processar uma segunda conexão até que o processamento da primeira conexão seja concluído. Se o servidor recebesse cada vez mais solicitações, esta execução serial seria cada vez menos ideal. Se o servidor receber uma solicitação que demora muito para processar, as solicitações subsequentes terão que esperar até que a solicitação longa seja concluído, mesmo que as novas solicitações possam ser processadas rapidamente. Precisaremos consertar isso, mas primeiro veremos o problema em ação.
Simulando uma requisição lenta
Veremos como uma solicitação de processamento lento pode afetar outras solicitações feitas para nossa implementação de servidor atual. A Listagem 21-10 implementa o tratamento de uma solicitação para /sleep com uma resposta lenta simulada que fará o servidor dormir por cinco segundos antes de responder.
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
// --snip--
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
// --snip--
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
// --snip--
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Mudamos de if para match agora que temos três casos. Precisamos
fazer match explicitamente em um slice de request_line para comparar padrões com
valores literais de string; match não faz referência automática e
desreferenciação, como faz o método da igualdade.
O primeiro braço é igual ao bloco if da Listagem 21-9. O segundo braço
corresponde a uma solicitação para /sleep. Quando essa solicitação for recebida, o servidor irá
durma por cinco segundos antes de renderizar a página HTML bem-sucedida. O terceiro braço
é igual ao bloco else da Listagem 21-9.
Você pode ver o quão primitivo é o nosso servidor: bibliotecas reais lidariam com o reconhecimento de múltiplas solicitações de uma forma muito menos detalhada!
Inicie o servidor usando cargo run. Em seguida, abra duas janelas do navegador: uma para
http://127.0.0.1:7878 e outro para http://127.0.0.1:7878/sleep. Se você
insira o URI / algumas vezes, como antes, você verá que ele responde rapidamente. Mas se
você digita /sleep e então carrega /, você verá que / espera até sleep
dormiu cinco segundos inteiros antes de carregar.
Existem várias técnicas que podemos usar para evitar que uma requisição lenta faça outras requisições se acumularem, incluindo o uso de async, como fizemos no Capítulo 17; a que implementaremos aqui é um thread pool.
Melhorando o rendimento com um pool de threads
Um thread pool é um grupo de threads geradas que estão prontas e aguardando para lidar com uma tarefa. Quando o programa recebe uma nova tarefa, ele atribui uma das threads no pool para a tarefa e que thread processará a tarefa. O threads restantes no pool estão disponíveis para lidar com quaisquer outras tarefas que venham enquanto o primeiro thread está sendo processado. Quando o primeiro thread estiver pronto processando sua tarefa, ele retorna ao pool de threads ocioso, pronto para lidar uma nova tarefa. Um thread pool permite processar conexões simultaneamente, aumentando o rendimento do seu servidor.
Limitaremos o número de threads no pool a um pequeno número para nos proteger de ataques DoS; se nosso programa criasse um novo thread para cada solicitação como quando chegou, alguém fazendo 10 milhões de solicitações ao nosso servidor poderia causar estragos usando todos os recursos do nosso servidor e processando solicitações parar.
Em vez de gerar threads ilimitadas, teremos um número fixo de
threads esperando no pool. As solicitações recebidas são enviadas ao pool para
processamento. O pool manterá uma fila de solicitações recebidas. Cada um dos
threads no pool irá gerar uma solicitação desta fila, tratar a solicitação,
e então solicite outra solicitação à fila. Com este design, podemos processar
para solicitações _ N _ simultaneamente, onde _ N _ é o número de threads. Se cada
thread está respondendo a uma solicitação de longa duração, as solicitações subsequentes ainda podem
fazer backup na fila, mas aumentamos o número de solicitações de longa duração
podemos lidar antes de chegar a esse ponto.
Esta técnica é apenas uma das muitas maneiras de melhorar o rendimento de uma web servidor. Outras opções que você pode explorar são o modelo fork/join, o modelo de E/S async de thread único e o modelo de E/S async multithread. Se você está interessado neste tópico, você pode ler mais sobre outras soluções e tente implementá-los; com uma linguagem de baixo nível como Rust, todos esses opções são possíveis.
Antes de começarmos a implementar um thread pool, vamos falar sobre como seu uso deve se parecer. Quando você está tentando projetar código, escrever primeiro a interface do cliente pode ajudar a orientar o design. Escreva a API do código de modo que ela fique estruturada da maneira como você deseja chamá-la; então, implemente a funcionalidade dentro dessa estrutura, em vez de implementar a funcionalidade e só depois projetar a API pública.
Semelhante à forma como usamos o desenvolvimento orientado a testes no projeto no Capítulo 12, usaremos desenvolvimento orientado a compilador aqui. Escreveremos o código que chama o funções que queremos e, em seguida, examinaremos os erros do compilador para determinar o que devemos mudar a seguir para fazer o código funcionar. Antes de fazermos isso, no entanto, exploraremos a técnica que não usaremos como ponto de partida.
Gerando uma thread para cada solicitação
Primeiro, vamos explorar como nosso código ficaria se ele criasse um novo thread para cada conexão. Como mencionado anteriormente, este não é o nosso plano final devido ao problemas com a geração potencial de um número ilimitado de threads, mas é um ponto de partida para obter primeiro um servidor multithread funcional. Então, adicionaremos o thread pool como uma melhoria, e contrastar as duas soluções será mais fácil.
A Listagem 21-11 mostra as alterações a serem feitas em main para gerar um novo thread para
lidar com cada stream dentro do loop for.
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Como você aprendeu no Capítulo 16, thread::spawn criará um novo thread e então
execute o código no closure no novo thread. Se você executar este código e carregar
/sleep no seu navegador, então / em mais duas guias do navegador, você realmente verá
que as solicitações para / não precisam esperar que /sleep termine. No entanto, como
mencionamos, isso acabará sobrecarregando o sistema porque você estaria fazendo
novo threads sem qualquer limite.
Você também deve se lembrar do Capítulo 17 que este é exatamente o tipo de situação onde async e await realmente brilham! Tenha isso em mente enquanto construímos o thread pool e pense em como as coisas seriam diferentes ou iguais com async.
Criando um número finito de threads
Queremos que nosso thread pool funcione de maneira semelhante e familiar, para
que trocar threads por um thread pool não exija grandes alterações no código
que usa nossa API. A Listagem 21-12 mostra a interface hipotética da struct
ThreadPool que queremos usar no lugar de thread::spawn.
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
ThreadPoolUsamos ThreadPool::new para criar um novo thread pool com um número configurável
de threads, neste caso quatro. Então, no loop for, pool.executetem um
interface semelhante ao thread::spawn, pois é necessário um closure que o pool
deve ser executado para cada stream. Precisamos implementar pool.executepara que
pega o closure e o entrega a um thread no pool para execução. Este código não
ainda compilar, mas tentaremos para que o compilador possa nos orientar sobre como corrigi-lo.
Construindo ThreadPool usando desenvolvimento orientado a compilador
Faça as alterações na Listagem 21-12 em src/main.rs e então vamos usar o
erros do compilador do cargo check para impulsionar nosso desenvolvimento. Aqui está o primeiro
erro que obtemos:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Ótimo! Esse erro nos diz que precisamos de um tipo ou módulo ThreadPool,
então vamos construí-lo agora. Nossa implementação de ThreadPool será
independente do tipo de trabalho que nosso web server está realizando. Então,
vamos transformar o crate hello de um crate binário em um crate de
biblioteca para armazenar nossa implementação de ThreadPool. Depois de
mudarmos para um crate de biblioteca, também poderíamos usar essa biblioteca de
thread pool separadamente para qualquer trabalho que quiséssemos fazer, não
apenas para atender requisições web.
Crie um arquivo src/lib.rs que contenha o seguinte, que é o mais simples
definição de uma estrutura ThreadPool que podemos ter por enquanto:
pub struct ThreadPool;
Em seguida, edite o arquivo main.rs para trazer ThreadPool para o escopo da biblioteca
crate adicionando o seguinte código ao topo de src/main.rs:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Este código ainda não funcionará, mas vamos verificá-lo novamente para obter o próximo erro que precisamos abordar:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Este erro indica que a seguir precisamos criar uma função associada chamada
new paraThreadPool . Também sabemos quenewprecisa ter um parâmetro
que pode aceitar4como argumento e deve retornar uma instânciaThreadPool.
Vamos implementar a função newmais simples que terá esses
características:
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
Escolhemos usize como o tipo do parâmetro size porque sabemos que um
número negativo de threads não faz sentido. Também sabemos que usaremos isso
4 como o número de elementos em uma coleção de threads, que é o que o
O tipousize é para, conforme discutido na seção “Tipos inteiros” no Capítulo 3.
Vamos verificar o código novamente:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Agora o erro ocorre porque não temos um método execute em ThreadPool.
Lembre-se do artigo “Criando um Número Finito de
Threads” seção que
decidimos que nosso thread pool deveria ter uma interface semelhante à de
thread::spawn. Além disso, implementaremos a função execute para que ela
receba a closure fornecida e a entregue a uma thread ociosa no pool para ser
executada.
Definiremos o método execute em ThreadPool para tomar um closure como
parâmetro. Lembre-se do artigo “Movendo valores capturados para fora
Fechamentos” no Capítulo 13 que podemos
tome closures como parâmetros com três traits diferentes: Fn, FnMute
FnOnce. Precisamos decidir que tipo de closure usar aqui. Nós sabemos que vamos
acabar fazendo algo semelhante à biblioteca padrãothread::spawn
implementação, para que possamos ver o que limita a assinatura de thread::spawn
tem em seu parâmetro. A documentação nos mostra o seguinte:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
O parâmetro de tipo F é o que nos preocupa aqui; o tipo T
parâmetro está relacionado ao valor de retorno e não estamos preocupados com isso. Nós
podemos ver que spawnusa FnOncecomo trait vinculado a F. Isto é provavelmente
o que queremos também, porque eventualmente passaremos no argumento que entramos
executeparaspawn. Podemos ter ainda mais certeza de queFnOnceé o trait que
deseja usar porque o thread para executar uma solicitação executará apenas aquela
closure da solicitação uma vez, que corresponde aoOnceem FnOnce.
O parâmetro de tipo F também possui o trait vinculado ao Send e o lifetime vinculado
'static , que são úteis em nossa situação: Precisamos deSendpara transferir o
closure de um thread para outro e'staticporque não sabemos quanto tempo
o thread levará para ser executado. Vamos criar um métodoexecuteem
ThreadPoolque receberá um parâmetro genérico do tipo Fcom estes limites:
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
Ainda usamos () depois de FnOnce porque este FnOnce representa um closure
que não aceita parâmetros e retorna o tipo de unidade (). Assim como a função
definições, o tipo de retorno pode ser omitido da assinatura, mas mesmo se
não temos parâmetros, ainda precisamos dos parênteses.
Novamente, esta é a implementação mais simples do método execute: ela faz
nada, mas estamos apenas tentando compilar nosso código. Vamos verificar novamente:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
Ele compila! Mas observe que se você tentar cargo run e fizer uma solicitação no
navegador, você verá os erros no navegador que vimos no início de
o capítulo. Nossa biblioteca não está realmente chamando o closure passado para execute
ainda!
Nota: Um ditado que você pode ouvir sobre linguagens com compiladores estritos, como Haskell e Rust, é “Se o código compilar, ele funciona”. Mas este ditado não é universalmente verdadeiro. Nosso projeto compila, mas não faz absolutamente nada! Se nós estamos construindo um projeto real e completo, este seria um bom momento para começar escrever testes unitários para verificar se o código compila e tem o comportamento que quero.
Pense nisto: o que seria diferente aqui se fôssemos executar um future em vez de uma closure?
Validando o número de threads em new
Não estamos fazendo nada com os parâmetros new e execute. Vamos
implementar o corpo dessas funções com o comportamento que desejamos. Para
começar, pensemos em new. Antes escolhemos um tipo não assinado para o
parâmetro size porque um pool com um número negativo de threads não faz
sentido. No entanto, um pool com zero threads também não faz sentido, e zero é
um valor perfeitamente válido para usize. Vamos adicionar código para
verificar se size é maior que zero antes de retornar uma instância de
ThreadPool, e faremos o programa entrar em panic se receber zero, usando a
macro assert!, como mostra a Listagem 21-13.
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool::new para gerar panic se size for zeroTambém adicionamos alguma documentação para nosso ThreadPool com comentários de documentos.
Observe que seguimos boas práticas de documentação adicionando uma seção que
chama as situações em que nossa função pode panic, conforme discutido em
Capítulo 14. Tente executar cargo doc --open e clicar na estrutura ThreadPool
para ver como são os documentos gerados para new!
Em vez de adicionar a macro assert! como fizemos aqui, poderíamos transformar
new em build e retornar um Result, como fizemos com Config::build no
projeto de I/O da Listagem 12-9. Mas decidimos que, neste caso, tentar criar um
thread pool sem nenhuma thread deve ser um erro irrecuperável. Se você estiver
se sentindo ambicioso, tente escrever uma função chamada build com a
seguinte assinatura para compará-la com a função new:
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
Criando espaço para armazenar os threads
Agora que temos uma forma de garantir que há um número válido de threads para
armazenar no pool, podemos criá-las e guardá-las na struct ThreadPool antes
de retorná-la. Mas como “armazenamos” uma thread? Vamos olhar novamente para a
assinatura de thread::spawn:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
A função spawn retorna um JoinHandle<T>, em que T é o tipo retornado pela
closure. Vamos tentar usar JoinHandle também e ver o que acontece. No nosso
caso, a closure que estamos passando para o thread pool cuidará da conexão e
não retornará nada, então T será o tipo unitário ().
O código da Listagem 21-14 compilará, mas ainda não criará nenhuma thread.
Mudamos a definição de ThreadPool para conter um vetor de instâncias
thread::JoinHandle<()>, inicializamos esse vetor com capacidade size,
configuramos um loop for que executará algum código para criar as threads e,
por fim, retornamos uma instância de ThreadPool contendo-as.
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool armazenar as threadsColocamos std::thread no escopo da biblioteca crate porque estamos
usando thread::JoinHandle como o tipo dos itens no vetor em
ThreadPool.
Assim que recebe um tamanho válido, nosso ThreadPool cria um novo vetor capaz
de armazenar size itens. A função with_capacity realiza a mesma tarefa que
Vec::new, mas com uma diferença importante: ela pré-aloca espaço no vetor.
Como sabemos que precisaremos armazenar size elementos, fazer essa alocação
inicial é um pouco mais eficiente do que usar Vec::new, que se redimensiona à
medida que elementos são inseridos.
Quando você executar cargo check novamente, ele deverá ter sucesso.
Enviando código do ThreadPool para um thread
Deixamos um comentário no loop for da Listagem 21-14 sobre a criação das
threads. Aqui veremos como realmente criá-las. A biblioteca padrão fornece
thread::spawn como uma forma de criar threads, e thread::spawn espera
receber algum código que a thread deve executar assim que for criada. No nosso
caso, porém, queremos criar as threads e fazer com que elas esperem pelo
código que enviaremos mais tarde. A implementação de threads da biblioteca
padrão não inclui uma forma de fazer isso; teremos que implementá-la
manualmente.
Implementaremos esse comportamento introduzindo uma nova estrutura de dados entre
ThreadPool e as threads, que gerenciará esse novo comportamento. Chamaremos
essa estrutura de dados de Worker, um termo comum em implementações de pools.
Worker recebe o código que precisa ser executado e o executa em sua thread.
Pense nas pessoas que trabalham na cozinha de um restaurante: os trabalhadores esperam até que os pedidos cheguem dos clientes e são responsáveis por recebê-los e prepará-los.
Em vez de armazenar um vetor de instâncias JoinHandle<()> no thread pool,
armazenaremos instâncias da struct Worker. Cada Worker armazenará uma única
instância de JoinHandle<()>. Então, implementaremos um método em Worker que
receberá uma closure de código para executar e a enviará à thread já em
execução para ser executada. Também daremos a cada Worker um id, para que
possamos distinguir entre as diferentes instâncias de Worker no pool ao
registrar logs ou depurar.
Aqui está o novo processo que acontecerá quando criarmos um ThreadPool.
Implementaremos o código que envia a closure para a thread depois de termos
configurado Worker desta forma:
- Defina uma estrutura
Workerque contenha umide umJoinHandle<()>. - Altere
ThreadPoolpara conter um vetor de instânciasWorker. - Defina uma função
Worker::newque receba um númeroide retorne uma instância deWorkercontendo esseide uma thread criada com uma closure vazia. - Em
ThreadPool::new, use o contador do loopforpara gerar umid, crie um novoWorkercom esseide armazene-o no vetor.
Se você estiver pronto para um desafio, tente implementar essas mudanças sozinho antes olhando o código na Listagem 21-15.
Preparar? Aqui está a Listagem 21-15 com uma maneira de fazer as modificações anteriores.
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool para armazenar instâncias de Worker em vez de armazenar threads diretamenteAlteramos o nome do campo em ThreadPool de threads para workers porque
agora ele contém instâncias de Worker, e não instâncias de JoinHandle<()>.
Usamos o contador do loop for como argumento para Worker::new e armazenamos
cada novo Worker no vetor chamado workers.
Código externo, como nosso servidor em src/main.rs, não precisa conhecer os
detalhes de implementação sobre o uso da struct Worker dentro de
ThreadPool, portanto tornamos a struct Worker e sua função new
privadas. A função Worker::new usa o id fornecido e armazena uma instância
de JoinHandle<()> criada ao iniciar uma nova thread com uma closure vazia.
Nota: Se o sistema operacional não puder criar um thread porque não há recursos de sistema suficientes,
thread::spawnserá panic. Isso fará com que o nosso servidor inteiro para panic, mesmo que a criação de alguns threads possa ter sucesso. Para simplificar, esse comportamento é bom, mas em uma produção implementação de thread pool, você provavelmente desejaria usarstd::thread::Buildere seusspawnMétodo que retornaResult.
Este código irá compilar e armazenar o número de instâncias Worker que
especificado como um argumento para ThreadPool::new. Mas ainda não estamos processando
o closure que obtemos em execute. Vejamos como fazer isso a seguir.
Enviando requisições para threads por meio de canais
O próximo problema que abordaremos é que a closure passada a thread::spawn
não faz absolutamente nada. Atualmente, obtemos em execute a closure que
queremos executar. Mas precisamos fornecer a thread::spawn uma closure para
ser executada quando criamos cada Worker durante a construção de
ThreadPool.
Queremos que as estruturas Worker que acabamos de criar busquem o código a
ser executado em uma fila mantida por ThreadPool e enviem esse código para
suas threads executarem.
Os canais que aprendemos no Capítulo 16, uma forma simples de comunicação entre
duas threads, são perfeitos para este caso de uso. Usaremos um canal como fila
de trabalhos, e execute enviará um trabalho de ThreadPool para as
instâncias Worker, que repassarão o trabalho para suas threads. Eis o plano:
- O
ThreadPoolcriará um canal e manterá o remetente. - Cada
Workermanterá o receptor. - Criaremos uma nova estrutura
Jobque conterá as closures que queremos enviar pelo canal. - O método
executeenviará o trabalho que deseja executar através do remetente. - Em sua thread,
Workerfará um loop sobre o receptor e executará as closures de quaisquer trabalhos recebidos.
Vamos começar criando um canal em ThreadPool::new e mantendo o remetente na
instância de ThreadPool, como mostrado na Listagem 21-16. A struct Job não
contém nada por enquanto, mas será o tipo de item que estaremos enviando pelo
canal.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool para armazenar o transmissor de um canal que transmite instâncias de JobNo ThreadPool::new, criamos nosso novo canal e fazemos com que o pool mantenha o
remetente. Isso será compilado com sucesso.
Vamos tentar passar um receptor do canal em cada Worker como o thread
pool cria o canal. Sabemos que queremos usar o receptor no thread que
as instâncias Worker são geradas, então faremos referência ao parâmetro receiver no
closure. O código na Listagem 21-17 ainda não será compilado.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
WorkerFizemos algumas mudanças pequenas e diretas: passamos o receptor para
Worker::new, e então usamos dentro do closure.
Quando tentamos verificar este código, obtemos este erro:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
O código está tentando passar receiver para várias instâncias de Worker.
Isso não funcionará, como você deve se lembrar do Capítulo 16: a implementação
de canais fornecida pelo Rust suporta múltiplos produtores e um único
consumidor. Isso significa que não podemos simplesmente clonar a extremidade
consumidora do canal para corrigir esse código. Também não queremos enviar uma
mensagem várias vezes para vários consumidores; queremos uma fila de mensagens
compartilhada entre várias instâncias de Worker, de modo que cada mensagem
seja processada uma única vez.
Além disso, retirar um trabalho da fila do canal envolve modificar receiver,
então as threads precisam de uma forma segura de compartilhar e modificar esse
receiver; caso contrário, poderíamos ter condições de corrida, como vimos no
Capítulo 16.
Lembre-se dos smart pointers thread-safe discutidos no Capítulo 16: para
compartilhar ownership entre várias threads e permitir que essas threads
alterem o valor, precisamos usar Arc<Mutex<T>>. O tipo Arc permitirá que
múltiplas instâncias de Worker possuam o receptor, e Mutex garantirá que
apenas um Worker por vez obtenha um trabalho do receptor. A Listagem 21-18
mostra as mudanças que precisamos fazer.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Worker usando Arc e MutexEm ThreadPool::new, colocamos o receptor dentro de um Arc e de um Mutex.
Para cada novo Worker, clonamos o Arc para aumentar a contagem de
referências, de modo que as instâncias de Worker possam compartilhar a
ownership do receptor.
Com essas mudanças, o código compila! Estamos chegando lá!
Implementando o Método execute
Vamos finalmente implementar o método execute em ThreadPool. Também vamos
transformar Job de uma struct em um alias de tipo para um objeto trait que
contém o tipo de closure recebido por execute. Como discutimos na seção
“Sinônimos de tipo e aliases” no Capítulo 20,
aliases de tipo nos permitem encurtar tipos longos para facilitar o uso. Veja
a Listagem 21-19.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Job para um Box que guarda cada closure e enviando então o job pelo canalDepois de criar uma nova instância de Job usando a closure que obtemos em
execute, enviamos esse trabalho pela extremidade de envio do canal. Chamamos
unwrap em send para o caso de falha no envio. Isso pode acontecer se, por
exemplo, todas as nossas threads tiverem parado de executar, o que significaria
que a extremidade receptora deixou de receber novas mensagens. No momento,
porém, não temos como interromper a execução das threads: elas continuam
executando enquanto o pool existir. A razão pela qual usamos unwrap é que
sabemos que esse caso de falha não acontecerá, mas o compilador não sabe disso.
Mas ainda não terminamos! Em Worker, a closure passada para thread::spawn
ainda apenas faz referência à extremidade receptora do canal. Em vez disso,
precisamos que essa closure faça um loop infinito, pedindo um trabalho à
extremidade receptora do canal e executando-o quando conseguir um. Vamos fazer
a mudança mostrada na Listagem 21-20 para Worker::new.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
WorkerAqui, primeiro chamamos lock em receiver para adquirir o mutex e depois
chamamos unwrap para entrar em panic em caso de erro. A aquisição de um
bloqueio pode falhar se o mutex estiver em um estado envenenado, o que pode
acontecer se alguma outra thread entrar em pânico enquanto mantém o bloqueio em
vez de liberá-lo. Nessa situação, chamar unwrap para fazer essa thread entrar
em panic é a ação correta. Se quiser, você pode trocar esse unwrap por um
expect com uma mensagem de erro que faça sentido para você.
Se conseguirmos adquirir o bloqueio do mutex, chamamos recv para receber um
Job do canal. Um unwrap final também lida com quaisquer erros aqui, que
podem ocorrer se a thread que contém o remetente tiver sido desligada, de modo
semelhante ao fato de o método send retornar Err se o receptor for
desligado.
A chamada a recv é bloqueante; portanto, se ainda não houver trabalho, a
thread atual esperará até que um job esteja disponível. O Mutex<T> garante
que apenas a thread de um Worker por vez esteja tentando solicitar um
trabalho.
Nosso thread pool agora está funcionando! Execute cargo run e faça algumas
requisições:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
--> src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec<Worker>,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--> src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle<()>,
| ^^^^^^
warning: `hello` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Sucesso! Agora temos um thread pool que executa conexões de forma assíncrona. Nunca são criadas mais de quatro threads, então nosso sistema não será sobrecarregado se o servidor receber muitas requisições. Se fizermos um pedido para /sleep, o servidor poderá atender outras requisições fazendo com que outra thread as execute.
Nota: Se você abrir /sleep em várias janelas do navegador simultaneamente, elas pode carregar um de cada vez em intervalos de cinco segundos. Alguns navegadores da web executam múltiplas instâncias da mesma solicitação sequencialmente por motivos de armazenamento em cache. Isto a limitação não é causada pelo nosso web server.
Este é um bom momento para fazer uma pausa e considerar como o código nas Listagens 21-18, 21-19, e 21-20 seriam diferentes se estivéssemos usando futures em vez de closure para o trabalho a ser feito. Que tipos mudariam? Como seriam as assinaturas dos métodos diferente, se é que existe? Quais partes do código permaneceriam iguais?
Depois de aprender sobre o loop while let no Capítulo 17 e Capítulo 19, você
pode estar se perguntando por que não escrevemos o código Worker thread conforme mostrado em
Listagem 21-21.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
Worker::new usando while letEste código é compilado e executado, mas não resulta no threading desejado
comportamento: uma solicitação lenta ainda fará com que outras solicitações esperem para serem
processado. A razão é um tanto sutil: a estrutura Mutex não tem público
Método unlock porque o ownership do bloqueio é baseado no lifetime do
o MutexGuard<T> dentro do LockResult<MutexGuard<T>> que o lock
método retorna. Em tempo de compilação, o borrow checker pode então impor a regra
que um recurso protegido por um Mutexnão pode ser acessado a menos que tenhamos o
bloqueio. No entanto, esta implementação também pode resultar na retenção do bloqueio
mais do que o pretendido se não estivermos atentos ao lifetime do
MutexGuard<T>.
O código na Listagem 21-20 que usa let job = receiver.lock().unwrap().recv().unwrap(); funciona porque com let, qualquer
valores temporários usados na expressão do lado direito da igualdade
O sinal é eliminado imediatamente quando a instrução lettermina. No entanto, while let(e if lete match) não descarta valores temporários até o final do
o bloco associado. Na Listagem 21-21, o bloqueio permanece mantido durante
da chamada para job(), o que significa que outras instâncias Workernão podem receber trabalhos.