Working Through Rust Web Programming pt 11: Queuing tasks with Redis

Matthew MacFarquhar
6 min read · Apr 18, 2024

Introduction

In this series, I will be working through this book on Rust Web Programming. The book is quite large and in-depth, so in these articles I will distill its big-picture pieces into bite-size tutorials and walkthroughs.

In this section, we will use the async Rust and Docker skills we have built up to create a web server that queues tasks on Redis and workers that pull those tasks off the queue and execute them.

The code for this demo can be found in this repo: https://github.com/mattmacf98/redis-queue-rust

Structuring The Messages

The first thing we will need for a good message queue is a good message structure. We will create our message structs and group them all using an enum called TaskType.

Messages

We have three types of tasks to perform (add, subtract and multiply), so we will need three message structs. Below is our add message.

use std::{thread, time};
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AddTask {
    pub one: i32,
    pub two: i32
}

impl AddTask {
    pub fn run(self) -> i32 {
        let duration = time::Duration::from_secs(20);
        thread::sleep(duration);
        self.one + self.two
    }
}

We need our message to be serializable and de-serializable so that we can encode it (this demo uses bincode), push it onto the Redis queue, and later pull it off the queue and turn it back into our struct. All our messages will have a run function that our workers will execute.
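The multiply and subtract tasks follow the same pattern. Here is a sketch of MultiplyTask (the field names match what the server constructs later; the 20-second sleep mirrors AddTask and is an assumption on my part, so the repo may differ):

use std::{thread, time};
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct MultiplyTask {
    pub one: i32,
    pub two: i32
}

impl MultiplyTask {
    pub fn run(self) -> i32 {
        // Simulate a slow job, just like AddTask (assumed duration).
        let duration = time::Duration::from_secs(20);
        thread::sleep(duration);
        self.one * self.two
    }
}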

pub mod add;
pub mod multiply;
pub mod subtract;

use serde::{Serialize, Deserialize};
use crate::tasks::add::AddTask;
use crate::tasks::multiply::MultiplyTask;
use crate::tasks::subtract::SubtractTask;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TaskType {
    ADD(AddTask),
    MULTIPLY(MultiplyTask),
    SUBTRACT(SubtractTask)
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMessage {
    pub task: TaskType
}

Our TaskType enum contains a variant for each type of task, which lets us keep a single consistent type in our queue (i.e. instead of a queue mixing Add, Subtract and Multiply structs, we have a queue of TaskTypes).
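As a quick sanity check (this snippet is my own, not from the book), here is the round trip the server and worker will perform with bincode: serialize a TaskMessage to bytes, then deserialize it back into the enum.

use crate::tasks::{TaskMessage, TaskType};
use crate::tasks::add::AddTask;

fn bincode_round_trip_example() {
    let message = TaskMessage { task: TaskType::ADD(AddTask { one: 2, two: 3 }) };

    // What the server does before pushing onto the queue.
    let bytes: Vec<u8> = bincode::serialize(&message).unwrap();

    // What the worker does after popping from the queue.
    let decoded: TaskMessage = bincode::deserialize(&bytes).unwrap();
    match decoded.task {
        TaskType::ADD(task) => assert_eq!((task.one, task.two), (2, 3)),
        _ => unreachable!()
    }
}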

Server and Worker

Normally, we might split the server and worker into separate binaries, each with its own main.rs. However, this is a small project, so we keep the logic in a single main.rs and use an environment variable to decide whether a given instance runs as a server or a worker.
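For reference, main.rs leans on a handful of crates. The [dependencies] section of Cargo.toml looks roughly like this (the versions are my guesses based on the APIs used below; check the repo for the exact ones):

[dependencies]
tokio = { version = "1", features = ["full"] }
hyper = { version = "0.14", features = ["full"] }
redis = "0.23"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
bincode = "1"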

use std::env;
use std::net::SocketAddr;
use std::time;

use hyper::Server;
use hyper::service::{make_service_fn, service_fn};

use crate::tasks::{TaskMessage, TaskType};

mod tasks;

#[tokio::main]
async fn main() {
    // Decide at runtime whether this instance is the HTTP server or a queue worker.
    let app_type = env::var("APP_TYPE").unwrap();

    match app_type.as_str() {
        "server" => {
            let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
            let server = Server::bind(&addr).serve(make_service_fn(|_conn| {
                async {
                    Ok::<_, hyper::Error>(service_fn(move |req| {
                        async {
                            handle(req).await
                        }
                    }))
                }
            }));

            if let Err(e) = server.await {
                eprintln!("server error: {}", e);
            }
        },
        "worker" => {
            let client = redis::Client::open(env::var("REDIS_URL").unwrap()).unwrap();

            loop {
                // Pop the oldest message off the right end of the queue, if there is one.
                let outcome: Option<Vec<u8>> = {
                    let mut con = client.get_connection().unwrap();

                    redis::cmd("RPOP")
                        .arg("some_queue")
                        .query(&mut con)
                        .unwrap()
                };

                match outcome {
                    Some(data) => {
                        let deserialized_message: TaskMessage = bincode::deserialize(&data).unwrap();
                        match deserialized_message.task {
                            TaskType::ADD(task) => {
                                println!("Add: {}", task.run());
                            },
                            TaskType::MULTIPLY(task) => {
                                println!("Multiply: {}", task.run());
                            },
                            TaskType::SUBTRACT(task) => {
                                println!("Subtract: {}", task.run());
                            }
                        }
                    },
                    None => {
                        println!("Task queue empty");
                        let five_seconds = time::Duration::from_secs(5);
                        tokio::time::sleep(five_seconds).await;
                    }
                }
            }
        },
        _ => panic!("{} app type is not supported", app_type)
    }
}

We can see that the first thing we do is read an environment variable to determine our app_type (“server” or “worker”).

Worker

Our workers create a Redis client, using another environment variable for the Redis URL. Then, in a loop, we try to pop a message off the right end of the Redis queue.

If there are no messages, we sleep for five seconds and try again.

If there is a message, we match on the TaskType and run the task.
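As an aside (not what the book does), Redis also offers blocking pops. A BRPOP with a timeout would let the worker wait inside Redis for up to five seconds instead of polling and sleeping; assuming the same con connection as above, the pop would look like this:

// BRPOP returns the (queue name, payload) pair, or nil if the timeout elapses.
let outcome: Option<(String, Vec<u8>)> = redis::cmd("BRPOP")
    .arg("some_queue")
    .arg(5) // block for up to 5 seconds
    .query(&mut con)
    .unwrap();

We would then feed the payload (the second element of the tuple) into the same bincode::deserialize call as before.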

Server

Our server binds to 0.0.0.0:3000 and serves each incoming request with the handle function.

// These imports also sit at the top of main.rs, for the handler below.
use hyper::{body, Body, Request, Response, StatusCode};
use serde::{Serialize, Deserialize};

use crate::tasks::add::AddTask;
use crate::tasks::multiply::MultiplyTask;
use crate::tasks::subtract::SubtractTask;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncomingBody {
    pub one: i32,
    pub two: i32
}

async fn handle(req: Request<Body>) -> Result<Response<Body>, &'static str> {
    let client = redis::Client::open(env::var("REDIS_URL").unwrap()).unwrap();
    // "/add" -> "add", "/multiply" -> "multiply", "/subtract" -> "subtract"
    let task_type = req.uri().to_string().replace("/", "");

    let bytes = body::to_bytes(req.into_body()).await.unwrap();
    let incoming_body: IncomingBody = serde_json::from_slice(&bytes).unwrap();

    let message_type: TaskType;
    match task_type.as_str() {
        "add" => {
            let body = AddTask { one: incoming_body.one, two: incoming_body.two };
            message_type = TaskType::ADD(body);
        },
        "multiply" => {
            let body = MultiplyTask { one: incoming_body.one, two: incoming_body.two };
            message_type = TaskType::MULTIPLY(body);
        },
        "subtract" => {
            let body = SubtractTask { one: incoming_body.one, two: incoming_body.two };
            message_type = TaskType::SUBTRACT(body);
        },
        _ => {
            let response = Response::builder().status(StatusCode::NOT_FOUND).body(Body::from("task not found"));
            return Ok(response.unwrap());
        }
    };

    let message = TaskMessage { task: message_type };
    let serialized_message = bincode::serialize(&message).unwrap();

    // Push onto the left of the queue; workers pop from the right (FIFO).
    let mut con = client.get_connection().unwrap();
    let _: () = redis::cmd("LPUSH")
        .arg("some_queue")
        .arg(serialized_message.clone())
        .query(&mut con).unwrap();

    Ok(Response::new(Body::from("task sent!")))
}

Our server also connects to Redis, and uses the request path (with the slashes stripped) to determine the task type to create (so localhost:3000/add creates an add task and localhost:3000/multiply creates a multiply task).

We deserialize the incoming body into our IncomingBody struct (just the two numbers to perform our task on) and create the appropriate TaskType. We then wrap it in a TaskMessage, serialize it with bincode so that we can push it onto the left side of the Redis queue, and tell the client the task has been queued.
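Once the stack from the next section is running, we can exercise the endpoints with plain JSON bodies matching IncomingBody (these curl calls are illustrative; the handler never checks the HTTP method or headers):

curl http://localhost:3000/add -d '{"one": 4, "two": 6}'
curl http://localhost:3000/multiply -d '{"one": 4, "two": 6}'
curl http://localhost:3000/subtract -d '{"one": 4, "two": 6}'

Each call should come straight back with "task sent!", while the actual work happens later on a worker.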

Docker-ize

Our app doesn't do much on its own: we can only run one instance at a time, and there is no Redis server to talk to. We will now use Docker to wire all of this up.

Docker File

As always, we will need to create a Docker image for our Rust app, so let's create a Dockerfile.

FROM rust:1.74

RUN apt-get update -yqq && apt-get install -yqq cmake g++

WORKDIR /app
COPY . .

RUN cargo clean
RUN cargo build --release

# Keep only the compiled binary and drop the build artifacts.
RUN cp ./target/release/redis-queue-rust ./redis-queue-rust
RUN rm -rf ./target
RUN rm -rf ./src
RUN chmod +x ./redis-queue-rust

EXPOSE 3000
ENTRYPOINT ["./redis-queue-rust"]

It is quite simple: it installs the build dependencies, compiles our Rust app in release mode, keeps only the resulting binary, and runs it.
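One optional addition that is not in the book: a small .dockerignore keeps the local target/ directory (which can grow to gigabytes) and the .git folder out of the build context, so the COPY step stays fast.

target
.git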

Docker Compose

Our docker-compose.yml spins up a Redis instance and sets the environment variables that tell each app instance whether it is a server or a worker.

version: "3.7"

services:
  server_1:
    container_name: 'server_1'
    image: 'server_1'
    build:
      context: .
    environment:
      - 'APP_TYPE=server'
      - 'REDIS_URL=redis://redis:6379'
    depends_on:
      redis:
        condition: service_started
    restart: on-failure
    ports:
      - "3000:3000"
    expose:
      - 3000
  worker_1:
    container_name: 'worker_1'
    image: 'worker_1'
    build:
      context: .
    environment:
      - 'APP_TYPE=worker'
      - 'REDIS_URL=redis://redis:6379'
    depends_on:
      redis:
        condition: service_started
    restart: on-failure
  worker_2:
    container_name: 'worker_2'
    image: 'worker_2'
    build:
      context: .
    environment:
      - 'APP_TYPE=worker'
      - 'REDIS_URL=redis://redis:6379'
    depends_on:
      redis:
        condition: service_started
    restart: on-failure
  worker_3:
    container_name: 'worker_3'
    image: 'worker_3'
    build:
      context: .
    environment:
      - 'APP_TYPE=worker'
      - 'REDIS_URL=redis://redis:6379'
    depends_on:
      redis:
        condition: service_started
    restart: on-failure
  redis:
    container_name: 'queue-redis'
    image: 'redis'
    ports:
      - '6379:6379'

Our Redis service sits at the bottom of the compose file; it simply runs the official redis image and accepts traffic on port 6379.

Our server takes environment variables to define REDIS_URL and set APP_TYPE to server. We then expose port 3000 so we can send requests to it.

Our workers take the same REDIS_URL environment variable and set APP_TYPE to worker.

With this all set up, we can run docker-compose up and start sending messages to our server. We can then watch the logs to see our workers take tasks off the queue and execute them!
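For example, bringing the stack up and firing one of the curl calls from earlier might look like this (the log line is based on the worker's println!, with the container prefix that Compose adds):

docker-compose up --build
# in another terminal:
curl http://localhost:3000/add -d '{"one": 4, "two": 6}'
# roughly 20 seconds later, one of the workers logs:
# worker_1  | Add: 10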

Conclusion

In this section, we built a working async task system with the help of Redis, Docker and tokio. This demo project is a small capstone to bring together a lot of the technologies we had already explored and fold in async Rust.

This was the last section in the book, and therefore the last section in this series. I enjoyed working through this book a lot and learned a bunch about Rust as well as Docker, AWS, Redis and Postgres. I would HIGHLY recommend working through the book yourself to get a sense of how we can build our own scalable, web based solutions from start to finish.
