Pregunta ExecutorService que interrumpe las tareas después de un tiempo de espera


Estoy buscando un ExecutorService implementación que se puede proporcionar con un tiempo de espera excedido. Las tareas que se envían al ExecutorService se interrumpen si tardan más tiempo que el tiempo de espera en ejecutarse. Implementar tal bestia no es una tarea tan difícil, pero me pregunto si alguien sabe de una implementación existente.

Esto es lo que surgió en función de algunos de los debates a continuación. ¿Algún comentario?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}

76
2018-05-03 14:10


origen


Respuestas:


Puedes usar un ScheduledExecutorService para esto. Primero lo enviaría solo una vez para comenzar inmediatamente y retener el futuro que se crea. Después de eso, puede enviar una nueva tarea que cancelaría el futuro retenido después de un período de tiempo.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
 final Future handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel();
     }      
 }, 10000, TimeUnit.MILLISECONDS);

Esto ejecutará su controlador (la funcionalidad principal será interrumpida) durante 10 segundos, luego cancelará (es decir, interrumpirá) esa tarea específica.


71
2018-05-03 15:12



Lamentablemente, la solución es defectuosa. Hay una especie de error con ScheduledThreadPoolExecutor, también reportado en esta pregunta: la cancelación de una tarea enviada no libera por completo los recursos de memoria asociados con la tarea; los recursos se lanzan solo cuando la tarea expira.

Si por lo tanto, crea un TimeoutThreadPoolExecutor con un tiempo de caducidad bastante largo (un uso típico) y enviar tareas lo suficientemente rápido, terminará llenando la memoria, incluso aunque las tareas se hayan completado correctamente.

Puede ver el problema con el siguiente programa de prueba (muy crudo):

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

El programa agota la memoria disponible, aunque espera la generación Runnables para completar.

Pensé en esto por un tiempo, pero lamentablemente no pude encontrar una buena solución.

EDITAR: Descubrí que este problema fue informado como Error JDK 6602600, y parece haber sido arreglado recientemente.


5
2017-10-11 16:14



Envuelva la tarea en FutureTask y puede especificar el tiempo de espera para FutureTask. Mira el ejemplo en mi respuesta a esta pregunta,

java native Process timeout


4
2018-05-03 14:46



¿Qué hay de usar el ExecutorService.shutDownNow() método como se describe en http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html? Parece ser la solución más simple.


1
2018-03-04 19:27



Parece que el problema no está en el error JDK 6602600 (se resolvió en 2010-05-22), pero en llamada incorrecta de sueño (10) en círculo. Además, note que el hilo principal debe dar directamente CHANCE a otros hilos para realizar sus tareas por invocación SLEEP (0) en CADA rama del círculo exterior. Creo que es mejor utilizar Thread.yield () en lugar de Thread.sleep (0)

El resultado corregido parte del código de problema anterior es tal como este:

.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

Funciona correctamente con la cantidad de contador externo de hasta 150 000 000 círculos probados.


1
2017-12-27 08:24



Después de una tonelada de tiempo para inspeccionar,
Finalmente, yo uso invokeAll método de ExecutorService para resolver este problema.
Eso interrumpirá estrictamente la tarea mientras se ejecuta la tarea.
Aquí hay un ejemplo

ExecutorService executorService = Executors.newCachedThreadPool();

try {
    List<Callable<Object>> callables = new ArrayList<>();
    // Add your long time task (callable)
    callables.add(new VaryLongTimeTask());
    // Assign tasks for specific execution timeout (e.g. 2 sec)
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
    for (Future<Object> future : futures) {
        // Getting result
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

executorService.shutdown();

El profesional es que también puedes enviar ListenableFuture al mismo ExecutorService.
Simplemente cambie ligeramente la primera línea de código.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorService es la función de Escucha de ExecutorService en el proyecto de google guayaba (com.google.guava)


1
2017-08-02 01:56



Usando la respuesta de John W, creé una implementación que iniciaba correctamente el tiempo de espera cuando la tarea inicia su ejecución. Incluso escribo una prueba unitaria para ello :)

Sin embargo, no se ajusta a mis necesidades ya que algunas operaciones IO no interrumpen cuando Future.cancel() se llama (es decir, cuando Thread.interrupted() se llama).

De todos modos, si alguien está interesado, creé una esencia: https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf


1
2017-07-16 21:04



¿Qué pasa con esta idea alternativa?

  • dos tienen dos ejecutores:
    • uno para :
      • enviar la tarea, sin preocuparse por el tiempo de espera de la tarea
      • agregando el futuro resultado y el momento en que debería terminar en una estructura interna
    • uno para ejecutar un trabajo interno que verifica la estructura interna si algunas tareas tienen un tiempo de espera y si deben cancelarse.

Pequeña muestra está aquí:

public class AlternativeExecutorService 
{

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue       = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor                scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService                   threadExecutor    = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;

public AlternativeExecutorService()
{
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}

public void pushTask(OwnTask task)
{
    ListenableFuture<Void> future = threadExecutor.submit(task);  // -> create your Callable
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}

public void shutdownInternalScheduledExecutor()
{
    scheduledFuture.cancel(true);
    scheduledExecutor.shutdownNow();
}

long getCurrentMillisecondsTime()
{
    return Calendar.getInstance().get(Calendar.MILLISECOND);
}

class ListenableFutureTask
{
    private final ListenableFuture<Void> future;
    private final OwnTask                task;
    private final long                   milliSecEndTime;

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
    {
        this.future = future;
        this.task = task;
        this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
    }

    ListenableFuture<Void> getFuture()
    {
        return future;
    }

    OwnTask getTask()
    {
        return task;
    }

    long getMilliSecEndTime()
    {
        return milliSecEndTime;
    }
}

class TimeoutManagerJob implements Runnable
{
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
    {
        return futureQueue;
    }

    @Override
    public void run()
    {
        long currentMileSecValue = getCurrentMillisecondsTime();
        for (ListenableFutureTask futureTask : futureQueue)
        {
            consumeFuture(futureTask, currentMileSecValue);
        }
    }

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
    {
        ListenableFuture<Void> future = futureTask.getFuture();
        boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
        if (isTimeout)
        {
            if (!future.isDone())
            {
                future.cancel(true);
            }
            futureQueue.remove(futureTask);
        }
    }
}

class OwnTask implements Callable<Void>
{
    private long     timeoutDuration;
    private TimeUnit timeUnit;

    OwnTask(long timeoutDuration, TimeUnit timeUnit)
    {
        this.timeoutDuration = timeoutDuration;
        this.timeUnit = timeUnit;
    }

    @Override
    public Void call() throws Exception
    {
        // do logic
        return null;
    }

    public long getTimeoutDuration()
    {
        return timeoutDuration;
    }

    public TimeUnit getTimeUnit()
    {
        return timeUnit;
    }
}
}

0
2017-08-07 13:20