akka系统是一个分布式的新闻驱动系统。akka应用由一群卖力差别运算事情的actor组成,每个actor都是被动守候外界的某种新闻来驱动自己的作业。以是,通俗点形貌:akka应用就是一群actor相互之间发送新闻的系统,每个actor吸收到新闻后最先自己卖力的事情。对于akka-typed来说,typed-actor只能吸收指定类型的新闻,以是actor之间的新闻交流需要根据新闻类型来举行,即需要协议来规范新闻交流机制。想想看,若是用户需要一个actor做某件事,他必须用这个actor明了的新闻类型来发送新闻,这就是一种交流协议。

所谓新闻交流方式包罗单向和双向两类。若是涉及两个actor之间的新闻交流,新闻发送方式可以是单向和双向的。但若是是从外界向一个actor发送新闻,那么一定只能是单向的发送方式了,由于新闻发送两头只有一端是actor。

典型的单向新闻发送fire-and-forget如下:

import akka.actor.typed._ import scaladsl._ object Printer { case class PrintMe(message: String) // 只吸收PrintMe类型message
  def apply(): Behavior[PrintMe] = Behaviors.receive { case (context, PrintMe(message)) => context.log.info(message) Behaviors.same } } object FireAndGo extends App { // system就是一个root-actor
val system: ActorRef[Printer.PrintMe] = ActorSystem(Printer(), "fire-and-forget-sample") val printer: ActorRef[Printer.PrintMe] = system // 单向新闻发送,printMe类型的新闻
printer ! Printer.PrintMe("hello") printer ! Printer.PrintMe("world!") system.asInstanceOf[ActorSystem[Printer.PrintMe]].terminate() }

固然,在现实中通常我们要求actor去举行某些运算然后返回运算效果。这就涉及到actor之间双向信息交流了。第一种情形:两个actor之间的新闻是随便无序的,这是一种典型的无顺序request-response模式。就是说一个response纷歧定是根据request的吸收顺序返回的,只是它们之间能够交流而已。不外,在akka-typed中这种模式最基本的要求就是发送的新闻类型必须相符吸收方actor的类型。

好了,我们先对这个模式做个树模。所有actor的界说可以先从它的新闻类型最先。对每个加入双向交流的actor来说,可以从request和response两种新闻来反映它的功效:

object FrontEnd { sealed trait FrontMessages case class SayHi(who: String) extends FrontMessages } object BackEnd { //先从这个actor的回应新闻最先
     sealed trait Response case class HowAreU(msg: String) extends Response case object Unknown extends Response //可吸收新闻类型
  sealed trait BackMessages //这个replyTo应该是一个能处置Reponse类型新闻的actor
  case class MakeHello(who: String, replyTo: ActorRef[Response]) extends BackMessages }

这个FrontEnd吸收SayHi新闻后最先事情,不外现在还没有界说返回的新闻类型。BackEnd接到MakeHello类型新闻后返回response类型新闻。从这个角度来讲,返回的对方actor必须能够处置Response类型的新闻。

我们试试实现这个FrontEnd actor:

object FrontEnd { sealed trait FrontMessages case class SayHi(who: String) extends FrontMessages def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages] = { Behaviors.receive { (ctx,msg) => msg match { case SayHi(who) => ctx.log.info("requested to say hi to {}", who) backEnd ! BackEnd.MakeHello(who, ???) } } }

MakeHello需要一个replyTo,应该是什么呢?不外它一定是可以处置Response类型新闻的actor。但我们知道这个replyTo就是FrontEnd,不外FrontEnd只能处置FrontMessages类型新闻,应该怎么办呢?可不可以把replyTo直接写成FrontEnd呢?虽然可以这么做,但这个MakeHello新闻就只能跟FrontEnd绑死了。若是其它的actor也需要用到这个MakeHello的话就需要另外界说一个了。以是,最好的解决方案就是用某种类型转换方式来实现。如下:

