Systems Programming at Twitter

Facebook, October 30, 2012

Marius Eriksen

Twitter Inc.


A history lesson

Twitter evolves

2009: Pure Ruby-on-Rails app with MySQL; lots of memcache. Materialized timelines into memcaches. Social graph moved to a service. Delayed work through queues.
2010: Starting to move timelines out to their own serving system. Project godwit was started to move Twitter off Ruby on Rails. Work begins on shared serving infrastructure.
2011: Infrastructure matures; a ton of work is being put into porting the application. TFE goes online.
2012: TFE serves 100% of traffic all year; more and more of the API (by traffic) is being served off the new stack. Most new work happens there.

This is the context of our work.

Late 2012 architecture

Many open source components

Organized around services

Multiplexing HTTP frontend

A systems software stack

Programming the datacenter

Concerns include

Taming the resulting complexity is the central theme of our work.

Languages and tools

Mismatch of world views:

Languages and tools target single computers, but our applications demand the simultaneous use of 1000s—a datacenter!

(Most) languages are designed for serial execution, but our world is inherently concurrent

Concurrent systems

Sources of concurrency:

Desiderata

There aren’t (yet?) magic scaling sprinkles. We must program locally, communicate globally.

Clean concurrent programming model: simple, safe, and modular programs; uniform programming model.

Message passing architecture: High concurrency, fault tolerant, performant.

Protocol support: largely agnostic, support HTTP, thrift, memcache, redis, MySQL...

Observability: for diagnostics, profiling, optimization

Detour: a little Scala

Scala

We use Scala heavily for systems work.

I’m going to introduce just enough of the language for you to grasp the following examples.

The point is not the language. The ideas and techniques matter. Scala happens to be our language.

Values

val i: Int = 123
val s: String = "hello, world!"

Static typing with inference

val m = Map(
  1 -> "one",
  2 -> "two"
)

Here, the Scala compiler works for us: we’re using integers and strings, so it must be a Map[Int, String].

Functions are values

val f: Int => Int = { x => x*2 }
f(123) == 246

All values are objects

val i = 123
i.toString == "123"
(i toString) == "123"
i.<(333) == true
i < 333 == true

Scala is a pure object oriented language: every value is an object.

A multitude of method invocation syntaxes.

Values (objects) have methods

case class Stock(ticker: String, price: Double) { 
  def <(other: Stock) = price < other.price
}

val goog = Stock("GOOG", 675.15)
val aapl = Stock("AAPL", 604.00)

aapl < goog == true
goog < aapl == false

Containers are polymorphic

val stocks = Seq(
  Stock("GOOG", 675.15),
  Stock("AAPL", 604.00)
)

Is typed Seq[Stock]. Known as “generics” or “parametric polymorphism.” Not the same as C++ templates.

New trick: type classes

stocks.sorted == Seq(Stock("AAPL", ...

Which works because an ordering is defined on Stock
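A minimal sketch of how such an ordering could be supplied. The slides do not show the instance, so ordering by price is an assumption, and the names StockOrdering and byPrice are illustrative.

```scala
// Sketch: supplying an Ordering[Stock] as a type class instance.
// Ordering by price is an assumption; the slide does not show the instance.
object StockOrdering {
  case class Stock(ticker: String, price: Double)

  // The implicit instance is what lets `sorted` compile for Seq[Stock].
  implicit val byPrice: Ordering[Stock] =
    Ordering.fromLessThan[Stock]((a, b) => a.price < b.price)

  val stocks = Seq(Stock("GOOG", 675.15), Stock("AAPL", 604.00))

  // `sorted` takes an implicit Ordering[Stock]; the compiler finds byPrice.
  val sortedTickers: Seq[String] = stocks.sorted.map(_.ticker)
}
```

The ordering lives outside the Stock class, so a different one (say, by ticker) can be passed explicitly to sorted without touching Stock.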

Pattern matching

An important tool for writing declarative programs.

val newState = state match {
  case Idle => Busy(1)
  case Busy(n) => Busy(n+1)
}

The compiler can provide exhaustiveness checks for us, guaranteeing that functions are total.

Every time you see { case ... } we have a partial function.
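The State type is not shown on the slide. A sketch under the assumption that it is a sealed hierarchy, which is what enables the exhaustiveness check, alongside a deliberately partial function for contrast:

```scala
// Sketch: the definition of State is assumed, not taken from the slides.
object StateMachine {
  sealed trait State
  case object Idle extends State
  case class Busy(n: Int) extends State

  // Total function: because State is sealed, the compiler can warn
  // when a case is missing from the match.
  def next(state: State): State = state match {
    case Idle    => Busy(1)
    case Busy(n) => Busy(n + 1)
  }

  // A { case ... } block used as a value is a PartialFunction.
  // This one is deliberately defined only for Busy.
  val load: PartialFunction[State, Int] = { case Busy(n) => n }
}
```

A partial function can be asked whether it applies before it is called: StateMachine.load.isDefinedAt(StateMachine.Idle) is false.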

Composition

In algebra we learned that two functions g and f compose: h = g·f which is shorthand for h(x) = g(f(x)).

We can do the same in Scala! (pay attention to the types)

val f: Int => String
val g: String => Float
val h: Int => Float = g compose f

g compose f is shorthand for

val h = { (x: Int) => g(f(x)) }

just like in algebra.
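A concrete instance, with bodies for f and g chosen purely for illustration:

```scala
// Sketch: f and g are given arbitrary bodies so the composition can be run.
object Composition {
  val f: Int => String = { x => x.toString }
  val g: String => Float = { s => s.length.toFloat }

  // g compose f: apply f first, then g.
  val h: Int => Float = g compose f

  // The same function, written in pipeline order.
  val h2: Int => Float = f andThen g
}
```

Here Composition.h(12345) converts the number to "12345" and then measures its length, giving 5.0.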

Composition

We took two things (f, g) and combined them to make a new thing (h).

We call these widgets combinators.

Other combinators

You probably use these every day.

We can, for instance, “map” collections:

val l = Seq(1,2,3,4)
val mult2 = { (x: Int) => x*2 }
val l2 = l map mult2 // or: l map { x => x*2 }
l2 == Seq(2,4,6,8)

flatMap

The flatMap combinator is a very versatile tool.

trait Seq[A] {
  def flatMap[B](f: A => Seq[B]): Seq[B]
  ...

As its name suggests, it’s a combination of map and flatten:

def flatMap[B](f: A => Seq[B]) =
  map(f).flatten

flatMap: What can we do with it?

Expanding:

Seq(1,2,3,4) flatMap { x =>
  Seq(x, -x)
} == Seq(1,-1,2,-2,3,-3,4,-4)

Conditionals:

Seq(1,2,3,4) flatMap { x =>
  if (x%2 == 0) Seq(x)
  else Seq()
} == Seq(2,4)

Gratuitous example

flatMap is sufficiently important to have its own syntactic sugar, e.g. to lazily compute Pythagorean triples:

for {
  z <- Stream.from(1)
  x <- Stream.range(1, z)
  y <- Stream.range(x, z)
  if x*x + y*y == z*z
} yield (x, y, z)
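To make the sugar concrete, here is roughly what the compiler does with such a comprehension. This sketch uses bounded ranges instead of the infinite Stream so that it terminates; the names are illustrative.

```scala
// Sketch: a bounded variant of the triples example and its desugaring.
object Triples {
  // for-comprehension over finite ranges.
  def sugared(max: Int): Seq[(Int, Int, Int)] =
    for {
      z <- 1 to max
      x <- 1 until z
      y <- x until z
      if x*x + y*y == z*z
    } yield (x, y, z)

  // Approximately what the compiler rewrites it to: every generator but
  // the last becomes flatMap, the guard becomes withFilter, and the
  // yield becomes map.
  def desugared(max: Int): Seq[(Int, Int, Int)] =
    (1 to max).flatMap { z =>
      (1 until z).flatMap { x =>
        (x until z).withFilter(y => x*x + y*y == z*z).map(y => (x, y, z))
      }
    }
}
```

Any type with flatMap, map, and withFilter can be used this way, which is why the same syntax carries over to futures.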

Concurrent programming with Futures

Futures

A placeholder for a result that is, usually, being computed concurrently

Computations can fail:

Futures are how we represent concurrent execution.

Futures

Are a kind of container:

trait Future[A]

It can be empty, full, or failed. You can wait for it:

val f: Future[A]
val result = f()

Failures would result in exceptions; prefer using Try:

f.get() match {
  case Return(res) => ...
  case Throw(exc) => ...
}
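The slides use Twitter's own Future. As a runnable stand-in, this sketch shows the same idea with the standard library's scala.concurrent.Future, where Success and Failure play the roles of Return and Throw. The helper names are illustrative.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Sketch using scala.concurrent as a stand-in for the futures in the talk.
object WaitingOnFutures {
  // Block until the future is resolved and return its outcome as a Try,
  // so a failed computation becomes a value rather than a thrown exception.
  def outcome[A](f: Future[A]): Try[A] =
    Await.ready(f, 5.seconds).value.get

  def describe(f: Future[Int]): String = outcome(f) match {
    case Success(res) => s"returned $res"
    case Failure(exc) => s"failed: ${exc.getMessage}"
  }

  def ok: Future[Int] = Future { 21 * 2 }
  def broken: Future[Int] = Future { throw new IllegalStateException("boom") }
}
```

Blocking like this is mostly useful in tests and at the edges of a program; the following slides show how to avoid it.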

Applying the Hollywood principle

(Don’t call me, I’ll call you!)

val f: Future[String]

f onSuccess { s =>
  log.info(s)
} onFailure { exc =>
  log.error(exc)
}

Promises

Futures are read-only; a Promise is a writable future.

val p: Promise[Int]
val f: Future[Int] = p

Success:

p.setValue(1)

Failure:

p.setException(new MyExc)
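The same split between a writable and a read-only side exists in the standard library. A sketch with scala.concurrent.Promise, where success and failure are the rough analogues of setValue and setException above:

```scala
import scala.concurrent.{Future, Promise}

// Sketch using scala.concurrent.Promise as a stand-in for the talk's Promise.
object PromiseSketch {
  def fulfilled: Future[Int] = {
    val p = Promise[Int]()        // producer side: the write handle
    val f: Future[Int] = p.future // consumer side: a read-only view
    p.success(1)                  // analogous to p.setValue(1)
    f
  }

  def failed: Future[Int] = {
    val p = Promise[Int]()
    p.failure(new RuntimeException("no value")) // analogous to p.setException(...)
    p.future
  }
}
```

A promise can be completed only once; in the standard library a second call to success or failure throws IllegalStateException.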

Composition: a motivating example

So far, I’ve shown you nothing more than a rephrasing of callbacks. Let’s see how futures compose.

trait Webpage {
  def imageLinks: Seq[String]
  def links: Seq[String]
  ...
}

def fetch(url: String): Future[Webpage]

Let’s build a “Pinterest” style thumbnail extractor.

def getThumbnail(url: String): Future[Webpage]

Thumbnail extractor

def getThumbnail(url: String): Future[Webpage] = {
  val promise = new Promise[Webpage]
  fetch(url) onSuccess { page =>
    fetch(page.imageLinks(0)) onSuccess { p =>
      promise.setValue(p)
    } onFailure { exc =>
      promise.setException(exc)
    }
  } onFailure { exc =>
    promise.setException(exc)
  }
  promise
}

Yuck! This is a variant of the all-too-familiar callback-hell.

flatMap to the rescue?

We want:

def getThumbnail(url: String): Future[Webpage]

We must first fetch the page, find the first image link, then fetch that link.
If either of these operations fails, getThumbnail also fails.

This is starting to smell like flatMap

trait Future[A] {
  ...
  def flatMap[B](f: A => Future[B]): Future[B]

getThumbnail with flatMap

def getThumbnail(url: String): Future[Webpage] =
  fetch(url) flatMap { page =>
    fetch(page.imageLinks(0))
  }
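To try this out without a network, here is a sketch with scala.concurrent.Future standing in for the talk's Future. The concrete Webpage class and the in-memory web map are hypothetical stand-ins for real HTTP fetches.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Sketch: a stubbed fetch so that getThumbnail can actually be run.
object Thumbnails {
  case class Webpage(url: String, imageLinks: Seq[String], links: Seq[String])

  // Hypothetical in-memory "web".
  val web: Map[String, Webpage] = Map(
    "http://a" -> Webpage("http://a", Seq("http://a/1.png"), Seq("http://b")),
    "http://a/1.png" -> Webpage("http://a/1.png", Nil, Nil)
  )

  def fetch(url: String): Future[Webpage] =
    web.get(url) match {
      case Some(page) => Future.successful(page)
      case None       => Future.failed(new NoSuchElementException(url))
    }

  // Same shape as the slide: the second fetch runs only if the first
  // succeeds, and a failure in either step fails the returned future.
  def getThumbnail(url: String): Future[Webpage] =
    fetch(url) flatMap { page => fetch(page.imageLinks(0)) }

  // Blocking helper, for demonstration only.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```

Note that no promise and no explicit failure handling appear in getThumbnail; flatMap propagates failures on its own.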

What about failures?

These should compose as well, and they must be recoverable.

flatMap needs a dual: whereas flatMap operates over values, rescue operates over exceptions

trait Future[A] {
  ...
  def rescue[B](f: Exception => Future[B]): Future[B]

Recovering errors:

val f = fetch(url) rescue {
  case ConnectionFailed =>
    fetch(url)
}
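In the standard library the closest counterpart to rescue is recoverWith. A sketch of a single retry, where FlakyFetcher and this ConnectionFailed are hypothetical helpers defined only for the example:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Sketch: retry once on a specific failure, using recoverWith from
// scala.concurrent as a stand-in for rescue.
object Retrying {
  // Hypothetical failure type, mirroring the slide's ConnectionFailed.
  case object ConnectionFailed extends Exception("connection failed")

  // Fails on the first `failures` calls, then succeeds.
  class FlakyFetcher(failures: Int) {
    private val calls = new java.util.concurrent.atomic.AtomicInteger(0)
    def fetch(url: String): Future[String] =
      if (calls.incrementAndGet() <= failures) Future.failed(ConnectionFailed)
      else Future.successful(s"contents of $url")
  }

  // Retry exactly once, and only for ConnectionFailed.
  // Any other failure passes through untouched.
  def fetchWithRetry(fetcher: FlakyFetcher, url: String): Future[String] =
    fetcher.fetch(url) recoverWith {
      case ConnectionFailed => fetcher.fetch(url)
    }

  // Blocking helper, for demonstration only.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```

With a fetcher that fails once the retry succeeds; with one that fails twice the second failure is returned to the caller.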

Combining many futures: collect

object Future {
  def collect[A](fs: Seq[Future[A]]): Future[Seq[A]]

Useful for fan-out operations, e.g. to fetch all thumbnails:

def getThumbnails(url: String): Future[Seq[Webpage]] =
  fetch(url) flatMap { page =>
    Future.collect(
      page.imageLinks map { u => fetch(u) }
    )
  }
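The standard library's analogue of collect is Future.sequence. A small fan-out sketch, where fetchSize is a hypothetical stand-in for a remote call:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Sketch: fan-out with Future.sequence as a stand-in for Future.collect.
object FanOut {
  // Hypothetical stand-in for a remote call: returns the length of the URL.
  def fetchSize(url: String): Future[Int] =
    if (url.startsWith("http://")) Future { url.length }
    else Future.failed(new IllegalArgumentException(s"bad url: $url"))

  // Seq[Future[Int]] => Future[Seq[Int]]. Results keep the input order,
  // and the combined future fails if any element fails.
  def fetchSizes(urls: Seq[String]): Future[Seq[Int]] =
    Future.sequence(urls.map(fetchSize))

  // Blocking helper, for demonstration only.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```

All the requests are started before any result is awaited, so they proceed concurrently.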

A simple web crawler*

def crawl(url: String): Future[Seq[Webpage]] =
  fetch(url) flatMap { page =>
    Future.collect(
      page.links map { u => crawl(u) }
    ) map { pps => pps.flatten }
  }

(* Apocryphal)

Functional style

Emphasize “what” over “how”: declare the meaning of a computation; don’t prescribe how it is computed.

Semantics are liberated from mechanics.

Enhances modularity: it’s now simple to alter the implementation without affecting existing code.

Modular decomposition of services

Services

We’ve seen how we can use futures for concurrent programming, now we’ll see how network programming fits into the picture.

What is an RPC?

It’s a function!

type Service[Req, Rep] = Req => Future[Rep]

Servers implement these, clients make use of them.

A simple service

A server:

val multiplier: Service[Int, Int] = { i =>
  Future.value(i*2)
}

A client:

multiplier(123) onSuccess { res =>
  println("result", res)
}
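The same service as a self-contained sketch, with scala.concurrent.Future standing in for the talk's Future. Everything runs in one process here; in a real deployment the call would cross the network.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Sketch: a service is just a function from a request to a future reply.
object Services {
  type Service[Req, Rep] = Req => Future[Rep]

  // "Server" side: an implementation of the function.
  val multiplier: Service[Int, Int] = { i => Future.successful(i * 2) }

  // "Client" side: just call it (blocking here only for demonstration).
  def double(i: Int): Int = Await.result(multiplier(i), 1.second)
}
```

Because client and server share one type, a client can be handed to anything that expects a service.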

Filters

Many common behaviors of services are agnostic to the particulars of the service; some common ones:

Filters compose over services. Conceptually, we want to alter the behavior of a service while being agnostic to what the service is.

Filters: a sketch

val timeout: Filter[…]
val service: Service[Req, Rep]

val serviceWithTimeout =
  timeout andThen service

Filters, too, are just functions:

type Filter[…] =
  (ReqIn, Service[ReqOut, RepIn]) => Future[RepOut]

Example filters

Given a request and a service, dispatch it, but time out after 1 second.

val timeout = { (req, service) =>
  service(req).within(1.second)
}

Attempt to authenticate the request, dispatching only if it succeeds.

val auth = { (req, service) =>
  if (isAuth(req))
    service(req)
  else
    Future.exception(AuthErr)
}

Filters are stackable

val timeout: Filter[…]
val auth: Filter[…]
val service: Service[…]

timeout andThen auth andThen service
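A sketch of how andThen might be defined so that filters stack in front of a service. This is not Finagle's actual implementation: the Filter class, the trim and auth filters, and the "token:" convention are all made up for the example, and scala.concurrent.Future stands in for the talk's Future.

```scala
import scala.concurrent.Future

// Sketch: minimal Service and Filter types with andThen composition.
object Filters {
  type Service[Req, Rep] = Req => Future[Rep]

  abstract class Filter[ReqIn, RepOut, ReqOut, RepIn] { self =>
    def apply(req: ReqIn, service: Service[ReqOut, RepIn]): Future[RepOut]

    // Filter + service = service.
    def andThen(service: Service[ReqOut, RepIn]): Service[ReqIn, RepOut] =
      req => self(req, service)

    // Filter + filter = filter.
    def andThen[Req2, Rep2](
      next: Filter[ReqOut, RepIn, Req2, Rep2]
    ): Filter[ReqIn, RepOut, Req2, Rep2] =
      new Filter[ReqIn, RepOut, Req2, Rep2] {
        def apply(req: ReqIn, service: Service[Req2, Rep2]): Future[RepOut] =
          self(req, next.andThen(service))
      }
  }

  case object AuthErr extends Exception("not authenticated")

  // Normalize the request before passing it along.
  val trim = new Filter[String, String, String, String] {
    def apply(req: String, service: Service[String, String]): Future[String] =
      service(req.trim)
  }

  // Dispatch only if the request carries the (hypothetical) token prefix.
  val auth = new Filter[String, String, String, String] {
    def apply(req: String, service: Service[String, String]): Future[String] =
      if (req.startsWith("token:")) service(req.stripPrefix("token:"))
      else Future.failed(AuthErr)
  }

  val echo: Service[String, String] = req => Future.successful(s"echo $req")

  // Filters apply left to right, ending at the service.
  val stacked: Service[String, String] = trim andThen auth andThen echo
}
```

The four type parameters let a filter change the request and reply types as it goes, which is what makes the type-safety point on the next slide possible.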

Filters are typesafe

// A service that requires an authenticated request
val service: Service[AuthHttpReq, HttpRep]

// Bridge with a filter
val filt: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]

// Authenticate, and serve.
val authService: Service[HttpReq, HttpRep] =
  filt andThen service

Finagle: an RPC system

Finagle

A crude recap:

Clients provide services, servers consume them.

Adds behavior, largely configurable: load balancing, connection pooling, retrying, timeouts, rate limiting, monitoring, stats collection.

Protocol agnostic: codecs implement wire protocols.

Manages resources.

Clients

val client = ClientBuilder()
  .name("loadtest")
  .codec(Http)
  .hosts("google.com:80,..")
  .build()

client is a Service[HttpReq, HttpRep]

client(HttpRequest(GET, "/"))

Servers

val service = { req =>
  Future.value(HttpRes(Code.OK, "blah"))
}

ServerBuilder()
  .name("httpd")
  .codec(Http)
  .bindTo(":8080")
  .build(service)

A proxy

val client = ClientBuilder()…
ServerBuilder()…build(client)

Putting it all together

Recently, we wanted to add speculative execution; easy!

val backupReq: Filter[…] = { (req, service) =>
  val reqs = Seq(
    service(req),
    timer.doLater(delay) { service(req) }.flatten
  )

  Future.select(reqs) flatMap {
    case (Return(res), Seq(other)) =>
      other.cancel()
      Future.value(res)
    case (Throw(_), Seq(other)) => other
  }
}
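A rough, runnable approximation of the backup-request idea using only the standard library. It differs from the filter above in two ways that should be kept in mind: scala.concurrent futures cannot be cancelled, so the losing request is simply ignored, and Future.firstCompletedOf returns the first completion even when it is a failure. All names here (doLater, withBackup, flakyService) are illustrative.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Sketch: speculative execution with a delayed second request.
object BackupRequests {
  type Service[Req, Rep] = Req => Future[Rep]

  // Daemon thread so the scheduler never keeps the JVM alive.
  private val scheduler = Executors.newSingleThreadScheduledExecutor { r =>
    val t = new Thread(r); t.setDaemon(true); t
  }

  // Run `body` after `delay` and flatten the nested future.
  def doLater[A](delay: FiniteDuration)(body: => Future[A]): Future[A] = {
    val p = Promise[A]()
    scheduler.schedule(new Runnable {
      def run(): Unit =
        try p.completeWith(body)
        catch { case NonFatal(e) => p.tryFailure(e) }
    }, delay.toMillis, TimeUnit.MILLISECONDS)
    p.future
  }

  // Send the request; if no reply has arrived after `delay`, send a second
  // copy and take whichever reply completes first.
  def withBackup[Req, Rep](delay: FiniteDuration)(service: Service[Req, Rep]): Service[Req, Rep] =
    req => {
      val primary = service(req)
      val backup = doLater(delay) {
        if (primary.isCompleted) primary else service(req)
      }
      Future.firstCompletedOf(Seq(primary, backup))
    }

  // Demo service: the first call hangs forever, later calls answer at once.
  def flakyService(): Service[String, String] = {
    val calls = new java.util.concurrent.atomic.AtomicInteger(0)
    req =>
      if (calls.incrementAndGet() == 1) Promise[String]().future
      else Future.successful(s"reply to $req")
  }

  def demo(): String =
    Await.result(withBackup(10.millis)(flakyService())("ping"), 5.seconds)
}
```

In demo the primary request never completes, so the reply comes from the backup issued 10 milliseconds later.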

Observability & diagnostics

Observability & diagnostics

In a distributed environment, standard tools lose their efficacy.

It is difficult to reason about what you cannot measure.

Debugging process interaction is vital.

Stats

Liberally export stats that are important for diagnostics, optimization, etc.

Stats.incr("reqs")
Stats.addMetric("latency_ms", 123)

Which are then available:

$ curl host:port/stats.txt 
counters:
  reqs: 35066
  ...
metrics:
  latency_ms: (average=0, count=26610420, maximum=1283, minimum=0, p25=0, p50=0, p75=2, p90=2, p95=2, p99=4, p999=26, p9999=386, sum=25330865)

Viz

Tracing

RPC tracing based on Dapper (Sigelman, Barroso, et al., 2010)

Trace.record("PC LOAD LETTER")

Trace.timeFuture("search RPC") {
  searchQuery("hello, world!")
}

Tracing support was introduced with zero code changes required in user code.

Zipkin, our trace aggregation system, is open source:

Zipkin

Profiling

In situ CPU, heap, contention profilers. Incredibly useful for optimization and diagnostics.

$ curl -O host:port/pprof/profile
$ pprof profile
Welcome to pprof!  For help, type 'help'.
(pprof) top
Total: 19268360 samples
 6824664  35.4%  35.4%  6824664  35.4% Ljava/lang/Object;
 3701808  19.2%  54.6%  3701808  19.2% Lorg/apache/commons/codec/binary/BaseNCodec;resizeBuffer
 3518096  18.3%  72.9%  3518096  18.3% Ljava/util/Arrays;copyOf
 1591152   8.3%  81.1%  1591152   8.3% Ljava/util/Arrays;copyOfRange

pprof --web

In practice: the good

Emphasizing declarative/data flow style programming with future combinators results in robust, modular, safe, and simple systems.

It is simple to build higher level combinators.

Lots of implementor leeway.

In practice: the ugly

Some things don't fit so neatly: cancellation is hairy, for instance, but very important.

Clean separation is sometimes troublesome: e.g. how to always retry a request on a different host?

Layering is never actually clean: the world is very messy.

Abstraction results in greater garbage collector pressure, but the JVM is very good.

Aside: thrift

Custom thrift transport with tracing. Soon: request multiplexing, service multiplexing, compression. Upnegotiated, so fully compatible with “legacy” clients/servers.

Thrift code generator: Scrooge

Legacy generator with Future Iface support:

Open source

These are all true open source projects. Lots of external contributions. Tumblr, Foursquare, StumbleUpon have serious deployments of our systems software infrastructure.

Scala school

Effective Scala

Thank you

Marius Eriksen

Twitter Inc.