Working Through Rust Web Programming pt 9: Making a TCP Network App using Async Rust and Actors

Matthew MacFarquhar
9 min read · Apr 15, 2024

Introduction

In this series, I will be working through this book on Rust Web Programming. The book is quite large and in-depth, but in these articles I will distill its big-picture pieces into bite-size tutorials and walkthroughs.

In this section, we will be exploring tokio — the premier crate for creating an async Rust runtime — and demonstrating how we can leverage async Rust and the Actor design pattern to build a super-safe, async, multi-threaded TCP network application.

The work for this article is tracked in this repo (for the server), and this repo for the client — which really is just making a request to our server.

Tokio

There are other async Rust runtime crates, but tokio is the front-runner at the time of writing, so it is the one the book follows.

Async Runtime

Rust allows for functions to be async and allows those async functions to .await other async functions. However, Rust itself does not allow our main function to be async. tokio exposes a macro we can use above our main function — #[tokio::main] — to enable our main function to be async.

So now we can do things like

#[tokio::main]
async fn main() {
// assume hello is an async function
let one = hello(1);
let two = hello(2);

let one_out = one.await;
let two_out = two.await;
println!("{} {}", one_out, two_out);
}

All we have done so far is enable calling async functions from main and awaiting their results. Futures in Rust are lazy, so hello(1) does not start running until it is awaited, and the two awaits run one after the other. That is not very useful, since the same logic could probably be written with synchronous functions.

However, tokio also lets us spawn tasks, which the runtime schedules across its worker threads, so multiple executions can run concurrently without blocking each other.

#[tokio::main]
async fn main() {
// assume hello is an async function
let one = tokio::spawn({
hello(1)
});
let two = tokio::spawn({
hello(2)
});

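// awaiting a JoinHandle yields a Result; unwrap panics if the task itself panicked
let one_out = one.await.unwrap();
let two_out = two.await.unwrap();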
println!("{} {}", one_out, two_out);
}

Using tokio::spawn, each async function runs as its own task, which starts right away and may be picked up by a different worker thread. Awaiting the returned handles afterwards lets one and two run concurrently, which speeds up the overall process.

Channels

tokio also provides MPSC (multi-producer, single-consumer) channels, which let us pass messages safely between tasks through a queue.

We can create a channel for a given message struct as shown below. The argument is the channel's capacity: here, at most one message can be buffered at a time.

let (tx, rx) = mpsc::channel::<Message>(1);

Then we simply pass tx (the sender) and rx (the receiver) to the tasks responsible for producing and receiving the messages, respectively.
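To get a feel for what a capacity of 1 means, here is a small dependency-free sketch. It rests on an assumption worth stating loudly: it uses the standard library's std::sync::mpsc::sync_channel as a stand-in for tokio's bounded channel, and capacity_one_demo is a made-up name for illustration. It is not how the book does it.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries three non-blocking sends into a channel with room for one message:
/// two before anything is received, one after.
/// NOTE: std's sync_channel is an assumption standing in for tokio's mpsc::channel(1).
fn capacity_one_demo() -> (bool, bool, bool) {
    let (tx, rx) = sync_channel::<u32>(1);

    // The single slot is free, so the first message is buffered.
    let first_ok = tx.try_send(1).is_ok();

    // The slot is now taken, so this send is refused rather than queued.
    let second_full = matches!(tx.try_send(2), Err(TrySendError::Full(_)));

    // Receiving frees the slot, and sending works again.
    let received = rx.recv().unwrap();
    let third_ok = received == 1 && tx.try_send(3).is_ok();

    (first_ok, second_full, third_ok)
}
```

With tokio, sender.send(message).await would wait for the slot to free up instead of returning an error, which is how a small capacity applies backpressure to producers.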

Actors

Now that we know how to spin up concurrent tasks and safely communicate between them, we can implement a classic async design pattern called the Actor model.

In the Actor model, isolated processes each run in their own task and communicate only through messages. This is where Rust and tokio shine: messages are sent and received correctly and in order, without us having to worry about data races.

We first need to build the struct for the Actors to communicate with each other.

#[derive(Debug)]
pub struct Message {
pub order: String,
pub ticker: String,
pub amount: f32,
pub respond_to: oneshot::Sender<u32>
}

Next, we will create an actor which will produce the message

pub struct BuyOrder {
pub order: String,
pub ticker: String,
pub amount: f32,
pub sender: mpsc::Sender<Message>
}

impl BuyOrder {
pub(crate) fn new(ticker: String, amount: f32, sender: mpsc::Sender<Message>) -> Self {
Self { order: "BUY".to_string(), ticker, amount, sender }
}

pub(crate) async fn send(self) {
let (send, recv) = oneshot::channel();
let message = Message {
order: self.order,
amount: self.amount,
ticker: self.ticker,
respond_to: send
};
let _ = self.sender.send(message).await;
match recv.await {
Ok(outcome) => println!("here is the outcome: {}", outcome),
Err(e) => println!("{}", e)
}
}
}

We can see that this actor simply shoots off a message, waits for a response from the receiver, prints it out and then dies.

We will now need to create an Actor to receive the Message.

pub struct OrderBookActor {
pub receiver: mpsc::Receiver<Message>,
pub sender: mpsc::Sender<TrackerMessage>,
pub total_invested: f32,
pub investment_cap: f32
}

impl OrderBookActor {
pub(crate) fn new(receiver: mpsc::Receiver<Message>, sender: mpsc::Sender<TrackerMessage>, investment_cap: f32) -> Self {
Self { receiver, sender, total_invested: 0.0, investment_cap }
}

async fn handle_message(&mut self, message: Message) {
if message.amount + self.total_invested >= self.investment_cap {
println!("rejecting purchase, total invested: {}", self.total_invested);
let _ = message.respond_to.send(0);
} else {
self.total_invested += message.amount;
println!("processing purchase, total invested: {}", self.total_invested);
let _ = message.respond_to.send(1);

let (send, _) = oneshot::channel();
let tracker_message = TrackerMessage {
command: Order::BUY(message.ticker, message.amount),
respond_to: send,
};

let _ = self.sender.send(tracker_message).await;
}
}

pub(crate) async fn run(mut self) {
println!("actor is running");
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg).await;
}
}
}

Our receiver Actor has an mpsc Receiver for the Message (ignore the TrackerMessage for now; in the book this actor is also a producer for another actor that tracks the system status). We run the receiver in a loop, much like a server: it keeps trying to receive messages on the channel, and when one arrives it performs the business logic in its handle_message function. The loop ends when recv() returns None, which happens once every Sender for the channel has been dropped.
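If you want to watch the request and reply round trip without pulling in any dependencies, here is a rough analogue. The assumptions are significant: it uses std::thread and std::sync::mpsc instead of tokio tasks and channels, a plain mpsc Sender stands in for the oneshot sender, and OrderBook and run_orders are names invented for this sketch. Only the cap rule is taken from handle_message above.

```rust
use std::sync::mpsc;
use std::thread;

/// Message sent to the order book; `respond_to` carries the reply back.
struct Message {
    amount: f32,
    respond_to: mpsc::Sender<u32>, // assumption: stands in for oneshot::Sender<u32>
}

/// Owns all mutable state; no other thread can touch `total_invested`.
struct OrderBook {
    total_invested: f32,
    investment_cap: f32,
}

impl OrderBook {
    /// Same rule as handle_message: reply 0 (reject) if the purchase would
    /// reach the cap, otherwise record it and reply 1 (accept).
    fn handle(&mut self, msg: Message) {
        let outcome = if msg.amount + self.total_invested >= self.investment_cap {
            0
        } else {
            self.total_invested += msg.amount;
            1
        };
        // Ignore the error if the requester has stopped waiting.
        let _ = msg.respond_to.send(outcome);
    }
}

/// Spawns the actor on its own thread, sends one order per amount,
/// and collects the replies in order.
fn run_orders(cap: f32, amounts: &[f32]) -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<Message>();
    let actor = thread::spawn(move || {
        let mut book = OrderBook { total_invested: 0.0, investment_cap: cap };
        // This loop ends once every sender has been dropped.
        for msg in rx {
            book.handle(msg);
        }
    });

    let mut outcomes = Vec::new();
    for &amount in amounts {
        let (reply_tx, reply_rx) = mpsc::channel();
        tx.send(Message { amount, respond_to: reply_tx }).unwrap();
        outcomes.push(reply_rx.recv().unwrap());
    }

    drop(tx); // close the channel so the actor thread exits
    actor.join().unwrap();
    outcomes
}
```

Calling run_orders(20.0, &[8.0, 9.0, 9.0]) accepts the first two orders (total 17.0) and rejects the third, because 17.0 + 9.0 would reach the cap. The point of the pattern is that total_invested lives in exactly one place, so no locking is needed.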

Summary

  • tokio lets us make our main function async
  • tokio lets us spawn tasks to run our async functions concurrently
  • tokio lets us build queues (channels) that safely pass data between async actors without fear of data races
  • The Actor pattern is a useful way to structure communication between concurrent tasks that share no state

TCP

Actors and async Rust are pretty cool! But currently we aren’t really doing anything, since nothing spins up a BuyOrder actor. Let’s expose this action via a TCP listener.

Setting Up the TCP Socket

TCP is the layer that HTTP is built on. If we spin up a TCP listener, it will accept any incoming connection, not just those whose traffic follows the HTTP structure.

#[tokio::main]
async fn main() {
let addr = "127.0.0.1:8080".to_string();

let socket = TcpListener::bind(&addr).await.unwrap();
println!("Listening on: {}", addr);

while let Ok((mut stream, peer)) = socket.accept().await {
println!("Incoming connection from: {}", peer.to_string());
tokio::spawn(async move {
println!("thread {} starting", peer.to_string());
let (reader, mut writer) = stream.split();
let mut buf_reader = BufReader::new(reader);
let mut buf = vec![];

loop {
match buf_reader.read_until(b'\n', &mut buf).await {
Ok(n) => {
...
}
Err(e) => {
// stop reading on I/O errors instead of retrying forever
println!("Error receiving message: {}", e);
break;
}
}
}

println!("thread {} finishing", peer.to_string());
});
}
}

Here, we bind our TCP listener to port 8080 on localhost and continually accept connections to that port. When we get a bite, we spawn a new task to handle that client’s connection. We split the socket into a reader (to receive the messages sent in) and a writer (to send something back out).

Reading and Writing Data to the TCP Stream

Since we are building on top of raw TCP, we cannot expect the message we receive to have a body or header like we can with HTTP. However, this also means we can dream up any protocol we want for enabling clients to talk to our server.

In this example, our protocol for a message will either be

GET\n

or

BUY;AMOUNT;TICKER\n

(where AMOUNT is f32 and TICKER is a String)

This is much lighter weight than a full-blown HTTP request and might serve us well if we are just building a network app whose components only talk to each other. Below is the filled-out request loop.

loop {
match buf_reader.read_until(b'\n', &mut buf).await {
Ok(n) => {
if n == 0 {
println!("EOF received");
break;
}

let buf_string = String::from_utf8_lossy(&buf);
let data: Vec<String> = buf_string.split(";")
.map(|x| x.to_string().replace("\n", ""))
.collect();
println!("here is the data {:?}", data);
let command = data[0].clone();

match command.as_str() {
"BUY" => {
println!("buy order command processed");
let amount = data[1].parse::<f32>().unwrap();
let order_actor = BuyOrder::new(data[2].clone(), amount, tx_one.clone());
println!("{}: {}", order_actor.ticker, order_actor.amount);
order_actor.send().await;
},
"GET" => {
println!("get order command processed");
let get_actor = GetTrackerActor{ sender: tracker_tx_two.clone() };
let state = get_actor.send().await;
println!("sending back: {:?}", state);
writer.write_all(state.as_bytes()).await.unwrap();
},
_ => {
panic!("{} command not supported", command);
}
}
buf.clear();
}
Err(e) => {
// stop reading on I/O errors instead of retrying forever
println!("Error receiving message: {}", e);
break;
}
}
}

Inside our loop, we read until we reach a \n, which indicates that we have consumed an entire message. If we read zero bytes, the client has closed its side of the connection and has no more messages for us, so we break out of the loop. If we do have data, we split it on ; into a vector of strings, where each string is a piece of our request.
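The loop above indexes into data and unwraps the parsed amount, so a malformed line would panic the connection's task. As a sketch of one alternative (not the book's approach), the parsing step could be pulled into a function that returns a Result. The Command enum and parse_command are hypothetical names introduced here, and this version is deliberately stricter than the original: it rejects extra fields.

```rust
/// The two commands in the article's protocol.
#[derive(Debug, PartialEq)]
enum Command {
    Get,
    Buy { amount: f32, ticker: String },
}

/// Parses one line such as "GET\n" or "BUY;8.0;BYND\n".
/// Returns Err with a description instead of panicking on bad input.
fn parse_command(line: &str) -> Result<Command, String> {
    // Strip the trailing newline (and a carriage return, if a client sends one).
    let line = line.trim_end_matches(&['\r', '\n'][..]);
    let parts: Vec<&str> = line.split(';').collect();

    match parts.as_slice() {
        ["GET"] => Ok(Command::Get),
        ["BUY", amount, ticker] => {
            let amount = amount
                .parse::<f32>()
                .map_err(|e| format!("invalid amount {:?}: {}", amount, e))?;
            Ok(Command::Buy { amount, ticker: ticker.to_string() })
        }
        _ => Err(format!("unsupported or malformed command: {:?}", line)),
    }
}
```

The server loop could then match on the returned Command and, on Err, log the problem or write an error line back to the client rather than panicking.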

If the first piece is GET we just spin off a GET actor, giving it a clone of our TrackerMessage channel Sender. If we have received a BUY command, we also grab the amount and ticker and create a new buy order actor with a Message channel Sender clone.

The BUY request does not return any messages to the client, but our GET request uses our socket to write the state that the get actor receives back to the client.

Actors Handling the Messages

Before we start accepting connections, we must set up our channels and spawn our receiver actors as their own tasks, since the accept loop keeps main busy listening for incoming TCP connections. Below is the complete main.rs

use tokio::net::TcpListener;
use tokio::io::{BufReader, AsyncBufReadExt, AsyncWriteExt};
use tokio::sync::mpsc;

mod actors;
use actors::{OrderBookActor, BuyOrder, Message};
mod order_tracker;
use order_tracker::{TrackerActor, GetTrackerActor, TrackerMessage};

#[tokio::main]
async fn main() {
let addr = "127.0.0.1:8080".to_string();

let socket = TcpListener::bind(&addr).await.unwrap();
println!("Listening on: {}", addr);

let (tx, rx) = mpsc::channel::<Message>(1);
let (tracker_tx, tracker_rx) = mpsc::channel::<TrackerMessage>(1);
let tracker_tx_one = tracker_tx.clone();

tokio::spawn(async {
TrackerActor::new(tracker_rx).run().await;
});
tokio::spawn(async move {
let order_book_actor = OrderBookActor::new(rx, tracker_tx_one.clone(), 20.0);
order_book_actor.run().await;
});
println!("Order book actor running now");

while let Ok((mut stream, peer)) = socket.accept().await {
println!("Incoming connection from: {}", peer.to_string());
let tx_one = tx.clone();
let tracker_tx_two = tracker_tx.clone();
tokio::spawn(async move {
println!("thread {} starting", peer.to_string());
let (reader, mut writer) = stream.split();
let mut buf_reader = BufReader::new(reader);
let mut buf = vec![];

loop {
match buf_reader.read_until(b'\n', &mut buf).await {
Ok(n) => {
if n == 0 {
println!("EOF received");
break;
}

let buf_string = String::from_utf8_lossy(&buf);
let data: Vec<String> = buf_string.split(";")
.map(|x| x.to_string().replace("\n", ""))
.collect();
println!("here is the data {:?}", data);
let command = data[0].clone();

match command.as_str() {
"BUY" => {
println!("buy order command processed");
let amount = data[1].parse::<f32>().unwrap();
let order_actor = BuyOrder::new(data[2].clone(), amount, tx_one.clone());
println!("{}: {}", order_actor.ticker, order_actor.amount);
order_actor.send().await;
},
"GET" => {
println!("get order command processed");
let get_actor = GetTrackerActor{ sender: tracker_tx_two.clone() };
let state = get_actor.send().await;
println!("sending back: {:?}", state);
writer.write_all(state.as_bytes()).await.unwrap();
},
_ => {
panic!("{} command not supported", command);
}
}
buf.clear();
}
Err(e) => {
// stop reading on I/O errors instead of retrying forever
println!("Error receiving message: {}", e);
break;
}
}
}

println!("thread {} finishing", peer.to_string());
});
}
}

Our TrackerActor and our OrderBookActor are spawned as their own tasks before main enters the accept loop and waits for connections on the TCP address. The flow of our application now looks like this…

BUYING
1. A request comes into our TCP server to buy a stock.
2. We create a BuyOrder actor, which uses its Sender to put a Message on the Message queue.
3. OrderBookActor receives the Message, checks it against the investment cap and, if the purchase is accepted, updates its internal state with our newly bought stock.
4. OrderBookActor uses its TrackerMessage Sender to send a message that a BUY has occurred.
5. TrackerActor receives the TrackerMessage and updates its internal state with the new info.

GETTING
1. A request comes into our TCP server to GET the status of our portfolio.
2. We create a GetTrackerActor, which sends a TrackerMessage with the command GET and a oneshot channel to respond on.
3. TrackerActor receives the TrackerMessage and sends its state back to the GetTrackerActor using the passed-in oneshot sender.
4. The TCP server receives the state from the GetTrackerActor (which it has been awaiting) and writes it back to the stream for the original TCP caller to see.

Why did we split tracking the state and buying stocks into two actors? Mostly, it was just for practice and demonstrating how we can craft complex message patterns. But, it also enables us to split traffic nicely to not overwhelm one queue (i.e. gets are placed on one queue without interrupting our core requirement of intaking our BUY requests on our other queue).

Now we try out our system by running this client code a whole bunch of times.

use tokio::net::TcpStream;
use tokio::io::{AsyncWriteExt, AsyncBufReadExt, BufReader};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
let (reader, mut writer) = stream.split();
println!("stream starting");

writer.write_all(b"BUY;8.0;BYND\nBUY;9.0;PLTR\nBUY;9.0;PLTR\nGET\n").await?;
println!("sent data");

let mut buf_reader = BufReader::new(reader);
let mut buf = vec![];
println!("reading data");
// propagate I/O errors with ? like the other calls in this function
buf_reader.read_until(b'\n', &mut buf).await?;
let state_string = String::from_utf8_lossy(&buf);
println!("{}", state_string);
Ok(())
}
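Notice that the client writes all four messages in a single write_all call, yet the server still handles them one at a time. That works because read_until frames the byte stream on \n. Here is a small sketch of that framing. As an assumption for the sake of a dependency-free example, it uses the standard library's synchronous BufRead::read_until over an in-memory Cursor rather than tokio's async version, and split_frames is a made-up helper name.

```rust
use std::io::{BufRead, Cursor};

/// Splits a byte payload into newline-terminated frames, mirroring the
/// server's read loop: read until b'\n', stop when 0 bytes come back.
/// NOTE: synchronous std I/O is an assumption standing in for tokio's async reader.
fn split_frames(payload: &[u8]) -> Vec<String> {
    let mut reader = Cursor::new(payload);
    let mut buf = Vec::new();
    let mut frames = Vec::new();

    loop {
        // Clear the buffer each round, or frames would pile up in it.
        buf.clear();
        // Reading from an in-memory cursor cannot fail, so unwrap is fine here.
        let n = reader.read_until(b'\n', &mut buf).unwrap();
        if n == 0 {
            break; // EOF: nothing left to read
        }
        let text = String::from_utf8_lossy(&buf);
        frames.push(text.trim_end_matches('\n').to_string());
    }

    frames
}
```

Feeding it the client's payload yields four frames: three BUY lines and one GET. This is also why the server calls buf.clear() after each message, since read_until appends to the buffer rather than overwriting it.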

Summary

  • We can create a TCP listener to read and write raw data without HTTP
  • We can define our own protocol on top of TCP simply by sending and expecting requests to follow a certain structure
  • We can spin up Actors in their own tasks and use the MPSC channels from tokio to route our TCP traffic to each of the running Actors
  • We can enable complex messaging patterns by creating more channels and giving the Sender and Receiver handles of those channels to the required Actors

Conclusion

We have seen how useful tokio is for creating worry-free async processes, and we have explored how to build on top of raw TCP and define our own protocols. Lastly, we brought all of this together in a lightweight network app that takes in TCP requests and sends messages to the correct Actor for processing, all asynchronously. In the next section, we will be building an HTTP layer on top of TCP and creating highly customizable HTTP-based Actors using the hyper framework.

Matthew MacFarquhar

I am a software engineer working for Amazon living in SF/NYC.