Как повторно отправить задачу в ExecutorService, если она завершилась или упала с ошибкой

Вопрос:

У меня есть задача (Thread), которую нужно повторно отправлять на выполнение каждый раз, когда её работа завершается или прерывается из-за ошибки.

Основной поток нельзя блокировать.

При необходимости потоки должны поддерживать отмену.

Какое лучшее решение?

/**
 * Proof of concept from the question: a service hands a long-running job to a
 * single-thread executor and returns immediately, so the caller is not blocked.
 * The logic that should resubmit the job after it ends is still missing.
 */
public class POC {
    public static void main(String[] args) {
        System.out.println("Init");
        SomeService service = new SomeService();
        service.waitForEvents();
        System.out.println("main not blocked and do other stuff");
    }

    static class SomeService {
        /**
         * Creates a single-thread executor and submits one {@link AlwaysRunningJob} to it.
         * The executor is not shut down and the job is not resubmitted when it ends.
         */
        public void waitForEvents() {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            Runnable job = new AlwaysRunningJob("importantParamForJob");
            executor.submit(job);
            // MISSING LOGIC
            //    // if AlwaysRunningJob got error or finished,
            //    // wait 2 seconds recreate and resubmit it
            //    executor.submit(new AlwaysRunningJob("importantParamForJob"));
        }

        /** Job that loops until its thread is interrupted, printing a line every five seconds. */
        class AlwaysRunningJob implements Runnable {
            String importantParamForJob;

            public AlwaysRunningJob(String importantParamForJob) {
                this.importantParamForJob = importantParamForJob;
            }

            @Override
            public void run() {
                final Thread self = Thread.currentThread();
                self.setName("AlwaysRunningJob Job");
                while (true) {
                    if (self.isInterrupted()) {
                        System.out.println("Finished run!");
                        return;
                    }
                    // keep waiting for events until
                    // exception is thrown. or something bad happened
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        // exit if it failed
                        return;
                    }
                    System.out.println("keep working on" + importantParamForJob);
                }
            }
        }
    }

}

Лучший ответ:

Я бы расширил ThreadPoolExecutor и переопределил в нём метод protected void afterExecute(Runnable r, Throwable t). По умолчанию этот метод ничего не делает, но вы можете сделать что-то вроде:

/**
 * A {@link ThreadPoolExecutor} that hands a failed task back to itself, up to
 * {@code maxRetries} additional attempts per task.
 *
 * <p>A task counts as failed when it throws out of {@code run()} (tasks started with
 * {@link #execute(Runnable)}) or when its {@link Future} completed exceptionally.
 * Cancelled futures are never retried.
 *
 * <p>Limitation: {@code submit(...)} wraps the task in a one-shot {@code FutureTask}.
 * Re-executing such a wrapper does not run the task body again, so real retries only
 * happen for tasks started with {@code execute(Runnable)}.
 */
public class RetryExecutor extends ThreadPoolExecutor {

    private final long maxRetries;
    /** Retries already scheduled per task; an entry is removed once the task is done with. */
    private final Map<Runnable, Integer> retries = new ConcurrentHashMap<>();

    /**
     * @param corePoolSize    number of threads kept in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param maxRetries      how many times a failed task is re-executed after its first attempt
     * @param keepAliveTime   idle time before excess threads are terminated
     * @param unit            unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before they are executed
     */
    public RetryExecutor(int corePoolSize, int maximumPoolSize, long maxRetries,
                     long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxRetries = maxRetries;
    }

    /**
     * Decides after every task run whether the task has to be executed again.
     *
     * @param r the task that has just run
     * @param t the exception that escaped {@code r.run()}, or {@code null}
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        boolean failed = t != null;
        // A Future swallows the task's exception, so ask it for the outcome.
        // The isDone() guard keeps the worker thread from ever blocking here.
        if (!failed && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (ExecutionException e) {
                // you should log the error
                failed = true;
            } catch (CancellationException ignored) {
                // cancelled on purpose: retrying would undo the caller's decision
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // ignore/reset or catch it to reschedule
            }
        }
        if (failed && shouldRetry(r)) {
            retry(r);
        } else {
            retries.remove(r);
        }
    }

    /** Returns whether {@code r} still has retry attempts left. */
    private boolean shouldRetry(Runnable r) {
        final Integer nbRetries = retries.getOrDefault(r, 0);
        return nbRetries < maxRetries;
    }

