Pregunta Java 8 One Stream to Multiple Map


Digamos que tengo un enorme archivo de registro de servidor web que no cabe en la memoria. Necesito transmitir este archivo a un método mapreduce y guardar en la base de datos. Lo hago usando Java 8 stream api. Por ejemplo, obtengo una lista después del proceso mapreduce como consumo por cliente, consumo por ip, consumo por contenido. Pero, mis necesidades no son así como las dadas en mi ejemplo. Como no puedo compartir código, solo quiero dar un ejemplo básico.

Con Java 8 Stream Api, quiero leer el archivo exactamente una vez, obtener 3 listas al mismo tiempo, mientras estoy transmitiendo el archivo, paralelo o secuencial. Pero paralelo sería bueno. ¿Hay alguna forma de hacer eso?


5
2017-08-13 08:34


origen


Respuestas:


Me he adaptado la respuesta a esta pregunta a tu caso El Spliterator personalizado "dividirá" el flujo en múltiples flujos que se recopilan por diferentes propiedades:

@SafeVarargs
public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
{
    return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
}

public static class ForkingSpliterator<T>
    extends AbstractSpliterator<T>
{
    private Spliterator<T>         sourceSpliterator;

    private List<BlockingQueue<T>> queues = new ArrayList<>();

    private boolean                sourceDone;

    @SafeVarargs
    private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        super(Long.MAX_VALUE, 0);

        sourceSpliterator = source.spliterator();

        for (Consumer<Stream<T>> fork : consumers)
        {
            LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            new Thread(() -> fork.accept(StreamSupport.stream(new ForkedConsumer(queue), false))).start();
        }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        sourceDone = !sourceSpliterator.tryAdvance(t -> queues.forEach(queue -> queue.offer(t)));
        return !sourceDone;
    }

    private class ForkedConsumer
        extends AbstractSpliterator<T>
    {
        private BlockingQueue<T> queue;

        private ForkedConsumer(BlockingQueue<T> queue)
        {
            super(Long.MAX_VALUE, 0);
            this.queue = queue;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            while (queue.peek() == null)
            {
                if (sourceDone)
                {
                    // element is null, and there won't be no more, so "terminate" this sub stream
                    return false;
                }
            }

            // push to consumer pipeline
            action.accept(queue.poll());

            return true;
        }
    }
}

Puedes usarlo de la siguiente manera:

streamForked(Stream.of(new Row("content1", "client1", "location1", 1),
                       new Row("content2", "client1", "location1", 2),
                       new Row("content1", "client1", "location2", 3),
                       new Row("content2", "client2", "location2", 4),
                       new Row("content1", "client2", "location2", 5)),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                                           Collectors.groupingBy(Row::getContent,
                                                                                                 Collectors.summingInt(Row::getConsumption))))),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                                           Collectors.groupingBy(Row::getLocation,
                                                                                                 Collectors.summingInt(Row::getConsumption))))),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getContent,
                                                                           Collectors.groupingBy(Row::getLocation,
                                                                                                 Collectors.summingInt(Row::getConsumption))))));

// Output
// {client2={location2=9}, client1={location1=3, location2=3}}
// {client2={content2=4, content1=5}, client1={content2=2, content1=4}}
// {content2={location1=2, location2=4}, content1={location1=1, location2=8}}

Tenga en cuenta que puede hacer prácticamente cualquier cosa que desee con sus copias de la transmisión. Según su ejemplo, utilicé un apilado groupingBy colector para agrupar las filas por dos propiedades y luego resumió la propiedad int. Entonces el resultado será un Map<String, Map<String, Integer>>. Pero también podrías usarlo para otros escenarios:

rows -> System.out.println(rows.count())
rows -> rows.forEach(row -> System.out.println(row))
rows -> System.out.println(rows.anyMatch(row -> row.getConsumption() > 3))

4
2017-08-13 09:42



En general, la recopilación a cualquier otra cosa que no sea la API estándar es bastante fácil a través de una costumbre Collector. En su caso, reúne 3 listas a la vez (solo un pequeño ejemplo que compila, ya que tampoco puede compartir su código):

private static <T> Collector<T, ?, List<List<T>>> to3Lists() {
    class Acc {

        List<T> left = new ArrayList<>();

        List<T> middle = new ArrayList<>();

        List<T> right = new ArrayList<>();

        List<List<T>> list = Arrays.asList(left, middle, right);

        void add(T elem) {
            // obviously do whatever you want here
            left.add(elem);
            middle.add(elem);
            right.add(elem);
        }

        Acc merge(Acc other) {

            left.addAll(other.left);
            middle.addAll(other.middle);
            right.addAll(other.right);

            return this;
        }

        public List<List<T>> finisher() {
            return list;
        }

    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
}

Y usarlo a través de:

Stream.of(1, 2, 3)
      .collect(to3Lists());

Obviamente, este colector personalizado no hace nada útil, sino solo un ejemplo de cómo podría trabajar con él.


6
2017-08-13 08:58