import akka.actor.typed._ import scaladsl._ object FrontEnd { sealed trait FrontMessages case class SayHi(who: String) extends FrontMessages case class WrappedBackEndResonse(res: BackEnd.Response) extends FrontMessages def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages] = { Behaviors.setup[FrontMessages] { ctx =>
                                                   //ctx.messageAdapter(ref => WrappedBackEndResonse(ref))
      val backEndRef: ActorRef[BackEnd.Response] = ctx.messageAdapter(WrappedBackEndResonse) Behaviors.receive { (ctx, msg) => msg match { case SayHi(who) => ctx.log.info("requested to say hi to {}", who) backEnd ! BackEnd.MakeHello(who, backEndRef) Behaviors.same //messageAdapter将BackEnd.Response转换成WrappedBackEndResponse
          case WrappedBackEndResonse(msg) => msg match { case BackEnd.HowAreU(msg) => ctx.log.info(msg) Behaviors.same case BackEnd.Unknown => ctx.log.info("Unable to say hello") Behaviors.same } } } } } }

首先,我们用ctx.mesageAdapter产生了ActorRef[BackEnd.Response],正是我们需要提供给MakeHello新闻的replyTo。看看这个messageAdapter函数:

def messageAdapter[U: ClassTag](f: U => T): ActorRef[U]

若是我们举行类型替换U -> BackEnd.Response, T -> FrontMessage 那么:

      val backEndRef: ActorRef[BackEnd.Response] = ctx.messageAdapter((response: BackEnd.Response) => WrappedBackEndResonse(response))

实际上这个messageAdapter函数在内陆ActorContext范围内登记了一个从BackEnd.Response类型到FrontMessages的转换。把吸收到的BackEnd.Response立刻转换成WrappedBackEndResponse(response)。

另有一种两个actor之间的双向交流模式是 1:1 request-response,即一对一模式。一对一的意思是发送方发送新闻后守候回应新闻。这就意味着收信方需要在完成运算义务后立刻向发信方发送回应,否则造成发信方的超时异常。无法制止的是,这种模式依然会涉及新闻类型的转换,如下:

object FrontEnd { sealed trait FrontMessages case class SayHi(who: String) extends FrontMessages case class WrappedBackEndResonse(res: BackEnd.Response) extends FrontMessages case class ErrorResponse(errmsg: String) extends FrontMessages def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages] = { Behaviors.setup[FrontMessages] { ctx =>
      //ask需要超时上限
 import scala.concurrent.duration._ import scala.util._ implicit val timeOut: Timeout = 3.seconds Behaviors.receive[FrontMessages] { (ctx, msg) => msg match { case SayHi(who) => ctx.log.info("requested to say hi to {}", who) ctx.ask(backEnd,(backEndRef: ActorRef[BackEnd.Response]) => BackEnd.MakeHello(who,backEndRef) ){ case Success(backResponse) => WrappedBackEndResonse(backResponse) case Failure(err) =>ErrorResponse(err.getLocalizedMessage) } Behaviors.same case WrappedBackEndResonse(msg) => msg match { case BackEnd.HowAreU(msg) => ctx.log.info(msg) Behaviors.same case BackEnd.Unknown => ctx.log.info("Unable to say hello") Behaviors.same } case ErrorResponse(errmsg) => ctx.log.info("ask error: {}",errmsg) Behaviors.same } } } } }

