Effective Scala
Scala BASE, December 10
Marius Eriksen
Twitter Inc.
A recent project: finagle-mux. Multiplexing session layer for any RPC protocol.
Small, but nontrivial. Your most recent work is more likely to reflect current best practices.
The code I'm going to talk about is going to be in the hot path of nearly all requests to Twitter. It needs to be fast and economical in allocation.
Premature optimization is the root of all evil.
A simple header, followed by a type-specific message
Split into T messages and R messages. R messages are replies to T messages.
Treq, Rreq, Tdrain, Rdrain, etc.
Special message: Rerr, indicating some sort of session error.
    package com.twitter.finagle.mux

    sealed trait Message {
      val typ: Byte
      val tag: Int
      val buf: ChannelBuffer
    }

    object Message {
      object Types {
        val Treq = 1: Byte
        val Rreq = ...
      }

      case class Treq(....) extends Message
    case class Treq(tag: Int, traceId: TraceId, req: ChannelBuffer)
        extends Message {
      val typ = Types.Treq
      lazy val buf = {
        val hd = ChannelBuffers.directBuffer(8+8+8+4)
        hd.writeLong(traceId.spanId.toLong)
        hd.writeLong(traceId.parentId.toLong)
        hd.writeLong(traceId.traceId.toLong)
        hd.writeInt(traceId.flags.toLong.toInt)
        ChannelBuffers.wrappedBuffer(hd, req)
      }
    }
Some messages are empty; EmptyMessage is a convenience:
    abstract class EmptyMessage(val typ: Byte) extends Message {
      val buf = ChannelBuffers.EMPTY_BUFFER
    }

    case class Tdrain(tag: Int) extends EmptyMessage(Types.Tdrain)
In the implementation, it's often convenient to distinguish between T messages and R messages:
    object Tmessage {
      def unapply(m: Message): Option[Int] =
        if (m.typ > 0) Some(m.tag)
        else None
    }
Which allows pattern matchers:
    msg match {
      case Treq(...) =>
      case Tmessage(tag) =>
        // do something with tag
    }
Other classifiers are imaginable.
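One such classifier, sketched under assumptions: the message types below are simplified stand-ins, the type byte values are illustrative, and Rmessage (matching a negative type byte) is a hypothetical counterpart to Tmessage, not something shown in the original code.

```scala
object Classifiers {
  // Simplified stand-ins for the mux message types (illustrative only).
  sealed trait Message { def typ: Byte; def tag: Int }
  case class Treq(tag: Int) extends Message { val typ: Byte = 1 }
  case class Rreq(tag: Int) extends Message { val typ: Byte = -1 }
  case class Tdrain(tag: Int) extends Message { val typ: Byte = 64 }

  // Matches any T message (positive type byte) and yields its tag.
  object Tmessage {
    def unapply(m: Message): Option[Int] =
      if (m.typ > 0) Some(m.tag) else None
  }

  // Assumed counterpart: matches any message with a negative type byte.
  object Rmessage {
    def unapply(m: Message): Option[Int] =
      if (m.typ < 0) Some(m.tag) else None
  }

  // Specific cases first, classifiers as a catch-all per message family.
  def describe(m: Message): String = m match {
    case Treq(tag)     => s"request $tag"
    case Tmessage(tag) => s"other T message $tag"
    case Rmessage(tag) => s"reply $tag"
    case _             => "unknown"
  }
}
```

Case order matters here: Treq is also a Tmessage, so the more specific pattern has to come first.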
    // object Message
    def decode(buf: ChannelBuffer): Message = {
      val head = buf.readInt()
      val typ = (head>>24 & 0xff).toByte
      val tag = head & 0x00ffffff
      typ match {
        case Types.Treq => decodeTreq(tag, buf)
        case Types.Rreq => decodeRreq(tag, buf)
        case Types.Tdrain => Tdrain(tag)
        case Types.Rdrain => Rdrain(tag)
        ...
Most messages are simple: keep it that way. It's okay to not have completely uniform handling. Utility methods and functions are great. Use the succinctness of the language to avoid abstraction.
Messages already supply buf, do the rest.
    def encode(m: Message): ChannelBuffer = {
      val head = Array[Byte](
        m.typ,
        (m.tag>>16 & 0xff).toByte,
        (m.tag>>8 & 0xff).toByte,
        (m.tag & 0xff).toByte
      )
      ChannelBuffers.wrappedBuffer(
        ChannelBuffers.wrappedBuffer(head), m.buf)
    }
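The header layout used by decode and encode (one type byte followed by a 24-bit tag) can be checked in isolation. A minimal sketch, assuming big-endian byte order and using java.nio.ByteBuffer in place of Netty's ChannelBuffer; the names HeaderCodec, encodeHeader and decodeHeader are made up for illustration:

```scala
import java.nio.ByteBuffer

object HeaderCodec {
  // Pack a type byte and the low 24 bits of a tag into 4 bytes.
  def encodeHeader(typ: Byte, tag: Int): Array[Byte] = Array[Byte](
    typ,
    (tag >> 16 & 0xff).toByte,
    (tag >> 8 & 0xff).toByte,
    (tag & 0xff).toByte
  )

  // Inverse: read the 4 bytes as one big-endian Int, then split it.
  def decodeHeader(bytes: Array[Byte]): (Byte, Int) = {
    val head = ByteBuffer.wrap(bytes).getInt()
    val typ = (head >> 24 & 0xff).toByte
    val tag = head & 0x00ffffff
    (typ, tag)
  }
}
```

Any tag bits above the low 24 are dropped on encode, so a round trip only preserves tags that fit in 24 bits.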
An implementation must maintain a free set of tags. Assign a tag from this set to a T message, give it back when the corresponding R message arrives.
    trait TagSet extends Iterable[Int] {
      /** The range of tags maintained by this TagSet */
      val range: Range
      /** Acquire a tag, if available */
      def acquire(): Option[Int]
      /** Release a previously acquired tag */
      def release(tag: Int)
    }
Hide the implementation, expose interfaces.
    object TagSet {
      /**
       * Constructs a space-efficient TagSet for
       * the range of available tags in the mux protocol.
       */
      def apply(): TagSet = TagSet(Message.MinTag to Message.MaxTag)

      def apply(_range: Range): TagSet = new TagSet {
        ...
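The elided body is not shown in the talk. One plausible shape for a space-efficient implementation hidden behind the factory is a bit set; the sketch below is an assumption, not the finagle-mux code. It drops the Iterable[Int] parent for brevity and assumes a range with step 1.

```scala
object TagSetSketch {
  trait TagSet {
    val range: Range
    def acquire(): Option[Int]
    def release(tag: Int): Unit
  }

  object TagSet {
    // Callers only ever see the TagSet interface.
    def apply(_range: Range): TagSet = new TagSet {
      val range = _range
      // Bit i is set while tag (range.start + i) is in use.
      private val busy = new java.util.BitSet

      def acquire(): Option[Int] = synchronized {
        val i = busy.nextClearBit(0)
        if (i >= range.length) None
        else { busy.set(i); Some(range.start + i) }
      }

      def release(tag: Int): Unit = synchronized {
        if (range.contains(tag)) busy.clear(tag - range.start)
      }
    }
  }
}
```

Because the concrete class is anonymous, the bit set can later be swapped for something else without touching callers.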
We also need to have tag maps. These map busy tags to an element.
    trait TagMap[T] extends Iterable[(Int, T)] {
      def map(el: T): Option[Int]
      def maybeRemap(tag: Int, newEl: T): Option[T]
      def unmap(tag: Int): Option[T]
    }
Prefer composition over inheritance.
    object TagMap {
      def apply[T <: Object: ClassManifest](
        set: TagSet,
        fastSize: Int = 256
      ): TagMap[T] = new TagMap[T] {
This also makes testing much simpler: orthogonal behavior can be tested orthogonally.
Use simple data structures, optimize for the common case. Arrays are nice. Be economical with objects in hot paths.
    private[this] val fast = new Array[T](fastSize)
    private[this] val fallback = new HashMap[Int, T]
    private[this] val fastOff = set.range.start

    def map(el: T): Option[Int] = synchronized {
      set.acquire() map { tag =>
        if (tag < fastSize+fastOff)
          fast(tag-fastOff) = el
        else
          fallback.put(tag, el)
        tag
      }
    }
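To see the fast path and the fallback working together, here is a self-contained sketch. The map method follows the slide; unmap, the pared-down TagSet trait, and the CountingTagSet allocator are assumptions added so the example runs on its own, and ClassTag replaces ClassManifest.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

object TagMapSketch {
  // Minimal allocator interface standing in for TagSet.
  trait TagSet {
    def range: Range
    def acquire(): Option[Int]
    def release(tag: Int): Unit
  }

  // Naive allocator for demonstration only: hands out tags in order.
  class CountingTagSet(val range: Range) extends TagSet {
    private val free = mutable.Queue.empty[Int]
    free ++= range
    def acquire(): Option[Int] = synchronized {
      if (free.isEmpty) None else Some(free.dequeue())
    }
    def release(tag: Int): Unit = synchronized { free.enqueue(tag) }
  }

  class TagMap[T <: AnyRef: ClassTag](set: TagSet, fastSize: Int = 256) {
    private val fast = new Array[T](fastSize) // low tags: plain array slot
    private val fallback = new mutable.HashMap[Int, T] // everything else
    private val fastOff = set.range.start

    def map(el: T): Option[Int] = synchronized {
      set.acquire().map { tag =>
        if (tag < fastSize + fastOff) fast(tag - fastOff) = el
        else fallback.put(tag, el)
        tag
      }
    }

    // Assumed behaviour: remove the element and give the tag back.
    def unmap(tag: Int): Option[T] = synchronized {
      val prev =
        if (!set.range.contains(tag)) None
        else if (tag < fastSize + fastOff) {
          val el = Option(fast(tag - fastOff))
          fast(tag - fastOff) = null.asInstanceOf[T]
          el
        } else fallback.remove(tag)
      if (prev.isDefined) set.release(tag)
      prev
    }
  }
}
```

With the default fastSize of 256, a session would need more than 256 outstanding requests before anything touches the hash map.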
    class ClientDispatcher(
      trans: Transport[ChannelBuffer, ChannelBuffer]
    ) extends (ChannelBuffer => Future[ChannelBuffer])
Composition tells the story: given a transport, we get a function with which we can dispatch new messages.
Don't be afraid of functions.
    private[this] val tags = TagSet()
    private[this] val reqs = TagMap[Promise[ChannelBuffer]](tags)

    private[this] val receive: Message => Unit = {
      case RreqOk(tag, rep) =>
        for (p <- reqs.unmap(tag))
          p.setValue(rep)
      case RreqError(tag, error) =>
        for (p <- reqs.unmap(tag))
          p.setException(ServerApplicationError(error))
      ...
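The same shape can be tried outside Finagle. This sketch substitutes scala.concurrent.Promise for Twitter's Promise, a plain mutable map for TagMap, and String payloads for ChannelBuffer; Pending and register are hypothetical names.

```scala
import scala.collection.mutable
import scala.concurrent.Promise

object ReceiveSketch {
  sealed trait Reply
  case class RreqOk(tag: Int, rep: String) extends Reply
  case class RreqError(tag: Int, error: String) extends Reply

  class Pending {
    private val reqs = mutable.Map.empty[Int, Promise[String]]

    // Remember a promise for an outstanding request.
    def register(tag: Int): Promise[String] = synchronized {
      val p = Promise[String]()
      reqs(tag) = p
      p
    }

    private def unmap(tag: Int): Option[Promise[String]] =
      synchronized { reqs.remove(tag) }

    // A function value built from a pattern-matching block.
    // Replies for unknown tags fall through the for and are ignored.
    val receive: Reply => Unit = {
      case RreqOk(tag, rep) =>
        for (p <- unmap(tag)) p.success(rep)
      case RreqError(tag, error) =>
        for (p <- unmap(tag)) p.failure(new Exception(error))
    }
  }
}
```

Since receive is an ordinary value of type Reply => Unit, it can be passed around, composed, or tested without the rest of the dispatcher.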
Recursion is your friend.
    private[this] def loop(): Future[Unit] =
      trans.read() flatMap { buf =>
        try {
          val m = decode(buf)
          receive(m)
          loop()
        } catch {
          case exc: BadMessageException =>
            Future.exception(exc)
        }
      }
Handle errors in few places, use composition:
    private[this] def loop(): Future[Unit] = ...

    loop() onFailure {
      case exc =>
        trans.close()
        for ((tag, p) <- reqs)
          p.setException(exc)
    }
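A rough equivalent of the recursive loop and its single failure handler, written against scala.concurrent.Future rather than Twitter's Future, so the method names differ. Source is a hypothetical stand-in for Transport, and closing the transport is left out.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

object LoopSketch {
  // Hypothetical stand-in for Transport: a source of asynchronous reads.
  trait Source[A] { def read(): Future[A] }

  // Read, handle, recurse. The loop ends only when a read fails or
  // handle throws; flatMap turns a thrown exception into a failed Future.
  def loop[A](src: Source[A])(handle: A => Unit)(
      implicit ec: ExecutionContext): Future[Unit] =
    src.read().flatMap { a =>
      handle(a)
      loop(src)(handle)
    }

  // The one place where errors are handled: when the loop fails,
  // fail every promise that is still pending.
  def run[A, B](src: Source[A], pending: Iterable[Promise[B]])(
      handle: A => Unit)(implicit ec: ExecutionContext): Future[Unit] = {
    val done = loop(src)(handle)
    done.failed.foreach { exc =>
      pending.foreach(_.tryFailure(exc))
    }
    done
  }
}
```

No individual read or handler needs its own error handling; failures propagate through the composed Future to the one handler in run.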
Write essays to motivate what you are doing:
package.scala:
    package com.twitter.finagle

    /**
    Package mux implements a generic RPC multiplexer with a rich protocol.
    Mux is itself encoding independent, so it is meant to use as the
    transport for other RPC systems (eg. thrift). In OSI terminology, it
    is a pure session layer.
    ...
Don't go nuts with formatting. Keep it simple.
Look at your scaladoc:
"Scaladoc-driven development": hide everything that shouldn't be part of the public API. It is much simpler to make something public later than it is to make it private.