Developing an efficient binary file protocol with Scodec and Akka Streams - Part 1

  • 2884 words

In this two part series, we will develop our very own file format to store stock prices as time series using Scodec and Akka Streams. The first part will describe the encoding / decoding process and how we improve our binary protocol step by step. In the second part, we will use Akka Streams to read and write our own files in chunks with the help of backpressure.

The complete project can be found here on Github.

The Goal

The goal of this post series is to show the usage and process of developing an efficient binary file protocol to store stock prices and how to implement this with the help of Scodec. The file ending of our file format will be dtff, which stands for dense tick file format.

We will start by using CSV and improve it step by step, expecting around 1 million stock prices in a single file. The assumption we can make is that prices are ordered in our file and they must be stored without any loss of information. Let’s get started by looking at what Scodec is.

What is Scodec

Scodec is a suite of purely functional Scala libraries for working with binary data. It is developed by Michael Pilquist and can be found on Github. The great thing is that the mapping of binary structures to our types is statically verified with the help of shapeless.

To get started with Scodec, we need to add some dependencies and bring them into scope.

libraryDependencies ++= List(
  "org.scodec" %% "scodec-core" % "1.9.0",
  "org.scodec" %% "scodec-bits" % "1.1.0"
)
import scodec._
import bits._
import codecs._

Let’s start by encoding an Int into a BitVector. To do this, we need a Codec[Int]. Fortunately, Scodec comes with a big number of predefined codecs, so we don’t have to implement everything by our self. We can use scodec.codecs.int32 do achieve what we want. Here is how to use it.

scodec.codecs.int32
// res0: scodec.Codec[Int] = 32-bit signed integer

int32 encode 42
// res1: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(32 bits, 0x0000002a))

Since Scodec is purely functional, our attempt to encode our number is represented as scodec.Attempt[scodec.bits.BitVector], which can be either Successful or a Failure. In our case, we had no problem to encode our integer. What we get is a BitVector(32 bits, 0x0000002a). Here are some more examples to give you a feeling how to use Scodec.

// getting the binary representation
(int32 encode 42).getOrElse(BitVector.empty).toBin
// res3: String = 00000000000000000000000000101010

// define a codec for an 8-bit unsigned int followed by an 8-bit unsigned int followed by a 16-bit unsigned int.
val codec = (uint8 ~ uint8 ~ uint16)
// codec: scodec.Codec[((Int, Int), Int)] = ((8-bit unsigned integer, 8-bit unsigned integer), 16-bit unsigned integer)

// using this codec to decode from a from bits
codec.decode(hex"0x2a2a0539".bits)
// res6: scodec.Attempt[scodec.DecodeResult[((Int, Int), Int)]] = Successful(DecodeResult(((42,42),1337),BitVector(empty)))

// using shapeless compile time magic for case class codecs
case class Point(x: Int, y: Int)
// defined class Point

val pointCodec = (int32 :: int32).as[Point]
// pointCodec: scodec.Codec[Point] = scodec.Codec$$anon$2@1b9cc190

Domain Model Definition

Now we know how we can use Scodec, so it’s time to define our domain model. Stock prices are usually represented as tuple of an ask price and a bid price. In simple terms, you buy for the ask price and sell for the bid price. Stock prices are also constantly changing, so we have to specify a timestamp for which the price is valid. Each new price update is called a tick, so our time series represents a series of ticks for an arbitrary stock.

case class Tick(time: Long, bid: Double, ask: Double)

The naive approach with CSV

The most trivial way to encode and store our ticks is using a comma or tab separated file. It is easy to understand and can be used with a lot of applications, which makes it useful for all those Excel evangelists and paper traders out there.

utf8 encode "1420148801108,1.20989,1.21049"
// res8: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(232 bits, 0x313432303134383830313130382c312e32303938392c312e3231303439))

Looking at the output, we can see that one tick is encoded with 232 bits. For our 1 million tick file, we can estimate a 232 * 1000000 / 8 / 1024 / 1024 = 27,65 MB file as a result. We will use this number as our baseline and see, how much we can improve this number.

Implementing our first codec

We will start by defining our first codec for our Tick case class. Our Codec[Tick] is defined as a single 64 bit Long number, followed by two 64-bit big endian IEEE 754 floating point numbers.

val tickCodec = (long(64) :: double :: double).as[Tick]
// tickCodec: scodec.Codec[Tick] = scodec.Codec$$anon$2@4456705

tickCodec encode Tick(1420148801108L, 1.20989, 1.21049)
// res9: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(192 bits, 0x0000014aa776fe543ff35bb59ddc1e793ff35e2ac3222920))

Using our own codec, we only need 192 bits (3 * 64 bits) to encode a single tick, which is only 82% of our CSV approach. Below is the binary representation of our first tick. But we can do way better!

