Pregunta Ejecutando múltiples hilos en la cola usando BlockingCollections


Mi programa tiene 3 funciones. Cada función toma una lista de elementos y completa cierta información. Por ejemplo

class Item {
 String sku,upc,competitorName;
 double price;
}

función F1 toma una lista y llena upc

función F2 toma List (salida de F1) y llena precio.

función F3 toma List (salida de F2) y llena competitorName

F1 puede procesar 5 artículos a la vez, F2 puede procesar 20 artículos a la vez, F3 también 20.

En este momento estoy ejecutando F1 -> F2 -> F3 en serie porque F2 necesita información (código UPC) de F1. F3 necesita precio de F2.

Me gustaría hacer que este proceso sea eficiente ejecutando F1 funcionando continuamente en lugar de esperar a que F2 y F3 se completen. F1 se ejecuta y sale a la cola, luego F2 toma 20 elementos a la vez y los procesa. y luego sigue F3.

¿Cómo puedo lograr esto utilizando BlockingCollection y Queue?


6
2017-09-08 16:26


origen


Respuestas:


Este es un caso de uso típico de Tormenta Apache en caso de que haya elementos continuos entrando a la F1. Puede implementar esto en Storm en cuestión de minutos y tendrá un sistema rápido y perfectamente paralelo en su lugar. Su F1, F2 y F3 se convertirán en pernos y su productor de Artículos se convertirá en surtidor.

Ya que preguntaste cómo hacerlo usando BlockingCollections aquí hay una implementación. Necesitarás 3 hilos en total.

ArtículosProductor: Está produciendo 5 artículos a la vez y alimentándolo a la F1.

F2ExecutorRed: Está consumiendo 20 artículos a la vez y alimentándolo a F2.

F3ExecutorRed: Está consumiendo 20 artículos a la vez y alimentándolo a F3.

También tiene 2 colas de bloqueo, una se usa para transferir datos de F1-> F2 y una de F2-> F3. También puede tener una cola para enviar datos a F1 de manera similar si es necesario. Depende de cómo está obteniendo los artículos. He usado Thread.sleep para simular el tiempo requerido para ejecutar la función.

Cada función seguirá buscando elementos en la cola asignada, independientemente de lo que estén haciendo otras funciones, y espere hasta que la cola tenga elementos. Una vez que hayan procesado el elemento lo colocarán en otra cola para otra función. Esperarán hasta que la otra cola tenga espacio si está llena.

Dado que todas sus funciones se ejecutan en diferentes subprocesos, F1 no estará esperando a que termine F2 o F3. Si tus F2 y F3 son significativamente más rápidos que F1, puedes asignar más hilos a la F1 y seguir presionando la misma F2Queue.

public class App {

    final BlockingQueue<Item> f2Queue = new ArrayBlockingQueue<>(100);
    final BlockingQueue<Item> f3Queue = new ArrayBlockingQueue<>(100);

    public static void main(String[] args) throws InterruptedException {
        App app = new App();
        app.start();
    }

    public void start() throws InterruptedException {
        Thread t1 = new ItemsProducer(f2Queue);
        Thread t2 = new F2ExecutorThread(f2Queue, f3Queue);
        Thread t3 = new F3ExecutorThread(f3Queue);

        t1.start();
        t2.start();
        t3.start();

        t1.join();
        t2.join();
        t3.join();
    }
}

/**
 * Thread producing 5 items at a time and feeding it to f1()
 */
class ItemsProducer extends Thread {
    private BlockingQueue<Item> f2Queue;

    private static final int F1_BATCH_SIZE = 5;

    public ItemsProducer(BlockingQueue<Item> f2Queue) {
        this.f2Queue = f2Queue;
    }

