Pregunta Cómo esperar varios Futuros


Supongamos que tengo varios futuros y necesito esperar hasta ya sea cualquiera de ellos falla o todos ellos tienen éxito.

Por ejemplo: Deje que haya 3 futuros: f1, f2, f3.

  • Si f1 tiene éxito y f2 falla, no espero f3 (y volver fracaso al cliente).

  • Si f2 falla mientras f1 y f3 todavía funcionan, no los espero (y vuelvo) fracaso)

  • Si f1 tiene éxito y luego f2 tiene éxito, continúo esperando f3.

¿Cómo lo implementarías?


74
2018-04-27 19:54


origen


Respuestas:


En su lugar, puede usar una comprensión forzada de la siguiente manera:

val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}

val aggFut = for{
  f1Result <- fut1
  f2Result <- fut2
  f3Result <- fut3
} yield (f1Result, f2Result, f3Result)

En este ejemplo, los futuros 1, 2 y 3 se lanzan en paralelo. Luego, en la sección de comprensión, esperamos hasta que los resultados 1 y luego 2 y luego 3 estén disponibles. Si falla 1 o 2, ya no esperamos 3. Si los 3 tienen éxito, entonces el aggFut val tendrá una tupla con 3 espacios, correspondiente a los resultados de los 3 futuros.

Ahora bien, si necesita el comportamiento en el que desea dejar de esperar si dice que fut2 falla primero, las cosas se ponen un poco más complicadas. En el ejemplo anterior, tendría que esperar a que fut1 se complete antes de darse cuenta de que fut2 falló. Para resolver eso, podrías intentar algo como esto:

  val fut1 = Future{Thread.sleep(3000);1}
  val fut2 = Promise.failed(new RuntimeException("boo")).future
  val fut3 = Future{Thread.sleep(1000);3}

  def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
    val fut = if (futures.size == 1) futures.head._2
    else Future.firstCompletedOf(futures.values)

    fut onComplete{
      case Success(value) if (futures.size == 1)=> 
        prom.success(value :: values)

      case Success(value) =>
        processFutures(futures - value, value :: values, prom)

      case Failure(ex) => prom.failure(ex)
    }
    prom.future
  }

  val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
  aggFut onComplete{
    case value => println(value)
  }

Ahora esto funciona correctamente, pero el problema proviene de saber qué Future para eliminar de la Map cuando uno ha sido completado exitosamente Siempre que tenga alguna forma de correlacionar adecuadamente un resultado con el futuro que generó ese resultado, entonces algo como esto funciona. Simplemente sigue retirando futuros completados del mapa y luego llamando Future.firstCompletedOf en el resto Futures hasta que no quede ninguno, recogiendo los resultados en el camino. No es bonito, pero si realmente necesitas el comportamiento del que hablas, entonces esto o algo similar podría funcionar.


72
2018-04-27 23:14



Puede utilizar una promesa y enviarle ya sea la primera falla o el éxito total final completado:

def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
  val p = Promise[M[A]]()

  // the first Future to fail completes the promise
  in.foreach(_.onFailure{case i => p.tryFailure(i)})

  // if the whole sequence succeeds (i.e. no failures)
  // then the promise is completed with the aggregated success
  Future.sequence(in).foreach(p trySuccess _)

  p.future
}

Entonces tú puedes Await en ese resultado Future si quieres bloquear, o solo map en otra cosa.

La diferencia con respecto a la comprensión es que aquí se obtiene el error de que el primero falle, mientras que para la comprensión se obtiene el primer error en el orden transversal de la colección de entrada (incluso si otro falló primero). Por ejemplo:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order

Y:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the 'actual' first to fail (usually...)
// and it returns early (it does not wait 1 sec)

31
2018-04-27 23:57



Aquí hay una solución sin usar actores.

import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger

// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
  val remaining = new AtomicInteger(fs.length)

  val p = promise[T]

  fs foreach {
    _ onComplete {
      case s @ Success(_) => {
        if (remaining.decrementAndGet() == 0) {
          // Arbitrarily return the final success
          p tryComplete s
        }
      }
      case f @ Failure(_) => {
        p tryComplete f
      }
    }
  }

  p.future
}

7
2018-04-27 22:15



Puedes hacer esto solo con futuros. Aquí hay una implementación. Tenga en cuenta que no terminará la ejecución anticipadamente. En ese caso, debe hacer algo más sofisticado (y probablemente implementar la interrupción por su cuenta). Pero si simplemente no quiere seguir esperando algo que no va a funcionar, la clave es seguir esperando que termine lo primero y detenerse cuando ya no quede nada o llegue a una excepción:

import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

@tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): 
Either[Throwable, Seq[A]] = {
  val first = Future.firstCompletedOf(fs)
  Await.ready(first, Duration.Inf).value match {
    case None => awaitSuccess(fs, done)  // Shouldn't happen!
    case Some(Failure(e)) => Left(e)
    case Some(Success(_)) =>
      val (complete, running) = fs.partition(_.isCompleted)
      val answers = complete.flatMap(_.value)
      answers.find(_.isFailure) match {
        case Some(Failure(e)) => Left(e)
        case _ =>
          if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
          else Right( answers.map(_.get) ++: done )
      }
  }
}

Aquí hay un ejemplo de esto en acción cuando todo funciona bien:

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
Fancy meeting you here!
Bye!
res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))

Pero cuando algo sale mal

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); throw new Exception("boo"); () }, 
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)

scala> Bye!

5
2018-04-27 23:12



Para este propósito usaría un actor Akka. A diferencia de la comprensión, falla tan pronto como falla el futuro, por lo que es un poco más eficiente en ese sentido.

class ResultCombiner(futs: Future[_]*) extends Actor {

  var origSender: ActorRef = null
  var futsRemaining: Set[Future[_]] = futs.toSet

  override def receive = {
    case () =>
      origSender = sender
      for(f <- futs)
        f.onComplete(result => self ! if(result.isSuccess) f else false)
    case false =>
      origSender ! SomethingFailed
    case f: Future[_] =>
      futsRemaining -= f
      if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
  }

}

sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result

Luego, crea el actor, envíale un mensaje (para que sepa a dónde enviar su respuesta) y espera una respuesta.

val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
  val f4: Future[Result] = actor ? ()
  implicit val timeout = new Timeout(30 seconds) // or whatever
  Await.result(f4, timeout.duration).asInstanceOf[Result] match {
    case SomethingFailed => println("Oh noes!")
    case EverythingSucceeded => println("It all worked!")
  }
} finally {
  // Avoid memory leaks: destroy the actor
  actor ! PoisonPill
}

5
2018-04-27 21:28



Esta pregunta ha sido respondida pero estoy publicando mi solución de clase de valor (las clases de valor se agregaron en 2.10) ya que no hay una aquí. Por favor, siéntete libre de criticar.

  implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
    def concurrently = ConcurrentFuture(self)
  }
  case class ConcurrentFuture[A](future: Future[A]) extends AnyVal {
    def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future))
    def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class
  }
  def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = {
    val p = Promise[B]()
    val inner = f(outer.future)
    inner.future onFailure { case t => p.tryFailure(t) }
    outer.future onFailure { case t => p.tryFailure(t) }
    inner.future onSuccess { case b => p.trySuccess(b) }
    ConcurrentFuture(p.future)
  }

ConcurrentFuture es un contenedor de futuros sin límite que cambia el mapa futuro predeterminado / mapa plano de do-this-then-that a combine-all-and-fail-if-any-fail. Uso:

def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 }
def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" }
def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 }

val f : Future[(Int,String,Double)] = {
  for {
    f1 <- func1.concurrently
    f2 <- func2.concurrently
    f3 <- func3.concurrently
  } yield for {
   v1 <- f1
   v2 <- f2
   v3 <- f3
  } yield (v1,v2,v3)
}.future
f.onFailure { case t => println("future failed $t") }

En el ejemplo anterior, f1, f2 y f3 se ejecutarán simultáneamente y si alguno falla en cualquier orden, el futuro de la tupla fallará inmediatamente.


4
2017-09-17 14:45



Es posible que desee consultar la futura API de Twitter. Notablemente el método Future.collect. Hace exactamente lo que quieres: https://twitter.github.io/scala_school/finagle.html

El código fuente Future.scala está disponible aquí: https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala


4
2017-12-13 23:05



Puedes usar esto:

val l = List(1, 6, 8)

val f = l.map{
  i => future {
    println("future " +i)
    Thread.sleep(i* 1000)
    if (i == 12)
      throw new Exception("6 is not legal.")
    i
  }
}

val f1 = Future.sequence(f)

f1 onSuccess{
  case l => {
    logInfo("onSuccess")
    l.foreach(i => {

      logInfo("h : " + i)

    })
  }
}

f1 onFailure{
  case l => {
    logInfo("onFailure")
  }

2
2017-11-24 11:31