Simple Codec

Switching from Double to Int

The fact that a price is only represented up to a given precision makes it possible for us to eliminate our double values and use plain old integers instead.

Prices can have a precision up to the fifth decimal place. As a consequence, we can use the factor 100.000 to multiply our Double and get an Int without losing information. Let’s define a new version for our ticks as FactorizedTick and some methods to easily switch between them.

val tick = Tick(1420148801108L, 1.20989, 1.21049)
// tick: Tick = Tick(1420148801108,1.20989,1.21049)

case class FactorizedTick(time: Long, bid: Int, ask: Int)
// defined class FactorizedTick

val factorizedTickCodec = (long(64) :: int32 :: int32).as[FactorizedTick]
// factorizedTickCodec: scodec.Codec[FactorizedTick] = scodec.Codec$$anon$2@2bff5f88

implicit final class TickOps(private val wrappedTick: Tick) extends AnyVal {
  def factorize: FactorizedTick =
    FactorizedTick(
      wrappedTick.time,
      (wrappedTick.bid * 100000).toInt,
      (wrappedTick.ask * 100000).toInt
    )
}
// defined class TickOps

factorizedTickCodec encode tick.factorize
// res10: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(128 bits, 0x0000014aa776fe540001d89c0001d8d9))

Using our factorized ticks, we only need 128 bits per tick. For our example file, this means 128 * 1000000 / 8 / 1024 / 1024 = 15,25 MB in total. We can save 45% by using this method in comparison to the CSV version! Again, this is the binary representation of this codec. But that’s not the end.

Better Codec

Using delta encoding and Varints

Instead of storing each tick on its own, we can make use of the fact that we want to save an ordered series of ticks. We only have to encode the first tick as usual. Every subsequent tick can be represented as the difference to the previous one. This method is called Delta encoding.

case class FactorizedDeltaTick(timeDelta: Long, bidDelta: Int, askDelta: Int)
// defined class FactorizedDeltaTick

val factorizedDeltaTickCodec = (long(64) :: int32 :: int32).as[FactorizedDeltaTick]
// factorizedDeltaTickCodec: scodec.Codec[FactorizedDeltaTick] = scodec.Codec$$anon$2@4243eeb5

implicit final class FactorizedTickOps(private val wrappedFactorizedTick: FactorizedTick) extends AnyVal {
  def deltaTo(prevFactorizedTick: FactorizedTick): FactorizedDeltaTick =
    FactorizedDeltaTick(
      wrappedFactorizedTick.time - prevFactorizedTick.time,
      wrappedFactorizedTick.bid - prevFactorizedTick.bid,
      wrappedFactorizedTick.ask - prevFactorizedTick.ask
    )
}
// defined class FactorizedTickOps

val otherTick = Tick(1420148801207L, 1.21004, 1.21063)
// otherTick: Tick = Tick(1420148801207,1.21004,1.21063)

val delta = otherTick.factorize deltaTo tick.factorize
// delta: FactorizedDeltaTick = FactorizedDeltaTick(99,16,14)

factorizedDeltaTickCodec encode delta
// res11: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(128 bits, 0x0000000000000063000000100000000e))

We can see that our delta defines the difference to the original tick, which is 99 milliseconds later, a 16 = 0,00016 higher bid price and a 14 = ,00014 higher ask price. Sadly, we still need 128 bits to encode our delta, since every price difference is represented by 32 bits.

This is where we can use varints or variable length encoded integers. Each byte in a varint, except the last byte, has the most significant bit set. In other terms, the highest bit of each byte encodes whether the next byte belongs to the current number. All other remaining 7 bits are used to hold the value itself. This makes it possible to decode an int with only 1 or up to 5 bytes, depending on the size of the number. Encoding primarily small numbers, using a variable length encoding can drastically save memory consumption. Since price changes are usually small, this makes it perfect for our use case.

Thanks to my friend knutwalker, there is already a vint codec available in Scodec. Here is what it looks like for different numbers.

// saving 75%
int32 encode 42
// res13: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(32 bits, 0x0000002a))

vint encode 42
// res14: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(8 bits, 0x2a))

// saving 50%
int32 encode 1337
// res16: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(32 bits, 0x00000539))

vint encode 1337
// res17: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(16 bits, 0xb90a))

// using the same space
int32 encode 134217728
// res19: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(32 bits, 0x08000000))

vint encode 134217728
// res20: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(32 bits, 0x80808040))

// using 25% more space
int32 encode 2147483647
// res22: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(32 bits, 0x7fffffff))

vint encode 2147483647
// res23: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(40 bits, 0xffffffff07))

As we can see, small numbers are stored very efficiently. The following is the new codec for our FactorizedDeltaTick case class using vint and vlong.

