Pregunta ¿Cómo hacer que ThreadPoolExecutor aumente los hilos al máximo antes de hacer cola?


Me he sentido frustrado por algún tiempo con el comportamiento predeterminado de ThreadPoolExecutor que respalda el ExecutorService grupos de hilos que muchos de nosotros usamos. Para citar de los Javadocs:

Si hay más de corePoolSize pero menos de los hilos de MaximumPoolSize en ejecución, se creará un nuevo hilo solo si la cola está llena.

Lo que esto significa es que si defines un grupo de subprocesos con el siguiente código, Nunca iniciar el segundo hilo porque el LinkedBlockingQueue no tiene límites

ExecutorService threadPool =
    new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Solo si tienes un encerrado cola y la cola es completo ¿Hay algún hilo por encima del número central iniciado? Sospecho que una gran cantidad de programadores multiproceso de Java junior no conocen este comportamiento del ThreadPoolExecutor.

Ahora tengo un caso de uso específico donde esto no es óptimo. Estoy buscando formas, sin escribir mi propia clase de TPE, de evitarlo.

Mis requisitos son para un servicio web que hace call-backs a un tercero posiblemente no confiable.

  • No deseo realizar la devolución de llamada de forma sincrónica con la solicitud web, por lo que quiero utilizar un grupo de subprocesos.
  • Normalmente obtengo un par de estos por minuto, así que no quiero tener un newFixedThreadPool(...) con una gran cantidad de hilos que en su mayoría están inactivos.
  • De vez en cuando recibo un estallido de este tráfico y quiero ampliar el número de subprocesos a algún valor máximo (digamos 50).
  • Necesito hacer un mejor intento hacer todas las devoluciones de llamada, así que quiero poner en cola cualquier adicional por encima de 50. No quiero abrumar al resto de mi servidor web usando un newCachedThreadPool().

¿Cómo puedo evitar esta limitación en ThreadPoolExecutor donde la cola debe estar limitada y llena antes de se iniciarán más hilos? ¿Cómo puedo hacer para que comience más hilos? antes de tareas de espera?

Editar:

@Flavio hace un buen punto sobre el uso de ThreadPoolExecutor.allowCoreThreadTimeOut(true) tener los hilos del núcleo de tiempo de espera y salir. Lo consideré pero aún quería la función de hilos centrales. No quería que la cantidad de subprocesos en el grupo cayera por debajo del tamaño del núcleo si era posible.


76
2017-10-22 21:02


origen


Respuestas:


¿Cómo puedo evitar esta limitación en ThreadPoolExecutor donde la cola debe estar delimitada y completa antes de que se inicien más hilos.

Creo que finalmente encontré una solución algo elegante (tal vez un poco hacky) a esta limitación con ThreadPoolExecutor. Implica extender LinkedBlockingQueue tenerlo regresar false para queue.offer(...) cuando ya hay algunas tareas en cola. Si los subprocesos actuales no se mantienen al día con las tareas en cola, el TPE agregará subprocesos adicionales. Si el grupo ya está en el máximo de hilos, entonces el RejectedExecutionHandler sera llamado. Es el controlador que luego hace el put(...) en la cola.

Ciertamente es extraño escribir una cola donde offer(...) puede regresar false y put() nunca bloquea, así que esa es la parte de hackeo. Pero esto funciona bien con el uso de la cola por parte de TPE, así que no veo ningún problema para hacer esto.

Aquí está el código:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        /*
         * Offer it to the queue if there is 0 items already queued, else
         * return false so the TPE will add another thread. If we return false
         * and max threads have been reached then the RejectedExecutionHandler
         * will be called which will do the put into the queue.
         */
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            /*
             * This does the actual put into the queue. Once the max threads
             * have been reached, the tasks will then queue up.
             */
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

Con este mecanismo, cuando envío tareas a la cola, ThreadPoolExecutor será:

  1. Escala el número de subprocesos hasta el tamaño del núcleo inicialmente (aquí 1).
  2. Ofrecerlo a la cola. Si la cola está vacía, se pondrá en cola para ser manejada por los hilos existentes.
  3. Si la cola ya tiene 1 o más elementos, el offer(...) devolverá falso.
  4. Si se devuelve falso, amplíe el número de subprocesos en el grupo hasta que alcancen el número máximo (aquí 50).
  5. Si está al máximo, llama al RejectedExecutionHandler
  6. los RejectedExecutionHandler luego coloca la tarea en la cola para ser procesada por la primera cadena disponible en orden FIFO.

Aunque en mi código de ejemplo anterior, la cola no tiene límites, también podría definirla como una cola acotada. Por ejemplo, si agrega una capacidad de 1000 al LinkedBlockingQueue entonces lo hará:

  1. escalar los hilos hasta max
  2. luego haz cola hasta que esté lleno con 1000 tareas
  3. luego bloquea a la persona que llama hasta que haya espacio disponible para la cola.

Además, si realmente necesitaras usar offer(...) en el RejectedExecutionHandler entonces podrías usar el offer(E, long, TimeUnit) método en su lugar con Long.MAX_VALUE como el tiempo de espera.

Editar:

He ajustado mi offer(...) método anulado por retroalimentación de @ Ralf. Esto solo aumentará el número de subprocesos en el grupo si no se mantienen al día con la carga.

Editar:

Otro ajuste a esta respuesta podría ser preguntar al TPE si hay subprocesos inactivos y solo poner en cola el elemento si es así. Tendría que hacer una verdadera clase para esto y agregar un ourQueue.setThreadPoolExecutor(tpe); método en él.

