Pregunta Cómo usar IObservable / IObserver con ConcurrentQueue o ConcurrentStack


Me di cuenta de que cuando estoy tratando de procesar elementos en una cola simultánea usando múltiples hilos mientras que múltiples hilos pueden poner elementos en él, la solución ideal sería usar las Extensiones reactivas con las estructuras de datos concurrentes.

Mi pregunta original está en:

Mientras usa ConcurrentQueue, intenta dequeue mientras realiza un bucle en paralelo

Por lo tanto, tengo curiosidad de saber si hay alguna forma de tener una consulta LINQ (o PLINQ) que devegue continuamente a medida que se colocan los elementos en ella.

Estoy tratando de hacer que esto funcione de una manera en la que pueda tener n cantidad de productores ingresando a la cola y un número limitado de procesos para procesar, por lo que no sobrecargo la base de datos.

Si pudiera usar Rx framework, entonces podría iniciarlo y si se colocan 100 elementos dentro de 100ms, los 20 hilos que forman parte de la consulta PLINQ simplemente procesarían a través de la cola.

Hay tres tecnologías que trato de trabajar juntas:

  1. Rx Framework (LINQ reactivo)
  2. VENDIENDO
  3. System.Collections.Concurrent estructuras

5
2018-06-13 00:47


origen


Respuestas:


Drew tiene razón, creo que el ConcurrentQueue aunque suene perfecto para el trabajo es en realidad la estructura de datos subyacente que utiliza BlockingCollection. Me parece muy de atrás para adelante también. Echa un vistazo al capítulo 7 de este libro * http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 y explicará cómo usar BlockingCollection y tendrá múltiples productores y múltiples consumidores sacando cada uno la "cola". Deberá consultar el método "GetConsumingEnumerable ()" y posiblemente simplemente invocar .ToObservable () sobre eso.

* el resto del libro es bastante promedio.

editar:

Aquí hay un programa de ejemplo que creo que hace lo que quiere

class Program
{
    private static ManualResetEvent _mre = new ManualResetEvent(false);
    static void Main(string[] args)
    {
        var theQueue = new BlockingCollection<string>();
        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));


        LoadQueue(theQueue, "Producer A");
        LoadQueue(theQueue, "Producer B");
        LoadQueue(theQueue, "Producer C");

        _mre.Set();

        Console.WriteLine("Processing now....");

        Console.ReadLine();
    }

    private static void ProcessNewValue(string value, string consumerName, int delay)
    {
        Thread.SpinWait(delay);
        Console.WriteLine("{1} consuming {0}", value, consumerName);
    }

    private static void LoadQueue(BlockingCollection<string> target, string prefix)
    {
        var thread = new Thread(() =>
                                    {
                                        _mre.WaitOne();
                                        for (int i = 0; i < 100; i++)
                                        {
                                            target.Add(string.Format("{0} {1}", prefix, i));
                                        }
                                    });
        thread.Start();
    }
}

6
2018-01-06 13:18



No sé cuál es la mejor manera de lograr esto con Rx, pero recomendaría simplemente usar BlockingCollection<T> y el patrón productor-consumidor. Tu hilo principal agrega elementos a la colección, que usa ConcurrentQueue<T> debajo por defecto. Entonces tienes un separado Task que giras por delante de lo que usa Parallel::ForEach sobre el BlockingCollection<T> para procesar tantos elementos de la colección como tenga sentido para el sistema al mismo tiempo. Ahora, es probable que también desee examinar el uso de GetConsumingPartitioner método de la biblioteca ParallelExtensions para ser más eficiente ya que el particionador predeterminado creará más sobrecarga de lo que usted desea en este caso. Puedes leer más sobre esto desde esta publicación en el blog.

Cuando el hilo principal está terminado llamas CompleteAdding sobre el BlockingCollection<T> y Task::Wait sobre el Task Giraron para esperar a que todos los consumidores terminaran de procesar todos los artículos de la colección.


3
2017-12-01 05:26