diff options
Diffstat (limited to 'src/java/com')
-rw-r--r-- | src/java/com/jogamp/common/util/WorkerThread.java | 314 |
1 files changed, 217 insertions, 97 deletions
diff --git a/src/java/com/jogamp/common/util/WorkerThread.java b/src/java/com/jogamp/common/util/WorkerThread.java index 52182b6..42466da 100644 --- a/src/java/com/jogamp/common/util/WorkerThread.java +++ b/src/java/com/jogamp/common/util/WorkerThread.java @@ -33,33 +33,77 @@ import java.time.temporal.ChronoUnit; import java.util.concurrent.atomic.AtomicInteger; /** - * A re-{@link #start()}'able, {@link #pause(boolean)}'able and interrupt'able worker {@link Thread} - * with an optional minimum execution duration, see {@link #getSleptDuration()}. + * A re-{@link #start()}'able, {@link #pause(boolean)}'able and interrupt'able worker {@link #getThread() thread} + * with an optional minimum execution duration, see {@link #getSleptDuration()} + * executing a {@link Callback#run(WorkerThread) task} periodically. + * <p> + * Optionally a {@link WorkerThread.StateCallback.State state} {@link StateCallback#run(WorkerThread, WorkerThread.StateCallback.State) task} + * can be given for fine grained control. + * </p> + * <p> + * If an exception occurs during execution of the work {@link Callback}, the worker {@link #getThread() thread} is {@link #pause(boolean)}'ed + * and {@link #hasError()} as well as {@link #getError(boolean)} can be used to query and clear the state. + * User may {@link #resume()} or {@link #stop()} the thread. + * </p> + * <p> + * If an exception occurs during execution of the optional + * {@link WorkerThread.StateCallback.State state} {@link StateCallback#run(WorkerThread, WorkerThread.StateCallback.State) task}, + * the worker {@link #getThread() thread} is {@link #stop()}'ed + * and {@link #hasError()} as well as {@link #getError(boolean)} can be used to query and clear the state. + * </p> */ public class WorkerThread { /** - * An interruptible {@link #run()} task. + * An interruptible {@link #run() task} periodically executed on the {@link WorkerThread} {@link WorkerThread#getThread() thread}. */ public interface Callback { - void run() throws InterruptedException; + /** + * Task to be periodically executed on the {@link WorkerThread} {@link WorkerThread#getThread() thread}. + * @param self The {@link WorkerThread} manager + * @throws InterruptedException + */ + void run(WorkerThread self) throws InterruptedException; } + /** + * An interruptible {@link State} {@link #run() task} on the {@link WorkerThread} {@link WorkerThread#getThread() thread}. + */ + public interface StateCallback { + /** State change cause. */ + public static enum State { + INIT, PAUSED, RESUMED, END + } + /** + * Task to be executed on {@link State} change on the {@link WorkerThread} {@link WorkerThread#getThread() thread}. + * @param self The {@link WorkerThread} manager + * @param cause the {@link State} change cause + * @throws InterruptedException + */ + void run(WorkerThread self, State cause) throws InterruptedException; + } + + private static final int RUNNING = 1 << 0; + private static final int ACTIVE = 1 << 1; + private static final int BLOCKED = 1 << 2; + private static final int SHALL_PAUSE = 1 << 3; + private static final int SHALL_STOP = 1 << 4; + private static final int USE_MINIMUM = 1 << 5; + private static final int DAEMON = 1 << 6; private static AtomicInteger instanceId = new AtomicInteger(0); - private volatile boolean isRunning = false; - private volatile boolean isActive = false; - private volatile boolean isBlocked = false; - private volatile boolean shallPause = true; - private volatile boolean shallStop = false; + private volatile int state; + private final static boolean isSet(final int state, final int mask) { return mask == ( state & mask ); } + private final boolean isSet(final int mask) { return mask == ( state & mask ); } + private final void set(final int mask) { state |= mask; } + private final void clear(final int mask) { state &= ~mask; } + private final Duration minPeriod; private final Duration minDelay; - private final boolean useMinimum; private final Callback cbWork; - private final Runnable cbInitLocked; - private final Runnable cbEndLocked; - private final boolean isDaemonThread; + private final StateCallback cbState; private Thread thread; private volatile Duration sleptDuration = Duration.ZERO; + private volatile Exception workErr = null; /** * Instantiates a new {@link WorkerThread}. @@ -69,7 +113,7 @@ public class WorkerThread { * @param work the actual work {@link Callback} to perform. */ public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work) { - this(minPeriod, minDelay, daemonThread, work, null, null); + this(minPeriod, minDelay, daemonThread, work, null); } /** @@ -78,98 +122,104 @@ public class WorkerThread { * @param minDelay minimum work-loop-delay to throttle execution or {@code null} if unthrottled, see {@link #getSleptDuration()} * @param daemonThread argument for {@link Thread#setDaemon(boolean)} * @param work the actual work {@link Callback} to perform. - * @param init optional initialization {@link Runnable} called at {@link #start()} while locked - * @param end optional release {@link Runnable} called at {@link #stop()} while locked + * @param stateChangeCB optional {@link StateCallback} called at different {@link StateCallback.State} changes while locked */ - public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work, final Runnable init, final Runnable end) { + public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work, final StateCallback stateChangeCB) { + this.state = 0; this.minPeriod = null != minPeriod ? minPeriod : Duration.ZERO; this.minDelay = null != minDelay ? minDelay : Duration.ZERO; - this.useMinimum = this.minPeriod.toMillis() > 0 || this.minDelay.toMillis() > 0; + if( this.minPeriod.toMillis() > 0 || this.minDelay.toMillis() > 0 ) { + set(USE_MINIMUM); + } this.cbWork = work; - this.cbInitLocked = init; - this.cbEndLocked = end; - this.isDaemonThread = daemonThread; + this.cbState = stateChangeCB; + if( daemonThread ) { + set(DAEMON); + } thread = null; } /** * Starts execution of a new worker thread if not {@link #isRunning}, i.e. never {@link #start()}'ed or {@link #stop()}'ed. * <p> - * Method blocks until the new worker thread has started, {@link #isRunning()} and also {@link #isActive()} - * </p> - */ - public final synchronized void start() { - start(false); - } - - /** - * Starts execution of a new worker thread if not {@link #isRunning}, i.e. never {@link #start()}'ed or {@link #stop()}'ed. - * <p> * Method blocks until the new worker thread has been started and {@link #isRunning()} and {@link #isActive()} if {@code paused == false}. * </p> * @param paused if {@code true}, keeps the new worker thread paused, otherwise {@link #resume()} it. */ public final synchronized void start(final boolean paused) { - if( isRunning ) { + if( isSet(RUNNING) || null != thread || isSet(SHALL_STOP) || isSet(SHALL_PAUSE) ) { + // definite start condition: !isRunning + // subsequent conditions only for consistency/doc: null == thread && !shallStop && !shallPause return; } - shallStop = false; - shallPause = true; + if( paused ) { + set(SHALL_PAUSE); + } thread = new Thread(threadRunnable); - thread.setDaemon(isDaemonThread); + thread.setDaemon(isSet(DAEMON)); thread.start(); try { this.notifyAll(); // wake-up startup-block - while( !isRunning && !shallStop ) { - this.wait(); // wait until started + if( !paused ) { + while( !isSet(RUNNING) && !isSet(ACTIVE) && null != thread && !isSet(SHALL_STOP) ) { + this.wait(); // wait until started and active (not-paused) + } + } else { + while( !isSet(RUNNING) && null != thread && !isSet(SHALL_STOP) ) { + this.wait(); // wait until started + } + while( isSet(RUNNING) && isSet(ACTIVE) && null != thread && !isSet(SHALL_STOP) ) { + this.wait(); // wait until paused + } } } catch (final InterruptedException e) { throw new InterruptedRuntimeException(e); } - if( !paused ) { - resume(); - } } /** * Stops execution of the {@link #start()}'ed worker thread. * <p> - * Method blocks until worker thread has stopped. + * Method blocks until worker thread has been {@link #isRunning() stopped} if {@code waitUntilDone} is {@code true}. * </p> */ - public final synchronized void stop() { - if( isRunning ) { - shallStop = true; + public final synchronized void stop(final boolean waitUntilDone) { + if( isSet(RUNNING) ) { + set(SHALL_STOP); + this.notifyAll(); // wake-up pause-block (opt) if( java.lang.Thread.currentThread() != thread ) { - if( isBlocked && isRunning ) { + if( isSet(BLOCKED | RUNNING) ) { thread.interrupt(); } - try { - this.notifyAll(); // wake-up pause-block (opt) - while( isRunning ) { - this.wait(); // wait until stopped + if( waitUntilDone ) { + try { + while( isSet(RUNNING) ) { + this.wait(); // wait until stopped + } + } catch (final InterruptedException e) { + throw new InterruptedRuntimeException(e); } - } catch (final InterruptedException e) { - throw new InterruptedRuntimeException(e); } } - thread = null; - shallStop = false; - shallPause = true; } } - /** Pauses execution of the {@link #start()}'ed worker thread. */ + /** + * Pauses execution of the {@link #start()}'ed worker thread. + * <p> + * Method blocks until worker thread has been {@link #isActive()}'ated if {@code waitUntilDone} is {@code true}. + * </p> + */ public final synchronized void pause(final boolean waitUntilDone) { - if( isActive ) { - shallPause = true; + if( isSet(RUNNING | ACTIVE) && !isSet(SHALL_STOP) ) { + set(SHALL_PAUSE); if( java.lang.Thread.currentThread() != thread ) { - if( isBlocked && isActive ) { + if( isSet(BLOCKED | ACTIVE) ) { thread.interrupt(); } if( waitUntilDone ) { try { - while( isActive && isRunning ) { + while( isSet(RUNNING | ACTIVE) ) { this.wait(); // wait until paused } } catch (final InterruptedException e) { @@ -182,12 +232,12 @@ public class WorkerThread { /** Resumes execution of the {@link #pause(boolean)}'ed worker thread. */ public final synchronized void resume() { - if( isRunning && !isActive ) { - shallPause = false; + if( isSet(RUNNING) && !isSet(ACTIVE) && !isSet(SHALL_STOP) ) { + clear(SHALL_PAUSE); + this.notifyAll(); // wake-up pause-block if( java.lang.Thread.currentThread() != thread ) { try { - this.notifyAll(); // wake-up pause-block - while( !isActive && !shallPause && isRunning ) { + while( !isSet(ACTIVE) && !isSet(SHALL_PAUSE) && isSet(RUNNING) ) { this.wait(); // wait until resumed } } catch (final InterruptedException e) { @@ -199,9 +249,21 @@ public class WorkerThread { } /** Returns true if the worker thread has started via {@link #start()} and has not ended, e.g. via {@link #stop()}. It might be {@link #pause(boolean) paused}. */ - public final boolean isRunning() { return isRunning; } - /** Returns true if the worker thread has started via {@link #start()} and has not ended, e.g. via {@link #stop()} and is not {@link #pause(boolean) paused}. */ - public final boolean isActive() { return isActive; } + public final boolean isRunning() { return isSet(RUNNING); } + /** Returns true if the worker thread {@link #isRunning()} and is not {@link #pause(boolean) paused}. */ + public final boolean isActive() { return isSet(ACTIVE); } + /** Returns true if the worker thread {@link #isRunning()} and is {@link #pause(boolean) paused}. */ + public final boolean isPaused() { return isSet(RUNNING) && !isSet(ACTIVE); } + /** Returns true if an exception occured during {@link Callable} work execution. */ + public final boolean hasError() { return null != workErr; } + /** Returns the worker thread if {@link #isRunning()}, otherwise {@code null}. */ + public final Thread getThread() { return thread; } + + /** + * Returns the exception is {@link #hasError()}. + * @param clear if true, clear the exception + */ + public final Exception getError(final boolean clear ) { final Exception e = workErr; if( clear) { workErr = null; } return e; } /** * Returns enforced minimum work-loop-period or {@link Duration#ZERO} for none. @@ -228,10 +290,11 @@ public class WorkerThread { @Override public String toString() { synchronized(this) { - return "Worker[running "+isRunning+", active "+isActive+", blocked "+isBlocked+ - ", shall[pause "+shallPause+", stop "+shallStop+ - "], minDelay "+minDelay.toMillis()+"ms, minPeriod[set "+minPeriod.toMillis()+"ms, sleptDelta "+sleptDuration.toMillis()+ - "ms], daemon "+isDaemonThread+", thread "+thread+"]"; + final int _state = state; + return "Worker[running "+isSet(_state, RUNNING)+", active "+isSet(_state, ACTIVE)+", blocked "+isSet(_state, BLOCKED)+ + ", shall[pause "+isSet(_state, SHALL_PAUSE)+", stop "+isSet(_state, SHALL_STOP)+ + "], min[period "+minPeriod.toMillis()+"ms, delay "+minDelay.toMillis()+"], slept "+sleptDuration.toMillis()+ + "ms, daemon "+isSet(_state, DAEMON)+", thread "+thread+"]"; } } @@ -242,41 +305,90 @@ public class WorkerThread { ct.setName(ct.getName()+"-Worker_"+instanceId.getAndIncrement()); synchronized ( WorkerThread.this ) { - if( null != cbInitLocked ) { - cbInitLocked.run(); + Exception err = null; + if( null != cbState ) { + try { + cbState.run(WorkerThread.this, StateCallback.State.INIT); + } catch (final InterruptedException e) { + // OK + } catch (final Throwable t) { + err = new Exception(t.getClass().getSimpleName()+" while processing init-state "+cbState, t); + } + if( null != err ) { + workErr = err; + clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE); + thread = null; + workErr.printStackTrace(); + WorkerThread.this.notifyAll(); // wake-up ctor() + return; // bail out + } } - isRunning = true; + set(RUNNING | ACTIVE); WorkerThread.this.notifyAll(); // wake-up ctor() } - while( !shallStop ) { - Exception streamErr = null; + while( !isSet(SHALL_STOP) ) { + Exception err = null; try { - if( shallPause ) { + if( isSet(SHALL_PAUSE) ) { synchronized ( WorkerThread.this ) { - while( shallPause && !shallStop ) { - isActive = false; + if( null != cbState ) { + try { + cbState.run(WorkerThread.this, StateCallback.State.PAUSED); + } catch (final InterruptedException e) { + // OK + } catch (final Throwable t) { + err = new Exception(t.getClass().getSimpleName()+" while processing pause-state "+cbState, t); + } + if( null != err ) { + workErr = err; + clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE); + thread = null; + workErr.printStackTrace(); + WorkerThread.this.notifyAll(); // wake-up ctor() + return; // bail out + } + } + while( isSet(SHALL_PAUSE) && !isSet(SHALL_STOP) ) { + clear(ACTIVE); WorkerThread.this.notifyAll(); // wake-up doPause() try { WorkerThread.this.wait(); // wait until resumed } catch (final InterruptedException e) { - if( !shallPause ) { + if( !isSet(SHALL_PAUSE) ) { throw new InterruptedRuntimeException(e); } } } - isActive = true; + if( null != cbState ) { + try { + cbState.run(WorkerThread.this, StateCallback.State.RESUMED); + } catch (final InterruptedException e) { + err = new InterruptedRuntimeException(e.getClass().getSimpleName()+" while processing resume-state"+cbState, e); + } catch (final Throwable t) { + err = new Exception(t.getClass().getSimpleName()+" while processing resume-state "+cbState, t); + } + if( null != err ) { + workErr = err; + clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE); + thread = null; + workErr.printStackTrace(); + WorkerThread.this.notifyAll(); // wake-up ctor() + return; // bail out + } + } + set(ACTIVE); WorkerThread.this.notifyAll(); // wake-up doResume() } } - if( !shallStop ) { + if( !isSet(SHALL_STOP) ) { final Instant t0 = Instant.now(); - isBlocked = true; + set(BLOCKED); { - cbWork.run(); + cbWork.run(WorkerThread.this); } - isBlocked = false; - if( useMinimum ) { + clear(BLOCKED); + if( isSet(USE_MINIMUM) ) { final long minDelayMS = minDelay.toMillis(); final Instant t1 = Instant.now(); final Duration td = Duration.between(t0, t1); @@ -303,32 +415,40 @@ public class WorkerThread { } } } catch (final InterruptedException e) { - if( !isBlocked ) { // !shallStop && !shallPause - streamErr = new InterruptedRuntimeException(e); + if( !isSet(BLOCKED) ) { // !shallStop && !shallPause + err = new InterruptedRuntimeException(e.getClass().getSimpleName()+" while processing work-callback "+cbWork, e); } - isBlocked = false; + clear(BLOCKED); sleptDuration = Duration.ZERO; } catch (final Throwable t) { - streamErr = new Exception(t.getClass().getSimpleName()+" while processing", t); + err = new Exception(t.getClass().getSimpleName()+" while processing work-callback "+cbWork, t); sleptDuration = Duration.ZERO; } finally { - if( null != streamErr ) { + if( null != err ) { // state transition incl. notification synchronized ( WorkerThread.this ) { - shallPause = true; - isActive = false; + workErr = err; + err = null; + set(SHALL_PAUSE); + clear(ACTIVE); WorkerThread.this.notifyAll(); // wake-up potential do*() } - pause(false); } } } synchronized ( WorkerThread.this ) { - if( null != cbEndLocked ) { - cbEndLocked.run(); + if( null != cbState ) { + try { + cbState.run(WorkerThread.this, StateCallback.State.END); + } catch (final InterruptedException e) { + // OK + } catch (final Throwable t) { + workErr = new Exception(t.getClass().getSimpleName()+" while processing end-state "+cbState, t); + workErr.printStackTrace(); + } } - isRunning = false; - isActive = false; + thread = null; + clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE); WorkerThread.this.notifyAll(); // wake-up doStop() } } }; |