    /** Counts one more retry for {@code r} and queues it again. */
    private void retry(Runnable r) {
        final Integer nbRetries = retries.getOrDefault(r, 0);
        retries.put(r, nbRetries + 1);
        try {
            this.execute(r);
        } catch (RejectedExecutionException e) {
            // The pool is shut down or saturated. Give up on this task instead of letting
            // the rejection escape afterExecute and mask the task's own failure.
            retries.remove(r);
        }
    }

}

Но при таком подходе объекты Future бесполезны: он работает скорее по принципу «запустил и забыл» (fire and forget).

Как заметил @NikitataGorbatchevski, это не сработает, если вы используете Callable. Поэтому вот версия, которая умеет обрабатывать и Runnable, и Callable. Действительно, FutureTask нельзя запустить повторно, если возникла ошибка (я переиспользовал код ожидания завершения из FutureTask и не до конца в нём уверен):

/**
 * A {@link ThreadPoolExecutor} that re-executes failed tasks, up to {@code maxRetries}
 * additional attempts per task, for both {@link Runnable} and {@link Callable} tasks.
 *
 * <p>Tasks started with {@code submit(...)} are wrapped in a {@link RetryFutureTask},
 * which, unlike {@code FutureTask}, can run again after a failed attempt. The returned
 * {@link Future} reports the outcome of the most recent attempt: {@code get()} throws
 * {@link ExecutionException} after a failed attempt even if a retry is still pending,
 * and returns the value once a later attempt has succeeded. Cancelled tasks are never
 * retried.
 */
public class RetryExecutor extends ThreadPoolExecutor {
    private final long maxRetries;
    /** Retries already scheduled per task; an entry is removed once the task is done with. */
    private final Map<Runnable, Integer> retries = new ConcurrentHashMap<>();

    /**
     * @param corePoolSize    number of threads kept in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param maxRetries      how many times a failed task is re-executed after its first attempt
     * @param keepAliveTime   idle time before excess threads are terminated
     * @param unit            unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before they are executed
     */
    public RetryExecutor(int corePoolSize, int maximumPoolSize, long maxRetries,
                         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxRetries = maxRetries;
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new RetryFutureTask<>(runnable, value);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new RetryFutureTask<>(callable);
    }

    /**
     * Decides after every task run whether the task has to be executed again.
     *
     * @param r the task that has just run
     * @param t the exception that escaped {@code r.run()}, or {@code null}
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        boolean failed = t != null;
        // A Future swallows the task's exception, so ask it for the outcome.
        // The isDone() guard keeps the worker thread from ever blocking here.
        if (!failed && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (ExecutionException e) {
                // you should log the error
                failed = true;
            } catch (CancellationException ignored) {
                // cancelled on purpose: retrying would undo the caller's decision
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (failed && shouldRetry(r)) {
            retry(r);
        } else {
            retries.remove(r);
        }
    }

    /** Returns whether {@code r} still has retry attempts left. */
    private boolean shouldRetry(Runnable r) {
        final Integer nbRetries = retries.getOrDefault(r, 0);
        return nbRetries < maxRetries;
    }

    /** Counts one more retry for {@code r} and queues it again. */
    private void retry(Runnable r) {
        final Integer nbRetries = retries.getOrDefault(r, 0);
        retries.put(r, nbRetries + 1);
        try {
            this.execute(r);
        } catch (RejectedExecutionException e) {
            // The pool is shut down or saturated. Give up on this task instead of letting
            // the rejection escape afterExecute and kill the worker with the wrong error.
            retries.remove(r);
        }
    }

    /**
     * A future task that, unlike {@code FutureTask}, may be run again after a failed attempt.
     * The waiter queue follows the design of {@code FutureTask}.
     *
     * <p>State transitions: NEW or ERROR -> RUNNING -> FINISHED or ERROR, and
     * NEW, RUNNING or ERROR -> INTERRUPTED (cancelled).
     */
    private static class RetryFutureTask<V> implements RunnableFuture<V> {
        private static final int NEW = 0;
        private static final int RUNNING = 1;
        private static final int ERROR = 2;
        private static final int FINISHED = 3;
        private static final int INTERRUPTED = 4;
        private final AtomicInteger state = new AtomicInteger(NEW);
        private final AtomicReference<Thread> runner = new AtomicReference<>();
        private final AtomicReference<WaitNode> waiters = new AtomicReference<>();
        /** Makes sure cancel(true) only interrupts the runner while it is still inside run(). */
        private final Object interruptLock = new Object();
        private final Callable<V> callable;
        private volatile Throwable error;
        private volatile V result;