    public void run() {
        Random random = new Random();
        while (true) {
            try {
                List<Item> items = new ArrayList<>();
                for (int i = 0; i < F1_BATCH_SIZE; i++) {
                    Item item = new Item(String.valueOf(random.nextInt(100)));
                    Thread.sleep(20);
                    items.add(item);
                    System.out.println("Item produced: " + item);
                }

                // Feed items to f1
                f1(items);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    void f1(List<Item> items) throws InterruptedException {
        Random random = new Random();
        for (Item item : items) {
            Thread.sleep(100);
            item.upc = String.valueOf(random.nextInt(100));
            f2Queue.put(item);
        }
    }
}

/**
 * Thread consuming items produced by f1(). It takes 20 items at a time, but if they are not
 * available it waits and starts processesing as soon as one gets available
 */
class F2ExecutorThread extends Thread {
    static final int F2_BATCH_SIZE = 20;
    private BlockingQueue<Item> f2Queue;
    private BlockingQueue<Item> f3Queue;

    public F2ExecutorThread(BlockingQueue<Item> f2Queue, BlockingQueue<Item> f3Queue) {
        this.f2Queue = f2Queue;
        this.f3Queue = f3Queue;
    }

    public void run() {
        try {
            List<Item> items = new ArrayList<>();
            while (true) {
                items.clear();
                if (f2Queue.drainTo(items, F2_BATCH_SIZE) == 0) {
                    items.add(f2Queue.take());
                }
                f2(items);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    void f2(List<Item> items) throws InterruptedException {
        Random random = new Random();
        for (Item item : items) {
            Thread.sleep(100);
            item.price = random.nextInt(100);
            f3Queue.put(item);
        }
    }
}

/**
 * Thread consuming items produced by f2(). It takes 20 items at a time, but if they are not
 * available it waits and starts processesing as soon as one gets available.
 */
class F3ExecutorThread extends Thread {
    static final int F3_BATCH_SIZE = 20;
    private BlockingQueue<Item> f3Queue;

    public F3ExecutorThread(BlockingQueue<Item> f3Queue) {
        this.f3Queue = f3Queue;
    }

    public void run() {
        try {
            List<Item> items = new ArrayList<>();
            while (true) {
                items.clear();
                if (f3Queue.drainTo(items, F3_BATCH_SIZE) == 0) {
                    items.add(f3Queue.take());
                }
                f3(items);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void f3(List<Item> items) throws InterruptedException {
        Random random = new Random();

        for (Item item : items) {
            Thread.sleep(100);
            item.competitorName = String.valueOf(random.nextInt(100));
            System.out.println("Item done: " + item);
        }
    }
}

class Item {
    String sku, upc, competitorName;
    double price;

    public Item(String sku) {
        this.sku = sku;
    }

    public String toString() {
        return "sku: " + sku + " upc: " + upc + " price: " + price + " compName: " + competitorName;
    }
}

Supongo que también puedes seguir el mismo enfoque en .Net. Para una mejor comprensión, sugiero que revises la arquitectura básica de http://storm.apache.org/releases/current/Tutorial.html


2
2017-09-20 14:55



Intenté hacer lo mismo en .NET y creo que está funcionando.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace BlockingCollectionExample
{
    class Program
    {
        static void Main(string[] args)
        {
            BlockingCollection<Listing> needUPCJobs = new BlockingCollection<Listing>();
            BlockingCollection<Listing> needPricingJobs = new BlockingCollection<Listing>();

            // This will have final output
            List<Listing> output = new List<Listing>();

            // start executor 1 which waits for data until available
            var executor1 = Task.Factory.StartNew(() =>
            {
                int maxSimutenousLimit = 5;
                int gg = 0;
                while (true)
                {
                    while (needUPCJobs.Count >= maxSimutenousLimit)
                    {
                        List<Listing> tempListings = new List<Listing>();
                        for (int i = 0; i < maxSimutenousLimit; i++)
                        {
                            Listing listing = new Listing();
                            if (needUPCJobs.TryTake(out listing))
                                tempListings.Add(listing);
                        }
                        // Simulating some delay for first executor
                        Thread.Sleep(1000);

                        foreach (var eachId in tempListings)
                        {
                            eachId.UPC = gg.ToString();
                            gg++;
                            needPricingJobs.Add(eachId);
                        }
                    }

                    if (needUPCJobs.IsAddingCompleted)
                    {
                        if (needUPCJobs.Count == 0)
                            break;
                        else
                            maxSimutenousLimit = needUPCJobs.Count;
                    }                    
                }
                needPricingJobs.CompleteAdding();
            });

            // start executor 2 which waits for data until available
            var executor2 = Task.Factory.StartNew(() =>
            {
                int maxSimutenousLimit = 10;
                int gg = 10;
                while (true)
                {
                    while (needPricingJobs.Count >= maxSimutenousLimit)
                    {
                        List<Listing> tempListings = new List<Listing>();
                        for (int i = 0; i < maxSimutenousLimit; i++)
                        {
                            Listing listing = new Listing();
                            if (needPricingJobs.TryTake(out listing))
                                tempListings.Add(listing);
                        }
                        // Simulating more delay for second executor
                        Thread.Sleep(10000);

                        foreach (var eachId in tempListings)
                        {
                            eachId.Price = gg;
                            gg++;
                            output.Add(eachId);
                        }
                    }
                    if (needPricingJobs.IsAddingCompleted)
                    {
                        if(needPricingJobs.Count==0)
                            break;
                        else
                            maxSimutenousLimit = needPricingJobs.Count;
                    }
                }

            });

            // producer thread
            var producer = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 100; i++)
                {
                    needUPCJobs.Add(new Listing() { ID = i });
                }
                needUPCJobs.CompleteAdding();
            });

            // wait for producer to finish producing
            producer.Wait();

            // wait for all executors to finish executing
            Task.WaitAll(executor1, executor2);

            Console.WriteLine();
            Console.WriteLine();
        }
    }

    public class Listing
    {
        public int ID;
        public string UPC;
        public double Price;
        public Listing() { }
    }
}

1
2017-09-22 22:22