Sending Bytes

5 Jun, 2021 · #dev#rust

If you ever face a task that involves low-level network communication, keep in mind that the network might (and most likely will) split the data you send into parts.

Let’s say there is a TCP server that receives messages from clients. TCP delivers a stream of bytes, not a sequence of messages: when a client writes some bytes to a socket and the data is small enough, the whole message will usually arrive as a single chunk. But if the data is above a certain limit, the message will be split into multiple chunks, so you have to take care of concatenating these chunks on the receiving side.

Consider a client that sends two messages, hello and world, one after another. Depending on the network, the server might receive these messages in two chunks:

hello
world

Or in four chunks:

hel
lo
wor
ld
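
To get a feel for this without a real network, here is a minimal sketch that simulates chunked delivery with `std::io::Cursor` and a deliberately small read buffer. The helper name `read_in_chunks` is hypothetical, and a real socket decides chunk sizes on its own; the only point is that chunk boundaries have nothing to do with message boundaries.

```rust
use std::io::{Cursor, Read};

/// Reads `source` to the end using a scratch buffer of `chunk_size` bytes
/// and returns the pieces in arrival order.
fn read_in_chunks<R: Read>(mut source: R, chunk_size: usize) -> std::io::Result<Vec<Vec<u8>>> {
    let mut scratch = vec![0u8; chunk_size];
    let mut chunks = Vec::new();
    loop {
        // `read` returns how many bytes it placed into the buffer;
        // 0 means the end of the stream
        let n = source.read(&mut scratch)?;
        if n == 0 {
            break;
        }
        chunks.push(scratch[..n].to_vec());
    }
    Ok(chunks)
}

fn main() -> std::io::Result<()> {
    // "hello" and "world" written back to back, read 3 bytes at a time
    let stream = Cursor::new(&b"helloworld"[..]);
    for chunk in read_in_chunks(stream, 3)? {
        println!("{}", String::from_utf8_lossy(&chunk));
    }
    Ok(())
}
```

Here the second chunk mixes the tail of hello with the head of world, which is exactly the situation the receiver has to untangle.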

If the data is under your control and you can be sure that a payload will never contain some special character, such as a line feed, you can use that character as a delimiter that marks the end of a message: once the receiver encounters a \n byte, the message is fully received.

But if a payload is arbitrary user input, delimiters of any kind are not reliable. The solution is to encode the length of the message into the payload sent to the receiver.
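
A delimiter-based receiver could look roughly like the sketch below. It assumes `\n` never appears inside a message; `take_lines` is a hypothetical helper name, and the chunks in `main` are made up for illustration.

```rust
/// Removes every complete `\n`-terminated message from `buf` and returns them
/// without the delimiter. An unfinished tail stays in `buf`.
fn take_lines(buf: &mut Vec<u8>) -> Vec<Vec<u8>> {
    let mut messages = Vec::new();
    while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
        // Taking the message out of the buffer together with its delimiter...
        let mut line: Vec<u8> = buf.drain(..=pos).collect();
        // ...and dropping the trailing `\n`
        line.pop();
        messages.push(line);
    }
    messages
}

fn main() {
    let mut buf = Vec::new();
    // Chunks as they might arrive from a socket
    let chunks: [&[u8]; 3] = [b"hel", b"lo\nwor", b"ld\n"];
    for chunk in chunks {
        buf.extend_from_slice(chunk);
        for message in take_lines(&mut buf) {
            println!("{}", String::from_utf8_lossy(&message));
        }
    }
}
```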

Info
I will be using Rust for code samples, but the pattern should be language agnostic, hence portable to any general-purpose programming language.

When sending a string, it needs to be represented as bytes first:

rust
// "hello".as_bytes()
[104, 101, 108, 108, 111]

The length of this message is 5. To transfer this information to the receiver, we can take the memory representation of this integer in big-endian byte order.

Both parties must agree on the type of this integer. I will elaborate on why in a sec.

Let's pick u32 as the type for the length. In Rust, the size of a 32-bit unsigned integer (u32) is 4 bytes, so the big-endian memory representation of 5 as a u32 is:

rust
// 5u32.to_be_bytes()
[0, 0, 0, 5]

We can get this representation by calling the to_be_bytes method.

Now we can concatenate these two arrays to shape the payload: the bytes that will be sent over the network.

rust
[0, 0, 0, 5, 104, 101, 108, 108, 111]
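
As a minimal sketch, the encoding step can be wrapped into a small function. `encode_frame` is a hypothetical name, and using `u32::try_from` instead of an `as` cast is my own choice here: it refuses messages longer than `u32::MAX` bytes instead of silently truncating the length.

```rust
use std::convert::TryFrom;

/// Builds a length-prefixed payload: 4 bytes of big-endian length,
/// followed by the message itself.
/// Returns `None` if the message is too long for a `u32` prefix.
fn encode_frame(message: &[u8]) -> Option<Vec<u8>> {
    let len = u32::try_from(message.len()).ok()?;
    let mut payload = Vec::with_capacity(4 + message.len());
    payload.extend_from_slice(&len.to_be_bytes());
    payload.extend_from_slice(message);
    Some(payload)
}

fn main() {
    let payload = encode_frame("hello".as_bytes()).expect("message fits in u32");
    println!("{:?}", payload); // [0, 0, 0, 5, 104, 101, 108, 108, 111]
}
```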

When the receiver sees new data, it must take the first 4 bytes and convert them back to a 32-bit integer to get the length of the incoming message. This is why the type is so important: a 16-bit unsigned integer (u16) would take 2 bytes, so the receiver would need to take the first 2 bytes instead of 4 and use a different function for the conversion.
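
Here is a small sketch of that first step on the receiving side. `split_prefix` is a hypothetical helper; it returns `None` while fewer than 4 bytes are available, because the prefix itself can be split across chunks.

```rust
/// Splits a payload into its u32 length prefix and the remaining bytes.
/// Returns `None` until at least 4 bytes are available.
fn split_prefix(payload: &[u8]) -> Option<(u32, &[u8])> {
    if payload.len() < 4 {
        return None;
    }
    let (head, rest) = payload.split_at(4);
    let mut len_bytes = [0u8; 4];
    len_bytes.copy_from_slice(head);
    Some((u32::from_be_bytes(len_bytes), rest))
}

fn main() {
    let payload: [u8; 9] = [0, 0, 0, 5, 104, 101, 108, 108, 111];
    let (len, rest) = split_prefix(&payload).expect("at least 4 bytes");
    println!("{} {:?}", len, rest); // 5 [104, 101, 108, 108, 111]
    // Reading the same bytes as a u16 prefix gives a different (wrong) length
    println!("{}", u16::from_be_bytes([payload[0], payload[1]])); // 0
}
```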

When the length of the message is known, the receiver keeps reading bytes from the socket into some buffer until the size of the buffer is equal to the length of the message.

Consider the case when the network splits the message above into two chunks.

rust
// first chunk:
[0, 0, 0, 5, 104, 101]
// length: [0, 0, 0, 5] -> 5
// buffer: [104, 101]

Since the length of the buffer is only 2, the receiver knows that this is only a part of the message, so it waits for the next chunk.

rust
// second chunk:
[108, 108, 111]
// buffer: [104, 101, 108, 108, 111]

Now, the length of the buffer is 5, which means the whole message was received and it can be used for whatever.
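
The walkthrough above can be sketched as a tiny decoder that is independent of any socket. `FrameDecoder` is a hypothetical type, not something from a library; unlike the walkthrough, it keeps the 4 prefix bytes in the buffer until the whole message has arrived, which also lets it cope with a prefix split across chunks and with several messages arriving in one chunk.

```rust
/// Accumulates chunks and yields complete messages
/// (u32 big-endian length prefix followed by the message bytes).
#[derive(Default)]
struct FrameDecoder {
    buf: Vec<u8>,
}

impl FrameDecoder {
    /// Feeds one chunk and returns every message completed by it.
    fn push(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
        self.buf.extend_from_slice(chunk);
        let mut messages = Vec::new();
        loop {
            // The length prefix itself may still be incomplete
            if self.buf.len() < 4 {
                break;
            }
            let mut len_bytes = [0u8; 4];
            len_bytes.copy_from_slice(&self.buf[..4]);
            let len = u32::from_be_bytes(len_bytes) as usize;
            // Only a part of the message is here, waiting for more chunks
            if self.buf.len() - 4 < len {
                break;
            }
            messages.push(self.buf[4..4 + len].to_vec());
            // Dropping the prefix and the message, keeping whatever follows
            self.buf.drain(..4 + len);
        }
        messages
    }
}

fn main() {
    let mut decoder = FrameDecoder::default();
    println!("{:?}", decoder.push(&[0, 0, 0, 5, 104, 101])); // []
    println!("{:?}", decoder.push(&[108, 108, 111])); // [[104, 101, 108, 108, 111]]
}
```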

Show me the code

An implementation of a TCP client-server pair, where the client sends a string to the server and the server prints messages to stdout.

client.rs
rust
// Imports assume the Tokio runtime
// (`writable` and `write_all` below are Tokio APIs)
use tokio::{io::AsyncWriteExt, net::TcpStream};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut socket = TcpStream::connect("127.0.0.1:7080").await?;
    let message = b"hello";
    // The cast to u32 is critical to get an array of exactly 4 bytes
    let message_len = message.len() as u32;
    // Getting the memory representation of the message length (u32)
    // as a byte array in big-endian (network) byte order
    let message_len_bytes = message_len.to_be_bytes();
    // Concatenating the message length and the message itself
    let mut bytes = message_len_bytes.to_vec();
    bytes.extend(message);
    // Waiting for the socket to be writable
    socket.writable().await?;
    // Writing bytes to the socket
    socket.write_all(&bytes).await?;
    Ok(())
}

And the server half:

server.rs
rust
// Imports assume the Tokio runtime
use std::io;
use tokio::{net::TcpListener, task};

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:7080").await?;
    loop {
        // New connection
        let (socket, addr) = listener.accept().await?;
        // Spawning a new task to handle each connection asynchronously
        task::spawn(async move {
            println!("[{}] new connection", addr);
            // Buffer for incoming bytes
            let mut buf: Vec<u8> = Vec::new();
            // Length of the current message,
            // set once its 4-byte prefix has been received
            let mut len: Option<usize> = None;
            loop {
                // Waiting for the socket to be readable
                socket.readable().await.unwrap();
                match socket.try_read_buf(&mut buf) {
                    // Ok(0) indicates the stream's read half is closed
                    // and will no longer yield data
                    Ok(0) => break,
                    Ok(_) => {
                        // Some bytes were read and placed in the buffer.
                        // They may hold a part of a message, exactly one
                        // message, or several messages at once,
                        // so processing the buffer in a loop.
                        loop {
                            if len.is_none() && buf.len() >= 4 {
                                // Taking the first 4 bytes out of the buffer
                                // and converting them into u32:
                                // this is the length of the whole message.
                                let mut len_bytes = [0u8; 4];
                                len_bytes.copy_from_slice(&buf[..4]);
                                buf.drain(..4);
                                len = Some(u32::from_be_bytes(len_bytes) as usize);
                            }
                            match len {
                                Some(n) if buf.len() >= n => {
                                    // The whole message has been received
                                    let bytes: Vec<u8> = buf.drain(..n).collect();
                                    let message = std::str::from_utf8(&bytes).unwrap();
                                    // Simply printing it to stdout
                                    println!("[{}] message: {}", addr, message);
                                    // Resetting the length for the next message
                                    len = None;
                                }
                                // The buffer contains only a part of the prefix
                                // or of the message, waiting for the next chunk
                                _ => break,
                            }
                        }
                    }
                    // The readiness event was a false positive, retrying
                    Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
                    Err(err) => panic!("[{}] {}", addr, err),
                }
            }
        });
    }
}
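
The same protocol can also be read with blocking I/O and nothing but the standard library, where `Read::read_exact` does the "keep reading until the buffer is full" part for us. This is a minimal sketch, not part of the code above: `read_frame` is a hypothetical helper, it treats an end of stream in place of a length prefix as a normal close, and it trusts the announced length, so real code would probably want to cap it before allocating.

```rust
use std::io::{self, Read};

/// Reads one length-prefixed message from `reader`.
/// Returns `Ok(None)` if the stream ends before a full length prefix arrives.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Option<Vec<u8>>> {
    let mut len_bytes = [0u8; 4];
    match reader.read_exact(&mut len_bytes) {
        Ok(()) => {}
        // No (complete) prefix: the peer has closed the connection
        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
        Err(err) => return Err(err),
    }
    let len = u32::from_be_bytes(len_bytes) as usize;
    // `read_exact` keeps reading until the whole message is in the buffer
    let mut message = vec![0u8; len];
    reader.read_exact(&mut message)?;
    Ok(Some(message))
}

fn main() -> io::Result<()> {
    // A `Cursor` stands in for a `std::net::TcpStream`; both implement `Read`
    let mut stream = io::Cursor::new(vec![0u8, 0, 0, 5, 104, 101, 108, 108, 111]);
    while let Some(message) = read_frame(&mut stream)? {
        println!("{}", String::from_utf8_lossy(&message)); // hello
    }
    Ok(())
}
```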

You can poke this code by forking this repo.

Have fun!