val factorizedDeltaTickCodecV = (vlong :: vint :: vint).as[FactorizedDeltaTick]
// factorizedDeltaTickCodecV: scodec.Codec[FactorizedDeltaTick] = scodec.Codec$$anon$2@781b8436

factorizedDeltaTickCodecV encode delta
// res24: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(24 bits, 0x63100e))

This time, we reduced the memory consumption big time! Only 24 bits are needed to store our delta to the previous tick. Of course this will not work every time. Since we only have 7 of 8 bits available in our vint, we can only represent numbers from 0 to 127 with one byte. When the price or time deltas are greater than 127, we need another byte. The following is the binary representation of our efficient delta tick.

Delta Codec

So varints seem to be the holy grail to store stock prices. The truth is, this is only the case when the price changes upwards. Representing negative numbers, in our case a decreasing price delta, needs exactly 5 bytes. This is because the most significant bit is used to indicate if the next byte is related to the current number, leaving us with only 7 bits remaining for our value. Therefore, 5 bytes are needed to represent negative numbers, even for small numbers.

vint encode -5
// res25: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(40 bits, 0xfbffffff0f))

val negativeDelta = FactorizedDeltaTick(10, -5, -8)
// negativeDelta: FactorizedDeltaTick = FactorizedDeltaTick(10,-5,-8)

factorizedDeltaTickCodec encode negativeDelta
// res26: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(128 bits, 0x000000000000000afffffffbfffffff8))

What a wonderful world would it be when prices would only move upwards. In reality, prices move up and down all the time. Even tough there is another way to store small positive and also negative numbers, which is called ZigZag encoding, we will use a more trivial way to fix this issue. We can use one bit to indicate, whether the next delta is positive (0) or negative (1) and then storing only the absolute value. Here is the case class and the codec for this non-negative delta class.

case class NonNegativeFactorizedDeltaTick(timeDelta: Long, bidDeltaNeg: Boolean, bidDelta: Int, askDeltaNeg: Boolean, askDelta: Int)
// defined class NonNegativeFactorizedDeltaTick

val nonNegFactorizedDeltaTickCodecV = (vlong :: bool :: vint :: bool :: vint).as[NonNegativeFactorizedDeltaTick]
// nonNegFactorizedDeltaTickCodecV: scodec.Codec[NonNegativeFactorizedDeltaTick] = scodec.Codec$$anon$2@77269d0a

implicit class FactorizedDeltaTickOps(private val wrappedFactorizedDeltaTick: FactorizedDeltaTick) extends AnyVal {

  def nonNegative: NonNegativeFactorizedDeltaTick = NonNegativeFactorizedDeltaTick(
    wrappedFactorizedDeltaTick.timeDelta,
    wrappedFactorizedDeltaTick.bidDelta < 0,
    wrappedFactorizedDeltaTick.bidDelta.abs,
    wrappedFactorizedDeltaTick.askDelta < 0,
    wrappedFactorizedDeltaTick.askDelta.abs
  )
}
// defined class FactorizedDeltaTickOps

val nonNegativeDelta = negativeDelta.nonNegative
// nonNegativeDelta: NonNegativeFactorizedDeltaTick = NonNegativeFactorizedDeltaTick(10,true,5,true,8)

nonNegFactorizedDeltaTickCodecV encode nonNegativeDelta
// res27: scodec.Attempt[scodec.bits.BitVector] = Successful(BitVector(26 bits, 0x0a82c20))

Now we’ve got it. Even negative deltas can now be stored very efficiently using varints. We did increase the minimum size from 24 to 26 bits with our two bits as negative indicators, but we decreased the overall storage needed for up and down movements. Expecting evenly distributed up and down moves, we decreased our average storage space from 77 bits (26 bits for positives and 128 bits for negatives) down to just 26 bits! This makes it possible to store 1 million ticks inside an approximately 26 * 1000000 / 8 / 1024 / 1024 = 3,1 MB file, which is a decrease of 88,79% for the total file size in comparison to the CSV approach!

Here is the final binary representation for our codec.

Simple Codec

Conclusion

By developing our own binary protocol to store stock prices as ordered time series, we were able to improve the storage efficiency by nearly 90%. We started with 232 bits for one tick in CSV format and improved the storage needed step by step. The first step was storing integers instead of double values. After that, we switched to delta encoding in combination with variable length encoded integers to make use of the small price changes for each price update. Whenever we have structured data and we care about efficient memory usage, it makes sense to consider building your own binary file protocol by using easy libraries like Scodec to represent your own codec.

In the next part of this series, we will use Akka Streams to build a Source and a Sink to read and write huge files of our own file protocol efficiently with the help of backpressure. The full source code of the final result can be found here on Github.