Desligamento e limpeza elegantes
O código na Listagem 21-20 está respondendo a requisições de forma assíncrona por meio do
uso de um thread pool, como pretendíamos. Recebemos alguns avisos sobre os campos
workers, id e thread, que não estamos usando de forma direta, lembrando-nos de que
não estamos limpando nada. Quando usamos o método menos elegante
ctrl-C para interromper a thread principal, todas as outras threads
também são interrompidas imediatamente, mesmo que estejam no meio de atender uma
requisição.
A seguir, implementaremos a trait Drop para chamar join em cada uma das
threads do pool, para que elas possam finalizar as requisições em que estão
trabalhando antes de encerrar. Em seguida, implementaremos uma maneira de
informar às threads que elas devem parar de aceitar novas requisições e se
desligar. Para ver esse código em ação, vamos modificar nosso servidor para
aceitar apenas duas requisições antes de desligar normalmente seu thread pool.
Uma coisa a observar à medida que avançamos: nada disso afeta as partes do código que lidam com a execução das closures, então tudo aqui seria o mesmo se estivéssemos usando um thread pool para um runtime async.
Implementando a trait Drop em ThreadPool
Vamos começar implementando Drop em nosso thread pool. Quando ele for
descartado, todas as nossas threads deverão ser reunidas com join para
garantir que concluam seu trabalho. A Listagem 21-22 mostra uma primeira
tentativa de implementar Drop; esse código ainda não funcionará muito bem.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
join quando o thread pool sai de escopoPrimeiro, percorremos cada um dos workers do thread pool. Usamos &mut para
isso porque self é uma referência mutável e também precisamos poder alterar
cada worker. Para cada worker, imprimimos uma mensagem dizendo que essa
instância de Worker está sendo encerrada e então chamamos join na thread
dessa instância. Se a chamada a join falhar, usamos unwrap para fazer o
Rust entrar em panic e encerrar de forma abrupta.
Aqui está o erro que obtemos quando compilamos este código:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:1921:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
O erro nos diz que não podemos chamar join porque só temos um empréstimo
mutável de cada worker, e join toma ownership de seu argumento. Para
resolver esse problema, precisamos mover a thread para fora da instância
Worker que possui o campo thread, para que join possa consumi-la. Uma
maneira de fazer isso seria adotar a mesma abordagem que usamos na
Listagem 18-15. Se Worker armazenasse um Option<thread::JoinHandle<()>>,
poderíamos chamar o método take em Option para mover o valor para fora da
variante Some e deixar uma variante None em seu lugar. Em outras palavras,
um Worker em execução teria Some em thread, e, quando quiséssemos limpar
um Worker, substituiríamos Some por None, para que o Worker deixasse de
ter uma thread para executar.
No entanto, o único momento em que isso aconteceria seria ao descartar o
Worker. Em troca, teríamos que lidar com um Option<thread::JoinHandle<()>>
em todo lugar em que acessássemos worker.thread. Rust idiomático usa bastante
Option, mas, quando você se vê embrulhando algo que sabe que sempre estará
presente em um Option apenas como solução de contorno, é uma boa ideia
procurar abordagens alternativas para manter o código mais limpo e menos sujeito
a erros.
Neste caso, existe uma alternativa melhor: o método Vec::drain. Ele aceita um
parâmetro de intervalo para especificar quais itens remover do vetor e retorna
um iterator desses itens. Passar a sintaxe de intervalo .. removerá todos
os valores do vetor.
Portanto, precisamos atualizar a implementação de Drop para ThreadPool
assim:
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
}
Isso resolve o erro do compilador e não requer nenhuma outra alteração em nosso código. Observe que, como drop pode ser chamado em caso de pânico, o unwrap também poderia panic e causar um panic duplo, que trava imediatamente o programa e encerra qualquer limpeza em andamento. Isso é bom para um programa de exemplo, mas não é recomendado para código de produção.
Sinalização para os threads pararem de escutar trabalhos
Com todas as alterações que fizemos, nosso código é compilado sem nenhum aviso.
No entanto, a má notícia é que esse código ainda não funciona da maneira que
desejamos. A chave está na lógica da closure executada pelas threads das
instâncias Worker: no momento, chamamos join, mas isso não desligará essas
threads, porque elas estão em loop, sempre procurando trabalho. Se tentarmos
descartar nosso ThreadPool com a implementação atual de drop, a thread
principal ficará bloqueada para sempre, esperando a primeira thread terminar.
Para corrigir esse problema, precisaremos de uma alteração na implementação de
drop para ThreadPool e, em seguida, de uma mudança no loop de Worker.
Primeiro, mudaremos a implementação de drop para ThreadPool para descartar
explicitamente o sender antes de esperar a conclusão das threads. A
Listagem 21-23 mostra as alterações em ThreadPool para descartar
explicitamente sender. Ao contrário do caso da thread, aqui precisamos usar
um Option para poder mover sender para fora de ThreadPool com
Option::take.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
// --snip--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
drop explícito de sender antes de esperar as threads Worker com joinDescartar sender fecha o canal, o que indica que nenhuma outra mensagem será
enviada. Quando isso acontece, todas as chamadas a recv feitas pelas
instâncias Worker em seu loop infinito retornam erro. Na Listagem 21-24,
alteramos o loop de Worker para sair normalmente nesse caso, o que significa
que as threads terminarão quando a implementação de drop para ThreadPool
chamar join nelas.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker { id, thread }
}
}
recv retorna um erroPara ver este código em ação, vamos modificar main para aceitar apenas duas solicitações
antes de desligar o servidor normalmente, conforme mostrado na Listagem 21-25.
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Você não gostaria que um web server do mundo real desligasse depois de atender apenas duas requisições. Este código apenas demonstra que o desligamento e a limpeza normais estão funcionando.
O método take é definido na trait Iterator e limita a iteração, no máximo,
aos dois primeiros itens. O ThreadPool sairá de escopo ao final de main, e
a implementação de drop será executada.
Inicie o servidor com cargo run e faça três solicitações. O terceiro pedido
deve ocorrer um erro e, em seu terminal, você deverá ver uma saída semelhante a esta:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
Você pode ver uma ordem diferente de IDs e mensagens Worker impressas. Pelas
mensagens, conseguimos perceber como esse código funciona: as instâncias
Worker 0 e 3 receberam as duas primeiras requisições. O servidor parou de
aceitar conexões após a segunda, e a implementação de Drop para ThreadPool
começa a ser executada antes mesmo de Worker 3 iniciar seu trabalho. Soltar o
sender desconecta todas as instâncias Worker e solicita que elas sejam
encerradas. Cada instância Worker imprime uma mensagem quando se desconecta,
e então o thread pool chama join para esperar que a thread de cada Worker
termine.
Observe um aspecto interessante desta execução específica: o ThreadPool
descartou o sender, e, antes que qualquer Worker recebesse um erro,
tentamos fazer join em Worker 0. Worker 0 ainda não havia recebido um
erro de recv, então a thread principal ficou bloqueada, esperando Worker 0
terminar. Enquanto isso, Worker 3 recebeu um trabalho e todas as threads
receberam um erro. Quando Worker 0 terminou, a thread principal esperou que o
restante das instâncias Worker terminasse. Nesse ponto, todas já tinham saído
de seus loops e parado.
Parabéns! Agora concluímos nosso projeto: temos um web server básico que usa um thread pool para responder de forma assíncrona. Somos capazes de realizar um desligamento gracioso do servidor, limpando todas as threads do pool.
Aqui está o código completo para referência:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
Poderíamos fazer mais aqui! Se você quiser continuar aprimorando este projeto, aqui estão algumas ideias:
- Adicione mais documentação ao
ThreadPoole seus métodos públicos. - Adicione testes de funcionalidade da biblioteca.
- Altere as chamadas para
unwrappara um tratamento de erros mais robusto. - Use
ThreadPoolpara realizar alguma tarefa diferente de atender solicitações da web. - Encontre um crate de thread pool em crates.io e implemente um web server semelhante usando esse crate. Em seguida, compare sua API e robustez com o thread pool que implementamos.
Resumo
Bom trabalho! Você chegou ao final do livro! Queremos agradecer-lhe por juntando-se a nós neste tour pelo Rust. Agora você está pronto para implementar seu próprio Rust projetos e ajudar com projetos de outras pessoas. Tenha em mente que existe um comunidade acolhedora de outros Rustáceos que adorariam ajudá-lo com qualquer desafios que você encontra em sua jornada Rust.