Pregunta productor - consumo; ¿Cómo se detiene el consumidor?


Así que simulé mi problema de consumidor productor y tengo el siguiente código. Mi pregunta es la siguiente: ¿cómo se detiene el consumidor si está en constante momento (verdadero)?

En el siguiente código, he añadido

                    if (queue.peek()==null)
                         Thread.currentThread().interrupt();

que funciona muy bien en este ejemplo. Pero en mi diseño del mundo real, esto no funciona (a veces le toma más tiempo al productor 'poner' los datos para que la excepción lanzada en el consumidor sea incorrecta. En general, sé que puedo poner datos de 'veneno' como Object es XYZ y puedo consultarlo en el consumidor. Pero este veneno hace que el código realmente se vea mal. Me pregunto si alguien tiene un enfoque diferente.

public class ConsumerThread implements Runnable
{
 private BlockingQueue<Integer> queue;
 private String name;
 private boolean isFirstTimeConsuming = true;
 public ConsumerThread(String name, BlockingQueue<Integer> queue)
 {
    this.queue=queue;
    this.name=name;
 }

@Override
public void run()
{
    try
    {       
        while (true)
        {   
            if (isFirstTimeConsuming)
            {
                System.out.println(name+" is initilizing...");
                Thread.sleep(4000);
                isFirstTimeConsuming=false;
            }
            try{

                if (queue.peek()==null)
                    Thread.currentThread().interrupt();

                Integer data = queue.take();

                System.out.println(name+" consumed ------->"+data);
                Thread.sleep(70);    

            }catch(InterruptedException ie)
            {
                System.out.println("InterruptedException!!!!");
                break;
            }
        }

        System.out.println("Comsumer " + this.name + " finished its job; terminating.");

    }catch (InterruptedException e)
    {
        e.printStackTrace();
    } 
}

}


5
2018-04-27 14:47


origen


Respuestas:


UN: Simplemente no hay garantía de que solo porque peek devoluciones null, el productor ha dejado de producir. ¿Qué pasa si el productor simplemente se ralentizó? Ahora, el consumidor abandona, y el productor sigue produciendo. Así que la idea de 'peek' -> 'break' básicamente falla.

SEGUNDO: La configuración de un indicador 'hecho / ejecutado' del consumidor y su lectura en el productor también falla, si:

  1. el consumidor revisa la bandera, encuentra que debería seguir funcionando, luego hace una 'toma'
  2. mientras tanto, el productor estaba poniendo la bandera para 'no correr'
  3. Ahora los bloques de consumidores siempre esperando un paquete fantasma

También puede ocurrir lo contrario, y un paquete se deja sin consumir.

Luego, para evitar esto, querrá hacer una sincronización adicional con mutexes por encima del 'BlockingQueue'.

DO: Encuentro que el 'Código Rosetta' es una buena fuente para decidir qué es una buena práctica, en situaciones como esta:

http://rosettacode.org/wiki/Synchronous_concurrency#Java

El productor y el consumidor deben acordar un objeto (o un atributo en el objeto) que representa el final de la entrada. Luego, el productor establece ese atributo en el último paquete y el consumidor deja de consumirlo. es decir, a lo que se refiere en su pregunta como "veneno".

En el ejemplo anterior del Código Rosetta, este 'objeto' es simplemente un vacío String llamado 'EOF':

final String EOF = new String();

// Producer
while ((line = br.readLine()) != null)
  queue.put(line);
br.close();
// signal end of input
queue.put(EOF);

// Consumer
while (true)
  {
    try
      {
        String line = queue.take();
        // Reference equality
        if (line == EOF)
          break;
        System.out.println(line);
        linesWrote++;
      }
    catch (InterruptedException ie)
      {
      }
  }

7
2018-04-27 15:21



NO utilice la interrupción en el hilo, sino que rompe el ciclo cuando ya no es necesario:

if (queue.peek()==null)
         break;

O también puede usar una variable para marcar la operación de cierre pendiente y luego interrumpir el ciclo y cerrar el ciclo después de:

if (queue.peek()==null)
         closing = true;

//Do further operations ...
if(closing)
  break;

3
2018-04-27 14:51



En el mundo real, la mayoría de los mensajes vienen con un encabezado de algún tipo que define un tipo / subtipo de mensaje o tal vez diferentes objetos.

Puede crear un comando y controlar un objeto o tipo de mensaje que le indique al hilo que haga algo cuando reciba el mensaje (como apagar, volver a cargar una tabla, agregar un nuevo oyente, etc.).

De esta forma, puede decir que un hilo de comando y control simplemente envía mensajes al flujo de mensajes normal. Puede hacer que el hilo CNC se comunique con un terminal operativo en un sistema a gran escala, etc.


0
2018-04-27 15:04



Si su cola puede vaciarse antes de que desee que su consumidor termine, necesitará una bandera para indicarle a la cadena cuándo parar. Agregue un método setter para que el productor pueda decirle al consumidor que se apague. Luego modifique su código para que en lugar de:

if (queue.isEmpty())
   break;

verifica tu código

if (!run)
{
   break;
}
else if (queue.isEmpty())
{
   Thread.sleep(200);
   continue;
}

0
2018-04-27 15:08