        public RetryFutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
        }

        public RetryFutureTask(Callable<V> callable) {
            this.callable = callable;
        }

        @Override
        public void run() {
            // If not already running
            if (!runner.compareAndSet(null, Thread.currentThread())) {
                return;
            }
            try {
                if (!startAttempt()) {
                    return; // already finished or cancelled: nothing to (re)run
                }
                try {
                    result = this.callable.call();
                    state.compareAndSet(RUNNING, FINISHED);
                } catch (Throwable e) {
                    // Like FutureTask, report every failure through the future.
                    error = e;
                    state.compareAndSet(RUNNING, ERROR);
                }
                // Wake waiters after every attempt, successful or not.
                finishCompletion();
            } finally {
                synchronized (interruptLock) {
                    runner.set(null);
                }
            }
        }

        /** Moves the task from NEW or ERROR to RUNNING; returns false from any other state. */
        private boolean startAttempt() {
            for (;;) {
                int s = state.get();
                if (s != NEW && s != ERROR) {
                    return false;
                }
                if (state.compareAndSet(s, RUNNING)) {
                    return true;
                }
            }
        }

        /**
         * Cancels the task unless it has already finished successfully or was already
         * cancelled. A task whose last attempt failed can still be cancelled, which
         * prevents further retries.
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            for (;;) {
                int s = state.get();
                if (s == FINISHED || s == INTERRUPTED) {
                    return false;
                }
                if (state.compareAndSet(s, INTERRUPTED)) {
                    break;
                }
            }
            try {
                if (mayInterruptIfRunning) {
                    synchronized (interruptLock) {
                        Thread t = runner.get();
                        if (t != null) {
                            t.interrupt();
                        }
                    }
                }
            } finally {
                finishCompletion();
            }
            return true;
        }

        @Override
        public boolean isCancelled() {
            return state.get() == INTERRUPTED;
        }

        @Override
        public boolean isDone() {
            return state.get() > RUNNING;
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            int s = state.get();
            if (s <= RUNNING) {
                s = awaitDone(false, 0L);
            }
            return resolve(s);
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            int s = state.get();
            if (s <= RUNNING) {
                s = awaitDone(true, unit.toNanos(timeout));
                if (s <= RUNNING) {
                    throw new TimeoutException();
                }
            }
            return resolve(s);
        }

        /**
         * Turns an observed completion state into a result or an exception. The state is
         * passed in rather than re-read, because a retry may have started in the meantime.
         */
        private V resolve(int s) throws ExecutionException {
            if (s == ERROR) {
                throw new ExecutionException(error);
            } else if (s == INTERRUPTED) {
                throw new CancellationException();
            }
            return result;
        }

        /** Removes and wakes all waiting threads. */
        private void finishCompletion() {
            for (WaitNode q; (q = waiters.get()) != null;) {
                if (waiters.compareAndSet(q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null) {
                            break;
                        }
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
        }

        /**
         * Waits until an attempt has completed, the task is cancelled, or the timeout elapses.
         *
         * @return the state observed on exit; {@code <= RUNNING} means the wait timed out
         */
        private int awaitDone(boolean timed, long nanos) throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (; ; ) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }

                int s = state.get();
                if (s > RUNNING) {
                    if (q != null) {
                        q.thread = null;
                    }
                    return s;
                } else if (q == null || q.thread == null) {
                    // No node yet, or the node was consumed by a completion that a retry
                    // has already superseded: wait for the next completion with a fresh node.
                    q = new WaitNode();
                    queued = false;
                } else if (!queued) {
                    // Push onto the stack of waiters; the link to the old head is required.
                    WaitNode head = waiters.get();
                    q.next = head;
                    queued = waiters.compareAndSet(head, q);
                } else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return s;
                    }
                    LockSupport.parkNanos(this, nanos);
                } else {
                    LockSupport.park(this);
                }
            }
        }

        /** Unlinks a timed-out or interrupted waiter node so stale nodes do not accumulate. */
        private void removeWaiter(WaitNode node) {
            if (node != null) {
                node.thread = null;
                retry:
                for (;;) {          // restart on removeWaiter race
                    for (WaitNode pred = null, q = waiters.get(), s; q != null; q = s) {
                        s = q.next;
                        if (q.thread != null) {
                            pred = q;
                        } else if (pred != null) {
                            pred.next = s;
                            if (pred.thread == null) { // check for race
                                continue retry;
                            }
                        } else if (!waiters.compareAndSet(q, s)) {
                            continue retry;
                        }
                    }
                    break;
                }
            }
        }

        /** Node of the stack of waiting threads. */
        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }
    }
}

