Pregunta takeWhile () funciona de manera diferente con flatmap


Estoy creando fragmentos con takeWhile para explorar sus posibilidades. Cuando se utiliza junto con flatMap, el comportamiento no está en línea con la expectativa. Encuentre el fragmento de código a continuación.

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

Salida real:

Sample1
Sample2
Sample3
Sample5

Rendimiento esperado:

Sample1
Sample2
Sample3

La razón de la expectativa es que takeWhile debe ejecutarse hasta que la condición en el interior se vuelva verdadera. También he agregado declaraciones impresas dentro de flatmap para la depuración. Las transmisiones se devuelven solo dos veces, lo que está en línea con la expectativa.

Sin embargo, esto funciona bien sin flatmap en la cadena.

String[] strArraySingle = {"Sample3", "Sample4", "Sample5"};
Arrays.stream(strArraySingle)
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

Salida real:

Sample3

Aquí la salida real coincide con la salida esperada.

Descargo de responsabilidad: estos fragmentos son solo para la práctica del código y no sirven para ningún uso válido.

Actualizar:  Error JDK-8193856: fix estará disponible como parte de JDK 10. El cambio será corregir whileOps Sumidero :: aceptar

@Override 
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

Implementación modificada:

@Override
public void accept(T t) {
    if (take && (take = predicate.test(t))) {
        downstream.accept(t);
    }
}

74
2017-12-19 14:10


origen


Respuestas:


Este es un error en JDK 9 - desde problema # 8193856:

takeWhile está asumiendo incorrectamente que una operación ascendente admite y honra la cancelación, lo que lamentablemente no es el caso de flatMap.

Explicación

Si la transmisión está ordenada, takeWhile debe mostrar el comportamiento esperado. Esto no es completamente el caso en su código porque usted usa forEach, que renuncia al orden. Si te importa, lo que haces en este ejemplo, debes usar forEachOrdered en lugar. Lo gracioso: eso no cambia nada.

Entonces, ¿la transmisión no está ordenada en primer lugar? (En ese caso el comportamiento esta bien.) Si crea una variable temporal para la secuencia creada a partir de strArray y verificar si está ordenado ejecutando la expresión ((StatefulOp) stream).isOrdered(); en el punto de interrupción, encontrará que, de hecho, está ordenado:

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Stream<String> stream = Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));

// breakpoint here
System.out.println(stream);

Eso significa que es muy probable que se trate de un error de implementación.

En el código

Como otros han sospechado, ahora también creo que esto podría estar conectado a flatMap estar ansioso Más precisamente, ambos problemas pueden tener la misma causa raíz.

Mirando en la fuente de WhileOps, podemos ver estos métodos:

@Override
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

@Override
public boolean cancellationRequested() {
    return !take || downstream.cancellationRequested();
}

Este código es utilizado por takeWhile para verificar si un elemento de transmisión dado t si el predicate se ha completado:

  • Si es así, pasa el elemento al downstream operación, en este caso System.out::println.
  • Si no, establece take a falso, por lo que cuando se pregunte la próxima vez si se debe cancelar la interconexión (es decir, si se hace), se devuelve true.

Esto cubre el takeWhile operación. La otra cosa que necesitas saber es que forEachOrdered conduce a la operación terminal que ejecuta el método ReferencePipeline::forEachWithCancel:

@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    boolean cancelled;
    do { } while (
            !(cancelled = sink.cancellationRequested())
            && spliterator.tryAdvance(sink));
    return cancelled;
}

Todo lo que hace es:

  1. verificar si la tubería fue cancelada
  2. si no, adelanta el fregadero por un elemento
  3. detener si este fue el último elemento

Se ve prometedor, ¿verdad?

Sin flatMap

En el "buen caso" (sin flatMap; tu segundo ejemplo) forEachWithCancel opera directamente en el WhileOpcomo sink y puedes ver cómo se desarrolla esto:

  • ReferencePipeline::forEachWithCancel hace su ciclo:
    • WhileOps::accept se le da a cada elemento de secuencia
    • WhileOps::cancellationRequested es consultado después de cada elemento
  • en algún momento "Sample4" falla el predicado y se cancela la transmisión

