Effective Scala

Scala BASE, December 10

Marius Eriksen

Twitter Inc.

Effective Scala

Effective Scala by example

A recent project: finagle-mux. Multiplexing session layer for any RPC protocol.

Small, but nontrivial. Your most recent work is more likely to reflect current best practices.

Caveat emptor

The code I'm going to talk about is going to be in the hot path of nearly all requests to Twitter. It needs to be fast and economical in allocation.

Premature optimization is the root of all evil.

What are we building? The protocol

A simple header, followed by a type-specific message

Message types

Split into T messages and R messages. R messages are replies to T messages.

Treq, Rreq, Tdrain, Rdrain, etc.

Special message: Rerr, indicating some sort of session error.

Modelling messages

package com.twitter.finagle.mux

sealed trait Message {
  val typ: Byte
  val tag: Int
  val buf: ChannelBuffer
}

object Message {
  object Types {
    val Treq = 1: Byte
    val Rreq = ...
}

case class Treq(....) extends Message

Message Treq

case class Treq(tag: Int, traceId: TraceId, req: ChannelBuffer) extends Message {
  val typ = Types.Treq
  lazy val buf = {
    val hd = ChannelBuffers.directBuffer(8+8+8+4)
    hd.writeLong(traceId.spanId.toLong)
    hd.writeLong(traceId.parentId.toLong)
    hd.writeLong(traceId.traceId.toLong)
    hd.writeInt(traceId.flags.toLong.toInt)
    ChannelBuffers.wrappedBuffer(hd, req)
  }
}

"Empty" messages

Some messages are empty, EmptyMessage is a convenience:

abstract class EmptyMessage(val typ: Byte) 
    extends Message
{
  val buf = ChannelBuffers.EMPTY_BUFFER
}

case class Tdrain(tag: Int) 
    extends EmptyMessage(Types.Tdrain)

Message classifiers

In the implementation, it's often convenient to distinguish between T messages and R messages:

object Tmessage {
  def unapply(m: Message): Option[Int] =
    if (m.typ > 0) Some(m.tag)
    else None
}

Which allows pattern matchers:

msg match {
  case Treq(...) =>
  case Tmessage(tag) => // do something with tag

Other classifiers are imaginable..

Decoding messages

// object Message
def decode(buf: ChannelBuffer): Message = {
  val head = buf.readInt()
  val typ = (head>>24 & 0xff).toByte
  val tag = head & 0x00ffffff
  typ match {
    case Types.Treq => decodeTreq(tag, buf)
    case Types.Rreq => decodeRreq(tag, buf)
    case Types.Tdrain => Tdrain(tag)
    case Types.Rdrain => Rdrain(tag)
    ...

Most message are simple: keep it that way. It's okay to not have completely uniform handling. Utility methods and functions are great. Use the succinctness of the language to avoid abstraction.

Encoding messages

Messages already supply buf, do the rest.

def encode(m: Message): ChannelBuffer = {
  val head = Array[Byte](
    m.typ,
    (m.tag>>16 & 0xff).toByte,
    (m.tag>>8 & 0xff).toByte,
    (m.tag & 0xff).toByte
  )

  ChannelBuffers.wrappedBuffer(
    ChannelBuffers.wrappedBuffer(head), m.buf)
}

Keeping track of messages

An implementation must maintain a free set of tags. Assign a tag from this set to a T message, give it back when the corresponding R message arrives.

trait TagSet extends Iterable[Int] {
  /** The range of tags maintained by this TagSet */
  val range: Range
  /** Acquire a tag, if available */
  def acquire(): Option[Int]
  /** Release a previously acquired tag */
  def release(tag: Int)
}

TagSet

Hide the implementation, expose interfaces.

object TagSet {
  /**
   * Constructs a space-efficient TagSet for 
   * the range of available tags in the mux protocol.
   */
  def apply(): TagSet =
    TagSet(Message.MinTag to Message.MaxTag)

  def apply(_range: Range): TagSet = new TagSet { ...

Tags, continued

We also need to have tag maps. These map busy tags to an element.

trait TagMap[T] extends Iterable[(Int, T)] {
  def map(el: T): Option[Int] 
  def maybeRemap(tag: Int, newEl: T): Option[T]
  def unmap(tag: Int): Option[T]
}

TagMap

Prefer composition over inheritance.

object TagMap {
  def apply[T <: Object: ClassManifest](
      set: TagSet, 
      fastSize: Int = 256
  ): TagMap[T] = new TagMap[T] {

This also makes testing much simpler: orthogonal behavior can be tested orthogonally.

TagMap

Use simple datastructures, optimize for the common case. Arrays are nice. Be economical with objects in hot paths.

private[this] val fast = new Array[T](fastSize)
private[this] val fallback = new HashMap[Int, T]
private[this] val fastOff = set.range.start

def map(el: T): Option[Int] = synchronized {
  set.acquire() map { tag =>
    if (tag < fastSize+fastOff)
      fast(tag-fastOff) = el
    else
      fallback.put(tag, el)
    tag
  }
}

The client

class ClientDispatcher(
    trans: Transport[ChannelBuffer, ChannelBuffer]
) extends (ChannelBuffer => Future[ChannelBuffer])

Composition tells the story: given a transport, we get a function with which we can dispatch new messages.

ClientDispatcher

Don't be afraid of functions

private[this] val tags = TagSet()
private[this] val reqs = 
  TagMap[Promise[ChannelBuffer]](tags)
  
private[this] val receive: Message => Unit = {
  case RreqOk(tag, rep) =>
    for (p <- reqs.unmap(tag))
      p.setValue(rep)
  case RreqError(tag, error) =>
    for (p <- reqs.unmap(tag))
      p.setException(ServerApplicationError(error))
...

ClientDispatcher

Recursion is your friend.

private[this] def loop(): Future[Unit] =
  trans.read() flatMap { buf =>
    try {
      val m = decode(buf)
      receive(m)
      loop()
    } catch {
      case exc: BadMessageException =>
        Future.exception(exc)
    }
  }

ClientDispatcher

Handle errors in few places, use composition:

private[this] def loop(): Future[Unit] = ...

loop() onFailure { case exc =>
  trans.close()
  for ((tag, p) <- reqs)
    p.setException(exc)
}

Packages, documentation

Write essays to motivate what you are doing:

package.scala:

package com.twitter.finagle

/**

Package mux implements a generic RPC multiplexer
with a rich protocol. Mux is itself encoding
independent,  so it is meant to use as the
transport for other RPC systems  (eg. thrift).
In OSI terminology, it is a pure session layer.
...

Don't go nuts with formatting. Keep it simple.

Packages, documentation

Look at your scaladoc:

Scaladoc

Look at your scaladoc:

"Scaladoc-driven development:" hide everything that shouldn't be part of the public API. It is much simpler to make something public than it is to make it private.

Effective Scala

Thank you

Marius Eriksen

Twitter Inc.