似乎类型转换是在ask里实现的,看看这个函数:

  def ask[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[Res] => Req)( mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit

req -> BackEnd.BackMessages, res -> BackEnd.Response, T -> FrontMessages。现在ask可以写成下面这样:

 

 ctx.ask[BackEnd.BackMessages,BackEnd.Response](backEnd, (backEndRef: ActorRef[BackEnd.Response]) => BackEnd.MakeHello(who,backEndRef) ){ case Success(backResponse:BackEnd.Response) => WrappedBackEndResonse(backResponse) case Failure(err) =>ErrorResponse(err.getLocalizedMessage) }

 

这样看起来更明了点,也就是说ask把吸收的BackEnd.Response转换成了FrontEnd处置的新闻类型WrappedBackEndRespnse,也就是FrontMessages

另有一种ask模式是在actor之外举行的,如下:

object AskDemo extends App { import akka.actor.typed.scaladsl.AskPattern._ import scala.concurrent._ import scala.concurrent.duration._ import akka.util._ import scala.util._ implicit val system: ActorSystem[BackEnd.BackMessages] = ActorSystem(BackEnd(), "front-app") // asking someone requires a timeout if the timeout hits without response // the ask is failed with a TimeoutException
  implicit val timeout: Timeout = 3.seconds val result: Future[BackEnd.Response] = system.asInstanceOf[ActorRef[BackEnd.BackMessages]] .ask[BackEnd.Response]((ref: ActorRef[BackEnd.Response]) => BackEnd.MakeHello("John", ref)) // the response callback will be executed on this execution context
  implicit val ec = system.executionContext result.onComplete { case Success(res)  => res match { case BackEnd.HowAreU(msg) => println(msg) case BackEnd.Unknown => println("Unable to say hello") } case Failure(ex)  => println(s"error: ${ex.getMessage}") } system.terminate() }

 这个ask是在akka.actor.typed.scaladsl.AskPattern包里。函数名目如下:

   def ask[Res](replyTo: ActorRef[Res] => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res]

 

向ask传入一个函数ActorRef[BackEnd.Response] => BackEnd.BackMessages,然后返回Future[BackEnd.Response]。这个模式中吸收回复方是在ActorContext之外,不存在新闻截获机制,以是不涉及新闻类型的转换。

另一种单actor双向新闻交流模式,即自己ask自己。在ActorContext内向自己发送新闻并提供回应新闻的吸收,如pipeToSelf:

 

object PipeFutureTo { trait CustomerDataAccess { def update(value: Customer): Future[Done] } final case class Customer(id: String, version: Long, name: String, address: String) object CustomerRepository { sealed trait Command final case class Update(value: Customer, replyTo: ActorRef[UpdateResult]) extends Command sealed trait UpdateResult final case class UpdateSuccess(id: String) extends UpdateResult final case class UpdateFailure(id: String, reason: String) extends UpdateResult private final case class WrappedUpdateResult(result: UpdateResult, replyTo: ActorRef[UpdateResult]) extends Command private val MaxOperationsInProgress = 10 def apply(dataAccess: CustomerDataAccess): Behavior[Command] = { Behaviors.setup[Command] { ctx =>
          implicit val dispatcher =  ctx.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-dispatcher")) next(dataAccess, operationsInProgress = 0) } } private def next(dataAccess: CustomerDataAccess, operationsInProgress: Int)(implicit ec: ExecutionContextExecutor): Behavior[Command] = { Behaviors.receive { (context, command) => command match { case Update(value, replyTo) =>
            if (operationsInProgress == MaxOperationsInProgress) { replyTo ! UpdateFailure(value.id, s"Max $MaxOperationsInProgress concurrent operations supported") Behaviors.same } else { val futureResult = dataAccess.update(value) context.pipeToSelf(futureResult) { // map the Future value to a message, handled by this actor
                case Success(_) => WrappedUpdateResult(UpdateSuccess(value.id), replyTo) case Failure(e) => WrappedUpdateResult(UpdateFailure(value.id, e.getMessage), replyTo) } // increase operationsInProgress counter
              next(dataAccess, operationsInProgress + 1) } case WrappedUpdateResult(result, replyTo) =>
            // send result to original requestor
            replyTo ! result // decrease operationsInProgress counter
            next(dataAccess, operationsInProgress - 1) } } } } }

 

,

Allbet Gaming

www.boyijiaoyu888.com欢迎进入欧博平台网站(Allbet Gaming),Allbet Gaming开放欧博平台网址、欧博注册、欧博APP下载、欧博客户端下载、欧博真人游戏(百家乐)等业务。

声明:该文看法仅代表作者自己,与Sunbet 申博无关。转载请注明:镇江考试考工网:akka-typed(2) - typed-actor交流方式和交流协议
评论关闭

分享到:

焦作旅游网:疫情下外贸商家如何增长?阿里国际站大数据揭示新机遇