¡Hurra!

Con flatMap

En el "caso malo" (con flatMap; tu primer ejemplo), forEachWithCancel opera en el flatMap operación, sin embargo, que simplemente llama forEachRemaining sobre el ArraySpliterator para {"Sample3", "Sample4", "Sample5"}, que hace esto:

if ((a = array).length >= (hi = fence) &&
    (i = index) >= 0 && i < (index = hi)) {
    do { action.accept((T)a[i]); } while (++i < hi);
}

Ignorando todo eso hi y fence cosas, que solo se utiliza si el procesamiento de la matriz se divide para una secuencia paralela, esto es un simple for loop, que pasa cada elemento al takeWhile operación, pero nunca verifica si se cancela. Por lo tanto, navegará ansiosamente a través de todos los elementos en esa "subcorriente" antes de detenerse, probablemente incluso a través del resto de la secuencia.


53
2017-12-19 15:26



Esta es un error sin importar cómo lo mire, y gracias Holger por sus comentarios. No quería poner esta respuesta aquí (¡en serio!), Pero ninguna de las respuestas indica claramente que esto es un error.

La gente dice que esto tiene que ordenarse / no ordenarse, y esto no es cierto ya que esto informará true 3 veces:

Stream<String[]> s1 = Arrays.stream(strArray);
System.out.println(s1.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s2 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream));
System.out.println(s2.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s3 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream))
            .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
System.out.println(s3.spliterator().hasCharacteristics(Spliterator.ORDERED));

También es muy interesante que si lo cambias a:

String[][] strArray = { 
         { "Sample1", "Sample2" }, 
         { "Sample3", "Sample5", "Sample4" }, // Sample4 is the last one here
         { "Sample7", "Sample8" } 
};

entonces Sample7 y Sample8 no será parte del resultado, de lo contrario lo harán. Parece que flatmap  ignora una bandera de cancelación que sería introducida por dropWhile.


20
2017-12-19 18:49



Si miras la documentación para takeWhile:

si se ordena esta secuencia, [devuelve] una secuencia que consiste en   prefijo más largo de los elementos tomados de esta corriente que coinciden con los dados   predicado.

si esta secuencia no está ordenada, [devuelve] una secuencia que consiste en un subconjunto   de elementos tomados de esta corriente que coinciden con el predicado dado.

Su flujo es casualmente ordenado, pero takeWhile no lo hace saber que es. Como tal, está devolviendo la segunda condición: el subconjunto. Tu takeWhile simplemente está actuando como un filter.

Si agrega una llamada a sorted antes de takeWhile, verá el resultado que espera:

Arrays.stream(strArray)
      .flatMap(indStream -> Arrays.stream(indStream))
      .sorted()
      .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
      .forEach(ele -> System.out.println(ele));

10
2017-12-19 14:34



La razón de eso es la flatMap operación también siendo una operaciones intermedias con el cual (uno de) operación intermedia de cortocircuito con estado  takeWhile es usado.

El comportamiento de flatMap como señaló Holger en esta respuesta es sin duda una referencia que no debe perderse para comprender la salida inesperada de tales operaciones de cortocircuito.

El resultado esperado se puede lograr dividiendo estas dos operaciones intermedias introduciendo una operación de terminal para utilizar de manera determinista un flujo ordenado adicional y realizarlo para una muestra como:

List<String> sampleList = Arrays.stream(strArray).flatMap(Arrays::stream).collect(Collectors.toList());
sampleList.stream().takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
            .forEach(System.out::println);

Además, parece que hay una relación Error # JDK-8075939 para rastrear este comportamiento ya registrado.

Editar: Esto se puede rastrear aún más en JDK-8193856 aceptado como un error.


10
2017-12-19 14:28