In the previous post, we wrote a simple Redis Protocol specification (RESP) parser. That’s just a small part towards to build a mini Redis. Let’s continue writing the other parts needed for our mini Redis server.
Here’s how the overall architecture looks like:
Redis CLI <-> Redis Server (TCP) <-> RESP Parser
↓
Key Value Store
We will use redis-cli
as the Redis client and write the following parts:
- Redis Server (TCP)
- Key Value (KV) Store
Here’s the structure of this post:
This post is inspired by Rust Tokio Mini-Redis Tutorial,
where it walks through the reader to implement a mini Redis with
tokio
. This post is part of
the series of implementing mini Redis in Elixir:
- Part 1: Writing a simple Redis Protocol parser in Elixir
- Part 2: Writing a mini Redis server in Elixir
- Part 3: Benchmarking and writing a concurrent mini Redis in Elixir
Prerequisite
Before we get started, if you’re unfamiliar with the following: GenServer
, ETS
and gen_tcp
,
I’ll recommend you to work through the
the official Elixir Guide: Mix and OTP section.
Specifically on the following topics:
We’ll work on top of the implementation of the TCP server from the guide. We will convert it from an echo TCP
server to a Redis TCP server and write a KV store with GenServer
and ETS
.
We will be using redis-cli
as our Redis client. So, make sure you have redis
installed as well. In MacOS, you can install by running:
brew install redis
Setting up our Mix project
First, let’s setup a Mix project and add the necessary files.
mix new mini_redis --sup
We will add our RESP parser code we implemented later as needed.
Writing a key value store with GenServer and ETS
Writing a KV store with ets
wrapped with GenServer
is pretty straightforward.
We will just wrap the following ets
functions around our module:
:ets.lookup/2
:ets.insert/2
:ets.delete/2
Since we don’t want GenServer mailbox to be the bottleneck of our ets
, we expose it through
a normal module function instead of a GenServer callback such as handle_call
and handle_cast
.
We only need to implement the init
callback for our GenServer
.
Here’s how the code in lib/mini_redis/kv.ex
;
defmodule MiniRedis.KV do
use GenServer
require Logger
@table :kv
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def get(key) do
case :ets.lookup(@table, key) do
[{^key, value}] -> {:ok, value}
_ -> {:error, :not_found}
end
end
def set(key, value) do
:ets.insert(@table, {key, value})
:ok
end
def delete(key) do
:ets.delete(@table, key)
:ok
end
# GenServer callbacks
@impl true
def init(opts) do
pid = :ets.new(@table, [:set, :named_table, :public])
Logger.info("Starting KV with ETS table #{pid}...")
{:ok, opts}
end
end
It should be pretty much self explanatory and easy to understand since we are just building a wrapper around it.
Why do we need to wrap ets
in a GenServer
module instead of a normal module?
This is because our ets
process is stateful and it need to be owned by a process. Hence,
we will need GenServer
as our parent process for the ets
.
Here’s how the ets
documentation describe it:
Each table is created by a process. When the process terminates, the table is automatically destroyed. Every table has access rights set at creation.
We will need to have our application supervisor start it, let’s
update our code in lib/mini_redis/application.ex
:
children = [
# Starts a worker by calling: MiniRedis.Worker.start_link(arg)
# {MiniRedis.Worker, arg}
+ MiniRedis.KV
]
Our KV store is now done. Let’s start writing our Redis server.
Writing a mini Redis server
A mini Redis server is a TCP server that can parse RESP request and send RESP response. It performs the operation of storing or retrieving key value pairs as requested.
Hence, to write a mini Redis server, it means that we need to:
- Write a basic TCP server
- Support RESP request and response in the TCP server
- Perform write and read of key value pairs according to the request.
Let’s start with the first step.
Writing a TCP server with gen_tcp
We will reused the echo TCP server code from the Elixir official guide on gen_tcp
.
In lib/mini_redis/server.ex
:
defmodule MiniRedis.Server do
require Logger
def accept(port) do
# The options below mean:
#
# 1. `:binary` - receives data as binaries (instead of lists)
# 2. `packet: :line` - receives data line by line
# 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
# 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
#
{:ok, socket} =
:gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])
Logger.info("Accepting connections on port #{port}")
loop_acceptor(socket)
end
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client)
loop_acceptor(socket)
end
defp serve(socket) do
socket
|> read_line()
|> write_line(socket)
serve(socket)
end
defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end
defp write_line(line, socket) do
:gen_tcp.send(socket, line)
end
end
Everything is just an exact copy pasta from Elixir official guides. Next, let’s
add it as the children of our application supervisor, in
lib/mini_redis/application.ex
:
children = [
# Starts a worker by calling: MiniRedis.Worker.start_link(arg)
# {MiniRedis.Worker, arg}
MiniRedis.KV
+ {Task, fn -> MiniRedis.Server.accept(String.to_integer(System.get_env("PORT") || "6379")) end}
]
We can test it by running the following in our terminal:
telnet localhost 6379
and you’ll see the following output in your console:
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
you can enter any message and it’ll reply back the same message you send:
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
world
world
You can exit telnet by using Ctrl + ]
and type in close
.
Simple enough thanks to the amazing official Elixir guide. Now, let’s make it become a little bit more like a mini Redis server.
Integrating our parser into the TCP server
Now that we have a working TCP server, the next step would be integrating the parser we wrote previously into our TCP server.
Let’s add the parser we have wrote to our current project. In
lib/mini_redis/parser.ex
:
defmodule Parser do
def encode(commands) when is_list(commands) do
result = "*#{length(commands)}\r\n"
Enum.reduce(commands, result, fn command, result ->
result <> "$#{String.length(command)}\r\n#{command}\r\n"
end)
end
def decode(string) when is_binary(string) do
%{commands: commands} =
string
|> String.trim()
|> String.split("\r\n")
|> Enum.reduce(%{}, fn reply, state ->
case reply do
"*" <> length ->
state
|> Map.put(:type, "array")
|> Map.put(:array_length, String.to_integer(length))
"$" <> length ->
state
|> Map.put(:type, "bulk_string")
|> Map.put(:bulk_string_length, String.to_integer(length))
value ->
value = String.trim(value)
Map.update(state, :commands, [value], fn list -> [value | list] end)
end
end)
Enum.reverse(commands)
end
end
This is the first step to make our echo TCP server to become a minimally working Redis server.
Before integrating the parser, let’s recap a bit. Redis client send multiple lines of input as a command to communicate with the Redis server.
The parser we wrote assumed that we will received a full complete lines
of input that can form a command. However, that’s not the case of our TCP
server. Each line is received on its own. This mean that, during the
read_line
our TCP server, we will received the following:
line 1: *3\r\n
line 2: $3\r\n
line 3: SET\r\n
line 4: $5\r\n
line 5: mykey\r\n
line 6: $3\r\n
line 7: foo\r\n
instead of:
line 1: *3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$3\r\nfoo\r\n
To see this in action, we are going to hardcode some implementation for
demonstration purpose. The first step we want to achieve is to return OK
for
every set command that our Redis server received.
Let’s update our helper function to suit our needs:
defp read_line(socket) do
- {:ok, data} = :gen_tcp.recv(socket, 0)
- data
+ :gen_tcp.recv(socket, 0)
end
- defp write_line(line, socket) do
+ defp reply(socket) do
- :gen_tcp.send(socket, line)
+ :gen_tcp.send(socket, "+OK\r\n")
end
Here we rename our write_line
to reply
and hardcoded it to
return +OK\r\n
, which is what is expected by the Redis client on
successful set command.
Next, let’s update our loop_acceptor
and serve
function:
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client, 0)
loop_acceptor(socket)
end
# Added a count state, so we could keep track how many lines
# of input we have received so far.
defp serve(socket, count) do
case read_line(socket) do
{:ok, data} ->
count = count + 1
IO.inspect(data, label: "line #{count}")
# Since we know a SET command always have 7 lines,
# we hardcoded this logic for the time being so that
# it reply to the client correctly.
#
# Without doing so, our client will end up being timeout.
if count == 7 do
reply(socket)
serve(socket, 0)
else
serve(socket, count)
end
{:error, reason} ->
Logger.info("Receive error: #{inspect(reason)}")
end
end
We just hardcoded the implementation to stop and reply OK
when we receive 7 parts. Notice
that, this time we also logged the error message if there’s any. This
is important as, once our client receive the response, the connection
will be closed by the client, and result in error.
Let’s see what we got so far:
# In terminal
mix run --no-halt
# In another terminal
redis-cli SET key value
Here’s the output you’ll see:
# Terminal 1
╰─➤ mix run --no-halt
21:38:42.808 [info] Starting KV with ETS table kv...
21:38:42.812 [info] Accepting connections on port 6379
line 1: "*3\r\n"
line 2: "$3\r\n"
line 3: "SET\r\n"
line 4: "$3\r\n"
line 5: "key\r\n"
line 6: "$5\r\n"
line 7: "value\r\n"
21:38:47.785 [info] Receive error: :closed
# Terminal 2
╰─➤ redis-cli SET key value
OK
From here, there are multiple ways we could implement the integration between our parser and our TCP server. Here’s some of the way I could think of:
- Keep track of previous line in our state.
- Keep track of previous parts (line that has been parsed) in our state.
I’ll leave this part as a practice for anyone who are interested to get their hands dirty.
Purposely left blank for those who want to implement themselves
…
…
…
…
…
For the ease of implementation, I’ll go with the first approach, so instead of tracking the count, we will track the previous lines in our state:
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client, "")
loop_acceptor(socket)
end
defp serve(socket, state) do
case read_line(socket) do
{:ok, data} ->
# Append the line to the end.
state = state <> data
# Notice that our return value for our parser have changed.
#
# Now we are expecting a tuple, to let us know whether
# the commands is decoded successfully or is still incomplete.
case Parser.decode(state) do
{:ok, commands} ->
IO.inspect(commands, label: "commands")
reply(socket)
serve(socket, "")
{:incomplete, _} ->
serve(socket, state)
end
{:error, reason} ->
Logger.info("Receive error: #{inspect(reason)}")
end
end
Here, we are parsing the line every time a new part of the command is received, until all the parts required arrived to be form a command. In the event of incomplete command, our parser will have to let us know, so our TCP server continue to listen to incoming messages.
Here’s the changes for the parser:
defmodule Parser do
def encode(commands) when is_list(commands) do
# remain the same...
end
def decode(string) when is_binary(string) do
state =
string
|> String.trim()
|> String.split("\r\n")
|> Enum.reduce(%{}, fn reply, state ->
case reply do
"*" <> length ->
state
|> Map.put(:type, "array")
|> Map.put(:array_length, String.to_integer(length))
|> Map.update(:commands, [], fn list -> list end)
"$" <> length ->
state
|> Map.put(:type, "bulk_string")
|> Map.put(:bulk_string_length, String.to_integer(length))
|> Map.update(:commands, [], fn list -> list end)
value ->
value = String.trim(value)
Map.update(state, :commands, [value], fn list -> [value | list] end)
end
end)
if length(state.commands) == state.array_length do
{:ok, Enum.reverse(state.commands)}
else
{:incomplete, state}
end
end
end
Pretty straightforward, we check the expected command length with the commands length we received so far. If it’s the same, it means that we receive all the parts we need for the command and return the commands. Else, we just let the caller know that it’s incomplete.
While this work well, it’s not the most efficient implementation as we are parsing the line every single time on every new incoming new line.
To see our latest progress:
# In terminal
mix run --no-halt
# In another terminal
redis GET key
redis-cli SET key value
Here’s the output of my terminal:
╰─➤ mix run --no-halt
21:51:59.237 [info] Starting KV with ETS table kv...
21:51:59.241 [info] Accepting connections on port 6379
commands: ["GET", "key"]
21:52:00.965 [info] Receive error: :closed
commands: ["SET", "key", "value"]
21:52:05.080 [info] Receive error: :closed
Once we have the commands, the rest is fairly straightforward to integrate. If you’re up to the challenge, try to write the code your own!
Purposely left blank for those who want to implement themselves
…
…
…
…
…
Integrating our Redis Server with KV store
Since we have the commands now, all we need to do is just match our commands to the action we need to call in our KV store.
Let’s first update our reply
function to make suit our need:
defp reply(socket, data) do
:gen_tcp.send(socket, data)
end
This allow us to send different response based on the returned value we get
from our KV store. Next, we’ll implement a handle_command
function:
defp handle_command(socket, command) do
case command do
["SET", key, value] ->
MiniRedis.KV.set(key, value)
reply(socket, "+OK\r\n")
["GET", key] ->
case MiniRedis.KV.get(key) do
{:ok, value} -> reply(socket, "+#{value}\r\n")
{:error, :not_found} -> reply(socket, "$-1\r\n")
end
end
end
Notice that here we return $-1\r\n
to indicate nil
value to our Redis
client, according to the RESP protocol
spec.
Lastly, calling handle_command
in our serve
function:
defp serve(socket, state) do
case read_line(socket) do
{:ok, data} ->
state = state <> data
case Parser.decode(state) do
{:ok, command} ->
handle_command(socket, command)
serve(socket, "")
{:incomplete, _} ->
serve(socket, state)
end
{:error, reason} ->
Logger.info("Receive error: #{inspect(reason)}")
end
end
We have integrated our Redis server with both the RESP parser and our KV store.
It now supports the basic get
and set
commands. Let’s see it in action:
mix run --no-halt
In another terminal:
╭─kai at Kais-MacBook-Pro.local ~
╰─➤ redis-cli SET key value
OK
╭─kai at Kais-MacBook-Pro.local ~
╰─➤ redis-cli GET key
value
╭─kai at Kais-MacBook-Pro.local ~
╰─➤ redis-cli GET unfound
(nil)
Voila, our mini Redis server is done!
What’s next?
We have completed the basic functionality of a Redis server. However, it’s still very far behind from the real Redis server. For example, how does our mini Redis perform against the real Redis?
In the next post, we will find out how well our implementation is doing with synthetic benchmarking. Along the way, we will discovered the limitations of our current implementation, make some changes and tweak some configurations to make it more performant. (Hint: is about concurrency)
Thanks for reading until the end and, hopefully, I can see you in my next post!