Akka in Scala Part 7 - Future

引言

Akka的世界中,消息传输是异步的,异步是一种高效的通信机制,使得消息的发送者无须阻塞等待先前的消息处理完成。

异步也意味着不确定性,但是很多时候业务逻辑要求消息按一定的顺序被处理,此时Akka的另一基石 – Future就可以派上用场。

定义

FuturePromise是一对孪生的概念。

一个 Promise 对象代表一个目前还不可用,但是在未来的某个时间点可以被解析的值, 这个值会在未来被Future对象消费。

通过给调用者返回一个Future对象,用户代码不必阻塞等待结果,可以继续向下执行。FuturePromise使得用户可以用同步的方式编写异步代码,同步为表,异步为里,兼具同步代码的清晰性和异步代码的高效性。

Akka源码中,Future对象的定义为:

1
object Future { def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T] }

接收一个函数对象body以及执行上下文execctx作为参数。可以把execctx看作线程池,通常actor中已经包含ExecutionContext类型的默认参数,无须用户代码显示指定。

应用场景

Future有以下典型的应用场景:

  • 阻塞IO:典型的如Sleep函数,可以把相关的代码放入Future块,使actor不必阻塞,可以继续处理后续消息。
  • 耗时的计算:典型的如计算PI的值,可以把计算的代码放入Future, 而不必阻塞等待计算结果。

Akka中,Future有更多的应用场景:

  • 消息之间有顺序依赖。一条消息需要在某条消息之后被处理,由于Akka的异步处理机制并不保证消息处理的顺序,我们需要借助Future来达成目的。
  • 多条消息需要在同一上下文执行。可以通过Future提供的闭包,来保证多条消息在同一上下文被处理。

使用方法

AkkaFuture和其它语言中的一样,支持回调链,待Promise完成计算之后,触发注册的回调函数。

  • onSuccess, Promise计算成功之后调用的函数。
  • onFailurePromise计算失败之后调用的函数。
  • onComplete, 无论Promise计算结果如何,都会调用的函数。

Future本质上是单子(monad), 支持一系列高阶函数操作,如map, flatMap 以及filter, 也可以通过for comprehension, 从Future中获得计算结果:

1
2
3
4
5
6
val f = for {
a <- Future(10 / 2) // 10 / 2 = 5
b <- Future(a + 1) // 5 + 1 = 6
c <- Future(a - 1) // 5 - 1 = 4
if c > 3 // Future.filter
} yield b * c // 6 * 4 = 24

Future另一个有趣的方法是sequence, 其作用是将List[Future[T]]类型的对象装换成Future[List[T]]类型, 维持了原来List对象各个元素的顺序。

actor中, 当我们使用ask方法发送消息时,获得的就是一个Future对象,可以基于这个Future对象注册后续操作,例如,可以用pipeTo方法,将Future的结果发送到别的actor:

1
2
import akka.pattern.pipe
        
val future = someActor ? SomeMessage future pipeTo anotherActor

代码示例

在红包客户端中,我们需要在发送红包请求之后,接收红包并打开,消息之间存在顺序依赖,于是引入Future, 代码示例:

1
2
3
4
5
6
7
8
9
implicit val askTimeout = Timeout(1.second)
def receive = {

// simulate the shake action in Mobile
case Shake =>
(generator ? RedPacket) onSuccess {
case UnopenedRedPacket(amount) => self ! OpenPacket(amount)
}
}

小结

Akka通过Future提供了对同步机制的支持,在特定的应用场景,Future是非常有力的工具。