引言
在Akka的世界中,消息传输是异步的,异步是一种高效的通信机制,使得消息的发送者无须阻塞等待先前的消息处理完成。
异步也意味着不确定性,但是很多时候业务逻辑要求消息按一定的顺序被处理,此时Akka的另一基石 – Future就可以派上用场。
定义
Future和Promise是一对孪生的概念。
一个 Promise 对象代表一个目前还不可用,但是在未来的某个时间点可以被解析的值, 这个值会在未来被Future对象消费。
通过给调用者返回一个Future对象,用户代码不必阻塞等待结果,可以继续向下执行。Future和Promise使得用户可以用同步的方式编写异步代码,同步为表,异步为里,兼具同步代码的清晰性和异步代码的高效性。
在Akka源码中,Future对象的定义为:
1 | object Future { def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T] } |
接收一个函数对象body以及执行上下文execctx作为参数。可以把execctx看作线程池,通常actor中已经包含ExecutionContext类型的默认参数,无须用户代码显示指定。
应用场景
Future有以下典型的应用场景:
- 阻塞IO:典型的如
Sleep函数,可以把相关的代码放入Future块,使actor不必阻塞,可以继续处理后续消息。 - 耗时的计算:典型的如计算PI的值,可以把计算的代码放入
Future, 而不必阻塞等待计算结果。
在Akka中,Future有更多的应用场景:
- 消息之间有顺序依赖。一条消息需要在某条消息之后被处理,由于
Akka的异步处理机制并不保证消息处理的顺序,我们需要借助Future来达成目的。 - 多条消息需要在同一上下文执行。可以通过
Future提供的闭包,来保证多条消息在同一上下文被处理。
使用方法
Akka的Future和其它语言中的一样,支持回调链,待Promise完成计算之后,触发注册的回调函数。
onSuccess,Promise计算成功之后调用的函数。onFailure,Promise计算失败之后调用的函数。onComplete, 无论Promise计算结果如何,都会调用的函数。
Future本质上是单子(monad), 支持一系列高阶函数操作,如map, flatMap 以及filter, 也可以通过for comprehension, 从Future中获得计算结果:
1 | val f = for { |
Future另一个有趣的方法是sequence, 其作用是将List[Future[T]]类型的对象装换成Future[List[T]]类型, 维持了原来List对象各个元素的顺序。
在actor中, 当我们使用ask方法发送消息时,获得的就是一个Future对象,可以基于这个Future对象注册后续操作,例如,可以用pipeTo方法,将Future的结果发送到别的actor:
1 | import akka.pattern.pipe |
代码示例
在红包客户端中,我们需要在发送红包请求之后,接收红包并打开,消息之间存在顺序依赖,于是引入Future, 代码示例:
1 | implicit val askTimeout = Timeout(1.second) |
小结
Akka通过Future提供了对同步机制的支持,在特定的应用场景,Future是非常有力的工具。