Working Through Rust Web Programming pt 10: Building HTTP Actors

Matthew MacFarquhar
7 min readApr 16, 2024

Introduction

In this series, I will be working through this book on Rust Web Programming. The book is quite large and in depth, but in these articles I will be distilling the big picture pieces of the book into bite size tutorials and walk throughs.

In this section, we will be delving into implementing our own HTTP server on top of TCP and then fleshing out a custom HTTP Actor service to have a side car process working to persist data sent into a chat app we will build.

All the work for this article are contained in these two repos

Creating Our Own HTTP Frame

HTTP is built on top of TCP’s plain text reading and writing by specifying an agreed upon format which the client and server know ahead of time. We can build a very small piece of the HTTP protocol ourselves!

Frames

We will build our HTTP layer as a frame, framing allows us to break a stream of data into manageable chunks for transmission across a network, this is similar to what we did manually in our TCP application where every \n was a new message. Rust has some libraries to make this a little cleaner. First, we will need some structs to define our frame.

use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Debug)]
pub struct HttpFrame {
pub header: Header,
pub body: Body
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Header {
pub method: String,
pub uri: String
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Body {
pub ticker: String,
pub amount: f32
}

Our HTTP frame has some familiar pieces, a Body and a Header, defining these structs and making them serializable and de-serializable is the first step in making our own HTTP protocol handler.

Sending the HTTP Request

In our client, we will send the HTTP request likes so

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
let mut framed = BytesCodec::new().framed(stream);

let message = HttpFrame {
header: Header { method: "POST".to_string(), uri: "www.my_test_stock_app.com/stock/purchase".to_string() },
body: Body { ticker: "BYND".to_string(), amount: 3.2 },
};

let message_bin = bincode::serialize(&message).unwrap();
let sending_message = Bytes::from(message_bin);
framed.send(sending_message).await.unwrap();

let message = framed.next().await.unwrap().unwrap();
let message = bincode::deserialize::<HttpFrame>(&message).unwrap();
println!("{:?}", message);

Ok(())
}

We first connect to our server and establish our framing, we create our custom HTTP Frame and serialize it into a sequence of bytes which we can send over the network. We then deserialize the server response and print it out.

Receiving the HTTP Request

Now, on our server side, we process the HTTP request like so

#[tokio::main]
async fn main() {
let addr = "127.0.0.1:8080".to_string();

let listener = TcpListener::bind(&addr).await.unwrap();
println!("listening on {}", addr);

loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
let mut framed = BytesCodec::new().framed(socket);
let message = framed.next().await.unwrap();

match message {
Ok(bytes) => {
let message = bincode::deserialize::<HttpFrame>(&bytes).unwrap();
println!("{:?}", message);
let message_bin = bincode::serialize(&message).unwrap();
let sending_message = Bytes::from(message_bin);
framed.send(sending_message).await.unwrap();
},
Err(err) => println!("Socket closed with error {:?}", err)
}

println!("Socket received FIN packet and closed connection");
});
}
}

We spin up a server to accept requests, when we get a request we frame it and extract the message. We then deserialize the message, print it and serialize it again so we can send it back out to the client.

Summary

  • We can define our own Frame on top of TCP simply by creating some serializable/de-serializable structs to represent its structure
  • We can use Frames to easily chunk stream content into sections that match a Frame struct we define
  • When we receive a message we must first de-serialize it, when we send a message we must first serialize it

An HTTP Actor System Using Hyper

Using what we learned about async Rust Actors and TCP in the last section and what we just went over about HTTP, we are in a pretty good state to create our own HTTP based application. We can create a server that takes in HTTP requests containing chat messages and sends them to a State Actor maintaining the state of our chat logs. We will also have another HTTP based Runner Actor which will be polling the State Actor periodically to clear its state and send it to some other server for long term storage.

While we did successfully build some of HTTP’s functionality ourselves above, it is usually better to use an established library which has all the features of HTTP we may want, we will use hyper.

Messages

First we will need to define the messages that the Actors will be sending in our application.

use serde::Serialize;
use std::env;

#[derive(Debug, Serialize)]
pub enum MessageType {
INPUT,
OUTPUT,
EMPTY
}

#[derive(Debug, Serialize)]
struct PostBody {
pub chat_id: i32,
pub block_data: String
}

#[derive(Debug, Serialize)]
pub struct StateActorMessage {
pub message_type: MessageType,
pub chat_id: Option<i32>,
pub single_data: Option<String>,
pub block_data: Option<Vec<String>>
}

impl StateActorMessage {
pub async fn send_to_server(&self) {
let lib_url = env::var("SERVER_URL").unwrap();
let joined = self.block_data.clone().unwrap().join("$");
let body = PostBody {
chat_id: self.chat_id.unwrap(),
block_data: joined
};
let client = reqwest::Client::new();
let res = client.post(lib_url)
.json(&body)
.send()
.await.unwrap();
println!("{:?}", res);
}
}