Entonces tu offer(...) método podría ser algo así como:

  1. Verifique si el tpe.getPoolSize() == tpe.getMaximumPoolSize() en cuyo caso simplemente llame super.offer(...).
  2. Si no tpe.getPoolSize() > tpe.getActiveCount() luego llame super.offer(...) ya que parece que hay hilos inactivos.
  3. De lo false para bifurcar otro hilo.

Tal vez esto:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Tenga en cuenta que los métodos get en TPE son caros ya que tienen acceso volatile campos o (en el caso de getActiveCount()) bloquear el TPE y recorrer la lista de hilos. Además, existen condiciones de carrera que pueden ocasionar que una tarea se ponga en cola incorrectamente o se bifurca otro hilo cuando hay un hilo inactivo.


35
2017-10-22 21:22



Establezca el tamaño del núcleo y el tamaño máximo en el mismo valor, y permita que los hilos del núcleo se eliminen del grupo con allowCoreThreadTimeOut(true).


21
2018-06-30 15:37



Ya tengo otras dos respuestas sobre esta pregunta, pero sospecho que esta es la mejor.

Está basado en la técnica de la respuesta actualmente aceptada, a saber:

  1. Anular la cola offer() método para (a veces) devolver falso
  2. que causa el ThreadPoolExecutor generar un nuevo hilo o rechazar la tarea, y
  3. selecciona el RejectedExecutionHandler a actualmente poner la tarea en el rechazo.

El problema es cuando offer() debería devolver falso. La respuesta actualmente aceptada devuelve falso cuando la cola tiene un par de tareas, pero como he señalado en mi comentario allí, esto causa efectos no deseados. Alternativamente, si siempre devuelve falso, seguirá generando nuevos subprocesos incluso cuando haya subprocesos esperando en la cola.

La solución es usar Java 7 LinkedTransferQueue y tiene offer() llamada tryTransfer(). Cuando hay un hilo de consumidor en espera, la tarea se pasará a ese hilo. De otra manera, offer() devolverá falso y el ThreadPoolExecutor generará un nuevo hilo.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });

14
2017-11-22 19:43



Nota: ahora prefiero y recomiendo mi otra respuesta.

Aquí hay una versión que me parece mucho más sencilla: aumentar el corePoolSize (hasta el límite de maximumPoolSize) siempre que se ejecute una nueva tarea, luego disminuir el corePoolSize (hasta el límite del "tamaño del grupo de núcleos" especificado por el usuario) siempre que tarea completa.

Para decirlo de otra manera, realice un seguimiento del número de tareas en ejecución o en cola, y asegúrese de que corePoolSize sea igual al número de tareas, siempre que se encuentre entre el "tamaño del grupo principal" especificado y el tamaño máximo de la cuenta.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Como está escrito, la clase no admite el cambio de corePoolSize o maximumPoolSize especificado por el usuario después de la construcción, y no admite la manipulación de la cola de trabajo directamente o a través de remove() o purge().


7
2017-10-23 10:15



Tenemos una subclase de ThreadPoolExecutor eso toma un adicional creationThreshold y anula execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

tal vez eso ayude también, pero el suyo se ve más artístico, por supuesto ...


5
2018-06-20 14:02



La respuesta recomendada resuelve solo uno (1) del problema con el grupo de subprocesos JDK:

  1. Los grupos de subprocesos de JDK están sesgados hacia la cola. Entonces, en lugar de generar un nuevo hilo, pondrán la tarea en cola. Solo si la cola alcanza su límite, el grupo de subprocesos generará un nuevo subproceso.

  2. La retirada del hilo no ocurre cuando la carga se aclara. Por ejemplo, si tenemos un estallido de trabajos que golpea el grupo que hace que el grupo llegue al máximo, seguido de una carga ligera de un máximo de 2 tareas a la vez, el grupo utilizará todos los hilos para mantener la carga ligera evitando la retirada del hilo. (solo se necesitarían 2 hilos ...)

Descontento con el comportamiento anterior, seguí adelante e implementé un grupo para superar las deficiencias anteriores.

Para resolver 2) El uso de la programación de Lifo resuelve el problema. Esta idea fue presentada por Ben Maurer en la conferencia APM Applicative 2015: Escala Systems @ Facebook 

Entonces nació una nueva implementación:

LifoThreadPoolExecutorSQP

Hasta ahora, esta implementación mejora el rendimiento de ejecución asíncrono para ZEL.

La implementación tiene la capacidad de girar para reducir la sobrecarga del cambio de contexto, ofreciendo un rendimiento superior para ciertos casos de uso.

Espero eso ayude...

PD: JDK Fork Join Pool implementa ExecutorService y funciona como un grupo de subprocesos "normal", la implementación es efectiva, utiliza la programación de subprocesos LIFO, sin embargo no hay control sobre el tamaño de cola interna, el tiempo de espera de jubilación ...


3
2017-11-27 19:15



Nota: ahora prefiero y recomiendo mi otra respuesta.

Tengo otra propuesta, siguiendo con la idea original de cambiar la cola para devolver falsa. En esta, todas las tareas pueden ingresar a la cola, pero cada vez que una tarea se pone en cola después de execute(), lo seguimos con una tarea centinela no operativa que la cola rechaza, provocando que se genere un nuevo subproceso, que ejecutará la operación no operativa inmediatamente seguida de algo de la cola.

Porque los hilos de trabajo pueden estar sondeando el LinkedBlockingQueue para una tarea nueva, es posible que una tarea se ponga en cola incluso cuando hay un hilo disponible. Para evitar generar nuevos subprocesos incluso cuando hay subprocesos disponibles, necesitamos realizar un seguimiento de cuántos subprocesos están esperando nuevas tareas en la cola y solo generar un nuevo subproceso cuando hay más tareas en cola que esperar subprocesos.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});

1
2017-10-22 21:52