Systems Programming at Twitter
Facebook, October 30, 2012
Marius Eriksen
Twitter Inc.
2009: Pure Ruby-on-Rails app with MySQL; lots of memcache. Materialized timelines into memcache. Social graph moved to a service. Delayed work through queues.
2010: Starting to move timelines out to their own serving system. Project Godwit was started to move Twitter off Ruby on Rails. Work begins on shared serving infrastructure.
2011: Infrastructure matures; a ton of work is being put into porting the application. TFE goes online.
2012: TFE serves 100% of traffic all year; more and more of the API (by traffic) is being served off the new stack. Most new work happens there.
This is the context of our work.
Many open source components
Organized around services
Multiplexing HTTP frontend
Concerns include
Taming the resulting complexity is the central theme of our work.
Mismatch of world views:
Languages and tools target single computers, but our applications demand the simultaneous use of 1000s—a datacenter!
(Most) languages are designed for serial execution, but our world is inherently concurrent
Sources of concurrency:
There aren’t (yet?) magic scaling sprinkles. We must program locally, communicate globally.
Clean concurrent programming model: simple, safe, and modular programs; uniform programming model.
Message passing architecture: High concurrency, fault tolerant, performant.
Protocol support: largely agnostic, support HTTP, thrift, memcache, redis, MySQL...
Observability: for diagnostics, profiling, optimization
We use Scala heavily for systems work.
I’m going to introduce just enough of the language for you to grasp the following examples.
The point is not the language. The ideas and techniques matter. Scala happens to be our language.
val i: Int = 123
val s: String = "hello, world!"
val m = Map( 1 -> "one", 2 -> "two" )
Here, the Scala compiler works for us: we’re using integers and strings, so it must be a Map[Int, String].
val f: Int => Int = { x => x*2 }

f(123) == 246
val i = 123

i.toString == "123"
i toString == "123"

i.<(333) == true
i < 333 == true
Scala is a pure object oriented language: every value is an object.
A multitude of method invocation syntaxes.
case class Stock(ticker: String, price: Double) {
  def <(other: Stock) = price < other.price
}

val goog = Stock("GOOG", 675.15)
val aapl = Stock("AAPL", 604.00)

aapl < goog == true
goog < aapl == false
val stocks = Seq(
  Stock("GOOG", 675.15),
  Stock("AAPL", 604.00)
)
Is typed Seq[Stock]. Known as “generics” or “parametric polymorphism.” Not the same as C++ templates.
New trick: type classes
stocks.sorted == Seq(Stock("AAPL", ...
Which works because an ordering is defined on Stock
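One way this could be wired up (a sketch, not the code from the slides): define an implicit Ordering[Stock] instance and let sorted resolve it.

// A sketch: a type class instance ordering Stocks by price.
implicit val byPrice: Ordering[Stock] = Ordering.by[Stock, Double](_.price)

// sorted finds the implicit Ordering[Stock] at compile time.
stocks.sorted == Seq(Stock("AAPL", 604.00), Stock("GOOG", 675.15))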
An important tool for writing declarative programs.
val newState = state match {
  case Idle    => Busy(1)
  case Busy(n) => Busy(n+1)
}
The compiler can provide exhaustiveness checks for us, guaranteeing that functions are total.
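A sketch of definitions that make this work (the sealed hierarchy is an assumption; the names come from the example above):

// Sealing the trait tells the compiler it knows every subtype,
// so a non-exhaustive match is flagged at compile time.
sealed trait State
case object Idle extends State
case class Busy(n: Int) extends State

def step(state: State): State = state match {
  case Idle    => Busy(1)
  case Busy(n) => Busy(n+1)
}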
Every time you see { case ... }, you have a partial function.
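For example (a sketch, reusing the State type above):

// A PartialFunction is defined for only some inputs;
// isDefinedAt lets callers check before applying it.
val busier: PartialFunction[State, State] = { case Busy(n) => Busy(n+1) }

busier.isDefinedAt(Busy(1)) == true
busier.isDefinedAt(Idle)    == false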
In algebra we learned that two functions g and f compose: h = g·f which is shorthand for h(x) = g(f(x)).
We can do the same in Scala! (pay attention to the types)
val f: Int => String
val g: String => Float

val h: Int => Float = g compose f
g compose f
is shorthand for
val h = { x => g(f(x)) }
just like in algebra.
We took two things (f, g) and combined them to make a new thing (h).
We call these widgets combinators.
You probably use these every day.
We can, for instance, “map” collections:
val l = Seq(1,2,3,4)
val mult2 = { x: Int => x*2 }

val l2 = l map mult2
// or: l map { x => x*2 }

l2 == Seq(2,4,6,8)
The flatMap combinator is a very versatile tool.
trait Seq[A] {
  def flatMap[B](f: A => Seq[B]): Seq[B]
  ...
}
As its name suggests, it’s a combination of map and flatten:
def flatMap[B](f: A => Seq[B]) = map(f).flatten
Expanding:
Seq(1,2,3,4) flatMap { x => Seq(x, -x) } == Seq(1,-1,2,-2,3,-3,4,-4)
Conditionals:
Seq(1,2,3,4) flatMap { x => if (x%2 == 0) Seq(x) else Seq() } == Seq(2,4)
flatMap is sufficiently important to have its own syntactic sugar, e.g. to lazily compute Pythagorean triples:
for {
  z <- Stream.from(1)
  x <- Stream.range(1, z)
  y <- Stream.range(x, z)
  if x*x + y*y == z*z
} yield (x, y, z)
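Roughly, the compiler rewrites the comprehension above into nested combinator calls (a sketch of the desugaring):

Stream.from(1) flatMap { z =>
  Stream.range(1, z) flatMap { x =>
    Stream.range(x, z) withFilter { y =>
      x*x + y*y == z*z
    } map { y =>
      (x, y, z)
    }
  }
}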
A placeholder for a result that is, usually, being computed concurrently.
Computations can fail:
Futures are how we represent concurrent execution.
Are a kind of container:
trait Future[A]
It can be empty, full, or failed. You can wait for it:
val f: Future[A]
val result = f()
Failures would result in exceptions; prefer using Try:
f.get() match {
  case Return(res) => ...
  case Throw(exc)  => ...
}
(Don’t call me, I’ll call you!)
val f: Future[String]

f onSuccess { s =>
  log.info(s)
} onFailure { exc =>
  log.error(exc)
}
Futures are read-only; a Promise is a writable future.
val p: Promise[Int]
val f: Future[Int] = p
Success:
p.setValue(1)
Failure:
p.setException(new MyExc)
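A common use is bridging a callback-based API into a Future by filling in a Promise. A sketch, where httpGet is a hypothetical callback-style client, not a real API:

// Hypothetical callback API:
//   def httpGet(url: String)(onSuccess: String => Unit, onFailure: Exception => Unit): Unit
def fetchBody(url: String): Future[String] = {
  val p = new Promise[String]
  httpGet(url)(
    body => p.setValue(body),    // success fills the promise
    exc  => p.setException(exc)  // failure fails it
  )
  p                              // a Promise is a Future
}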
So far, I’ve shown you nothing more than a rephrasing of callbacks. Let’s see how futures compose.
trait Webpage {
  def imageLinks: Seq[String]
  def links: Seq[String]
  ...
}

def fetch(url: String): Future[Webpage]
Let’s build a “Pinterest” style thumbnail extractor.
def getThumbnail(url: String): Future[Webpage]
def getThumbnail(url: String): Future[Webpage] = {
  val promise = new Promise[Webpage]
  fetch(url) onSuccess { page =>
    fetch(page.imageLinks(0)) onSuccess { p =>
      promise.setValue(p)
    } onFailure { exc =>
      promise.setException(exc)
    }
  } onFailure { exc =>
    promise.setException(exc)
  }
  promise
}
Yuck! This is a variant of the all-too-familiar callback-hell.
We want:
def getThumbnail(url: String): Future[Webpage]
We must first fetch the page, find the first image link, then fetch that image link.
If either of these operations fails, getThumbnail also fails.
This is starting to smell like flatMap
trait Future[A] {
  ...
  def flatMap[B](f: A => Future[B]): Future[B]
}
def getThumbnail(url: String): Future[Webpage] =
  fetch(url) flatMap { page =>
    fetch(page.imageLinks(0))
  }
These should compose as well, and they must be recoverable.
flatMap needs a dual: whereas flatMap operates over values, rescue operates over exceptions.
trait Future[A] {
  ...
  def rescue[B](f: Exception => Future[B]): Future[B]
}
Recovering errors:
val f = fetch(url) rescue {
  case ConnectionFailed =>
    fetch(url)
}
object Future {
  def collect[A](fs: Seq[Future[A]]): Future[Seq[A]]
  ...
}
Useful for fan-out operations, e.g. to fetch all thumbnails:
def getThumbnails(url: String): Future[Seq[Webpage]] =
  fetch(url) flatMap { page =>
    Future.collect(
      page.imageLinks map { u => fetch(u) }
    )
  }
def crawl(url: String): Future[Seq[Webpage]] =
  fetch(url) flatMap { page =>
    Future.collect(
      page.links map { u => crawl(u) }
    ) map { pps => pps.flatten }
  }
Emphasize “what” over “how”: declare the meaning of a computation; don’t prescribe how it is computed.
Semantics are liberated from mechanics.
Enhances modularity: it’s now simple to alter the implementation without affecting existing code.
We’ve seen how we can use futures for concurrent programming, now we’ll see how network programming fits into the picture.
What is an RPC?
It’s a function!
type Service[Req, Rep] = Req => Future[Rep]
Servers implement these, clients make use of them.
A server:
val multiplier = { i: Int => Future.value(i*2) }
A client:
multiplier(123) onSuccess { res => println("result", res) }
Many common behaviors of services are agnostic to the particulars of the service; timeouts and authentication, for example.
Filters compose over services. Conceptually, we want to alter the behavior of a service while being agnostic to what the service is.
val timeout: Filter[…]
val service: Service[Req, Rep]

val serviceWithTimeout = timeout andThen service
Filters, too, are just functions:
type Filter[…] = (ReqIn, Service[ReqOut, RepIn]) => Future[RepOut]
Given a request and a service, dispatch it, but timeout after 1 second.
val timeout = { (req, service) =>
  service(req).within(1.second)
}
Attempt to authenticate the request, dispatching only if it succeeds.
val auth = { (req, service) =>
  if (isAuth(req)) service(req)
  else Future.exception(AuthErr)
}
val timeout: Filter[…]
val auth: Filter[…]
val service: Service[…]

timeout andThen auth andThen service
// A service that requires an authenticated request
val service: Service[AuthHttpReq, HttpRep]

// Bridge with a filter
val filt: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]

// Authenticate, and serve.
val authService: Service[HttpReq, HttpRep] =
  filt andThen service
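A sketch of what such a bridging filter might look like (authenticate is a hypothetical helper returning Option[AuthHttpReq]; AuthErr is as above):

val filt = new Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] {
  def apply(req: HttpReq, service: Service[AuthHttpReq, HttpRep]): Future[HttpRep] =
    authenticate(req) match {
      case Some(authReq) => service(authReq)          // dispatch the enriched request
      case None          => Future.exception(AuthErr) // reject unauthenticated requests
    }
}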
A crude recap:
In Finagle’s API, clients provide services (a built client is a Service you call), and servers consume them (a server is built from a Service).
Adds behavior, largely configurable: load balancing, connection pooling, retrying, timeouts, rate limiting, monitoring, stats collection.
Protocol agnostic: codecs implement wire protocols.
Manages resources.
val client = ClientBuilder()
  .name("loadtest")
  .codec(Http)
  .hosts("google.com:80,..")
  .build()
client is a Service[HttpReq, HttpRep]:
client(HttpRequest(GET, "/"))
val service = { req =>
  Future.value(HttpRes(Code.OK, "blah"))
}

ServerBuilder()
  .name("httpd")
  .codec(Http)
  .bindTo(":8080")
  .build(service)
val client = ClientBuilder()…

ServerBuilder()…build(client)
Recently, we wanted to add speculative execution; easy!
val backupReq: Filter[…] = { (req, service) =>
  val reqs = Seq(
    service(req),
    timer.doLater(delay) { service(req) }.flatten
  )

  Future.select(reqs) flatMap {
    case (Return(res), Seq(other)) =>
      other.cancel()
      Future.value(res)
    case (Throw(_), Seq(other)) =>
      other
  }
}
In a distributed environment, standard tools lose their efficacy.
It is difficult to reason about what you cannot measure.
Debugging process interaction is vital.
Liberally export stats that are important for diagnostics, optimization, etc.
Stats.incr("reqs")
Stats.addMetric("latency_ms", 123)
Which are then available:
$ curl host:port/stats.txt
counters:
  reqs: 35066
  ...
metrics:
  latency_ms: (average=0, count=26610420, maximum=1283, minimum=0,
    p25=0, p50=0, p75=2, p90=2, p95=2, p99=4, p999=26, p9999=386,
    sum=25330865)
RPC tracing based on Dapper (Sigelman, Barroso, et al., 2010).
Trace.record("PC LOAD LETTER")

Trace.timeFuture("search RPC") {
  searchQuery("hello, world!")
}
Tracing support was introduced with zero changes required in user code.
Zipkin, our trace aggregation system, is open source.
In situ CPU, heap, contention profilers. Incredibly useful for optimization and diagnostics.
$ curl -O host:port/pprof/profile
$ pprof profile
Welcome to pprof!  For help, type 'help'.
(pprof) top
Total: 19268360 samples
 6824664  35.4%  35.4%  6824664  35.4% Ljava/lang/Object;
 3701808  19.2%  54.6%  3701808  19.2% Lorg/apache/commons/codec/binary/BaseNCodec;resizeBuffer
 3518096  18.3%  72.9%  3518096  18.3% Ljava/util/Arrays;copyOf
 1591152   8.3%  81.1%  1591152   8.3% Ljava/util/Arrays;copyOfRange
Emphasizing declarative/data flow style programming with future combinators results in robust, modular, safe, and simple systems.
It is simple to build higher level combinators.
Lots of implementor leeway.
Some things don't fit so neatly: cancellation is hairy, for instance, but very important.
Clean separation is sometimes troublesome: e.g., how to always retry a request on a different host?
Layering is never actually clean; the world is very messy.
Abstraction results in greater garbage collector pressure, but the JVM is very good.
Custom thrift transport with tracing. Soon: request multiplexing, service multiplexing, compression. Upnegotiated, so fully compatible with “legacy” clients/servers.
Thrift code generator: Scrooge
Legacy generator with Future Iface support:
These are all true open source projects. Lots of external contributions. Tumblr, Foursquare, StumbleUpon have serious deployments of our systems software infrastructure.