We have three message types that can be sent around

  • EMPTY: is sent by the State Actor to the Runner Actor to tell it that there are no messages in the state to be stored
  • INPUT: is sent by the HTTP server to the State Actor to tell it to update the chat state with a new message
  • OUTPUT: is sent from the Runner Actor to the State Actor to request its state, and then from the State Actor to the Runner Actor to return the state — if it is not empty

We definitely could have split these up into separate messages if we wanted more clear separation of processes, but the project is small enough that this approach will work fine.

We also put the send_to_server function in the message itself as opposed to creating the functionality in the Runner Actor. This may be good if multiple Actors wanted to send the message to the server, but I think in an actual production project I would just put this logic in the Runner Actor’s logic.

Actors

We have the messages, now we need to configure some Actors, the implementations for the State Actor and the Runner Actor dictate the message flow.

Essentially the flows fall under two categories

INSERT DATA
1. HTTP Server tells State Actor to INPUT data into its state with a
chat_id and a single message
2. State Actor processes the request and updates its state

PERSIST DATA
1. Runner Actor sends OUTPUT message with all empty fields
2. State Actor receives OUTPUT message
a. EITHER The state is empty => State Actor repsonds with an EMPTY message
b. OR There is some state => State Actor gets the oldest chat in the chat queue
and returns all the chat messages in an OUTPUT message and removes the chat
from its local state
3. Runner Actor receives message
a. EMPTY => do nothing
b. OUTPUT => send the message to the persistance Server

Create our HTTP Server

Now we can set up our HTTP server so we can start up this process.

use tokio::sync::{mpsc, mpsc::Sender};
use hyper::{Body, Request, Response, Server};
use hyper::body;
use hyper::service::{make_service_fn, service_fn};
use serde_json;
use serde::Deserialize;
use std::net::SocketAddr;

mod actors;
use actors::state::StateActor;
use actors::runner::RunnerActor;
use actors::messages::StateActorMessage;
use actors::messages::MessageType;

#[derive(Deserialize, Debug)]
struct IncomingBody {
pub chat_id: i32,
pub timestamp: i32,
pub input: String,
pub output: String
}

async fn handle(req: Request<Body>, channel_sender: Sender<StateActorMessage>) -> Result<Response<Body>, String> {
println!("incoming message from the outside");
let method = req.method().clone();
println!("{}", method);
let uri = req.uri();
println!("{}", uri);

let bytes = body::to_bytes(req.into_body()).await.unwrap();
let string_body = String::from_utf8(bytes.to_vec()).expect("expected valid utf-8 in body");
let value: IncomingBody = serde_json::from_str(string_body.as_str()).unwrap();

let message = StateActorMessage {
message_type: MessageType::INPUT,
chat_id: Some(value.chat_id),
single_data: Some(format!("{}>>{}>>{}>>", value.input, value.output, value.timestamp)),
block_data: None,
};

channel_sender.send(message).await.unwrap();

Ok(Response::new(format!("{:?}", value).into()))
}

#[tokio::main]
async fn main() {
let addr = SocketAddr::from(([0,0,0,0], 3000));
let (state_tx, state_rx) = mpsc::channel::<StateActorMessage>(1);
let (runner_tx, runner_rx) = mpsc::channel::<StateActorMessage>(1);
let channel_sender = state_tx.clone();

tokio::spawn(async move {
let state_actor = StateActor::new(state_rx, runner_tx);
state_actor.run().await;
});

tokio::spawn(async move {
let runner_actor = RunnerActor::new(30, runner_rx, state_tx);
runner_actor.run().await;
});

let sever = Server::bind(&addr).serve(make_service_fn(|_conn| {
let channel = channel_sender.clone();
async {
Ok::<_, hyper::Error>(service_fn(move |req| {
let channel = channel.clone();
async {
handle(req, channel).await
}
}))
}
}));

if let Err(e) = sever.await {
eprintln!("server error: {}", e);
}
}

First, we set up our server address and spin up our Actors in their own threads. We can see our Runner Actor takes in an int for the time it should wait between its polls on the State Actor. Then we will configure our hyper service to call handle when an HTTP request is passed in.

handle will extract the HTTP request’s body and use the values provided to create a StateActorMessage to send to our State Actor.

Summary

  • We can create messages for our Actors to communicate
  • We can develop our own custom messaging patterns and processes to be run on our Actors
  • We can easily intake HTTP requests and extract the body of the requests using the light weight hyper framework

Conclusion

In this section we investigated what HTTP really is and discovered that it is just an agreed upon message structure sent over TCP. We then used our previous expertise in tokio and the Actor design pattern to create a working HTTP application for a chat app with a side car mechanism for persisting chats from cache into some other server. In the next section, we will be applying our newfound understanding of async Rust to augment our web app and enable us to queue tasks with Redis.

--

--

Matthew MacFarquhar

I am a software engineer working for Amazon living in SF/NYC.