"for" for Future on Finagle
for
val f = for { u <- authenticate(request) b <- isBanned(u) } yield (u, b)
As you may know scala's for is coverted into map and flatMap.
val f = authenticate(request).flatMap {u =>
isBanned(u).map {b =>
(u, b)
}
}
So we look into Future#flatMap.
def flatMap[B](f: A => Future[B]): Future[B] = transform({ case Return(v) => f(v) case Throw(t) => Future.rawException(t) })
transform is abstract on Future.
def transform[B](f: Try[A] => Future[B]): Future[B]
Look into classes which extends Future.
class NoFuture extends Future[Nothing] ... def transform[B](f: Try[Nothing] => Future[B]): Future[B] = this
In this case, it returns this.
ConstFuture is already completed.
class ConstFuture[A](result: Try[A]) extends Future[A] { ... def transform[B](f: Try[A] => Future[B]): Future[B] = { val p = new Promise[B] respond({ r => val result = try f(r) catch { case NonFatal(e) => Future.exception(e) } p.become(result) }) p } ... def respond(k: Try[A] => Unit): Future[A] = { val saved = Local.save() Scheduler.submit(new Runnable { def run() { val current = Local.save() Local.restore(saved) try Monitor { k(result) } finally Local.restore(current) } }) this }
Hmm. What is Local? Ah, it's thread local.
We have const value "result" which is result of Future[A].
Inlined transform and respond. Removed Local save/restore.
flatMap for ConstFuture is like this.
def flatMap[B](f: A => Future[B]): Future[B] = val p = new Promise[B] Scheduler.submit(new Runnable { def run() { try Monitor { val newResult = try result match { case Return(v) => f(v) case Throw(t) => Future.rawException(t) } catch { case NonFatal(e) => Future.exception(e) } p.become(newResult) } } }) this p
Okay, so basically it returns Promise[B](f(result of Future[A])).
The point is that f is called on another thread.
I guess on real future result is blocked on the thread and wrapped as Future.