Ответ №1

Я бы предложил что-то вроде этого:

/**
 * Starts a {@code RepeatableWorker} on a single-thread executor, lets the main thread
 * do its own work for a while, then stops the executor.
 */
public static void main(String[] args) throws InterruptedException {
    final long mainWorkMillis = 3300;

    ExecutorService pool = Executors.newSingleThreadExecutor();
    Runnable repeatableWorker = new RepeatableWorker();
    pool.submit(repeatableWorker);

    System.out.println("Main does other work");
    Thread.sleep(mainWorkMillis);
    System.out.println("Main work was finished, time to exit");

    // shutdownNow interrupts running threads
    pool.shutdownNow();
    pool.awaitTermination(1, TimeUnit.SECONDS);
}

/**
 * Runs the inherited worker task over and over on the current thread, pausing 100 ms
 * between runs, until the thread is interrupted.
 */
public static class RepeatableWorker extends Worker {
    @Override
    public void run() {
        final Thread self = Thread.currentThread();
        while (!self.isInterrupted()) {
            Exception failure = null;
            try {
                // In some cases it make sense to run this method in a separate thread.
                // For example if you want to give some time to the last worker thread to complete
                // before interrupting it from repeatable worker
                super.run();
            } catch (Exception e) {
                failure = e;
            }

            if (self.isInterrupted()) {
                System.out.println("worker was interrupted");
                // just exit as last task was interrupted
                break;
            }

            System.out.println(failure == null
                    ? "worker task was finished normally"
                    : "worker task was finished due to error " + failure.getMessage());

            // wait some time before the next start
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                System.out.println("Repeatable worker was interrupted");
                // ok we were interrupted
                // restore interrupted status and exit
                self.interrupt();
            }
        }
        System.out.println("repeatable task was finished");
    }
}

/**
 * Simulated unit of work: sleeps for half a second and then fails at random.
 * If the sleep is interrupted, the interrupted status is restored and the run ends.
 */
public static class Worker implements Runnable {
    @Override
    public void run() {
        // emulate some work
        try {
            Thread.sleep(500L);
        } catch (InterruptedException interrupted) {
            // restore interrupted status
            Thread.currentThread().interrupt();
            return;
        }
        boolean shouldFail = new Random().nextBoolean();
        if (shouldFail) {
            throw new RuntimeException("ouch");
        }
    }
}

Ответ №2

Если вы хотите и дальше использовать ExecutorService, я бы перешёл с Runnable на Callable. При отправке Callable вы получаете Future, через который можно получить результат (или исключение) кода задания, выполненного в Callable.

Поэтому вы можете определить, успешно ли выполнено задание или нет, и повторно отправить задание, если вам нужно:

/**
 * Variant of the service that submits the job as a {@link Callable}, so that its
 * outcome (normal end or exception) can be read from the returned {@link Future}.
 */
static class SomeService {
    /**
     * Submits one {@link AlwaysRunningJob} and waits on its future until the job has
     * ended; a failure is only printed here, the resubmission itself is not implemented.
     */
    public void waitForEvents() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<Void> job = new AlwaysRunningJob("importantParamForJob");
        Future<Void> future = executor.submit(job);

        try {
            // this call waits until the Callable has finished (or failed)
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            // Error -> resubmit
            e.printStackTrace();
        }
        // No error -> do something else
    }

    /** Job that loops until its thread is interrupted, printing a line every five seconds. */
    class AlwaysRunningJob implements Callable<Void> {
        String importantParamForJob;

        public AlwaysRunningJob(String importantParamForJob) {
            this.importantParamForJob = importantParamForJob;
        }

        @Override
        public Void call() throws Exception {
            final Thread self = Thread.currentThread();
            self.setName("AlwaysRunningJob Job");
            while (!self.isInterrupted()) {
                // keep waiting for events until
                // exception is thrown. or something bad happened
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // exit if it failed
                    return null;
                }
                System.out.println("keep working on" + importantParamForJob);
            }
            System.out.println("Finished run!");
            return null;
        }
    }
}

Оцените статью
TechArks.Ru
Добавить комментарий