From fdda2f12a5aa6f931f693b15e1c3cd7dab030882 Mon Sep 17 00:00:00 2001 From: Sven Gothel Date: Fri, 22 Sep 2023 16:06:42 +0200 Subject: WorkerThread: Enhanced testing, added optional StateCallback for state changes, using bitfield state (earmarked to be used within GLMediaPlayerImpl etc) --- src/java/com/jogamp/common/util/WorkerThread.java | 314 ++++++++++----- .../com/jogamp/common/util/TestWorkerThread01.java | 425 +++++++++++++++++++-- 2 files changed, 605 insertions(+), 134 deletions(-) diff --git a/src/java/com/jogamp/common/util/WorkerThread.java b/src/java/com/jogamp/common/util/WorkerThread.java index 52182b6..42466da 100644 --- a/src/java/com/jogamp/common/util/WorkerThread.java +++ b/src/java/com/jogamp/common/util/WorkerThread.java @@ -33,33 +33,77 @@ import java.time.temporal.ChronoUnit; import java.util.concurrent.atomic.AtomicInteger; /** - * A re-{@link #start()}'able, {@link #pause(boolean)}'able and interrupt'able worker {@link Thread} - * with an optional minimum execution duration, see {@link #getSleptDuration()}. + * A re-{@link #start()}'able, {@link #pause(boolean)}'able and interrupt'able worker {@link #getThread() thread} + * with an optional minimum execution duration, see {@link #getSleptDuration()} + * executing a {@link Callback#run(WorkerThread) task} periodically. + *

+ * Optionally a {@link WorkerThread.StateCallback.State state} {@link StateCallback#run(WorkerThread, WorkerThread.StateCallback.State) task} + * can be given for fine grained control. + *

+ *

+ * If an exception occurs during execution of the work {@link Callback}, the worker {@link #getThread() thread} is {@link #pause(boolean)}'ed + * and {@link #hasError()} as well as {@link #getError(boolean)} can be used to query and clear the state. + * User may {@link #resume()} or {@link #stop()} the thread. + *

+ *

+ * If an exception occurs during execution of the optional + * {@link WorkerThread.StateCallback.State state} {@link StateCallback#run(WorkerThread, WorkerThread.StateCallback.State) task}, + * the worker {@link #getThread() thread} is {@link #stop()}'ed + * and {@link #hasError()} as well as {@link #getError(boolean)} can be used to query and clear the state. + *

*/ public class WorkerThread { /** - * An interruptible {@link #run()} task. + * An interruptible {@link #run() task} periodically executed on the {@link WorkerThread} {@link WorkerThread#getThread() thread}. */ public interface Callback { - void run() throws InterruptedException; + /** + * Task to be periodically executed on the {@link WorkerThread} {@link WorkerThread#getThread() thread}. + * @param self The {@link WorkerThread} manager + * @throws InterruptedException + */ + void run(WorkerThread self) throws InterruptedException; } + /** + * An interruptible {@link State} {@link #run() task} on the {@link WorkerThread} {@link WorkerThread#getThread() thread}. + */ + public interface StateCallback { + /** State change cause. */ + public static enum State { + INIT, PAUSED, RESUMED, END + } + /** + * Task to be executed on {@link State} change on the {@link WorkerThread} {@link WorkerThread#getThread() thread}. + * @param self The {@link WorkerThread} manager + * @param cause the {@link State} change cause + * @throws InterruptedException + */ + void run(WorkerThread self, State cause) throws InterruptedException; + } + + private static final int RUNNING = 1 << 0; + private static final int ACTIVE = 1 << 1; + private static final int BLOCKED = 1 << 2; + private static final int SHALL_PAUSE = 1 << 3; + private static final int SHALL_STOP = 1 << 4; + private static final int USE_MINIMUM = 1 << 5; + private static final int DAEMON = 1 << 6; private static AtomicInteger instanceId = new AtomicInteger(0); - private volatile boolean isRunning = false; - private volatile boolean isActive = false; - private volatile boolean isBlocked = false; - private volatile boolean shallPause = true; - private volatile boolean shallStop = false; + private volatile int state; + private final static boolean isSet(final int state, final int mask) { return mask == ( state & mask ); } + private final boolean isSet(final int mask) { return mask == ( state & mask ); } + private final void set(final int mask) { state |= mask; } + private final void clear(final int mask) { state &= ~mask; } + private final Duration minPeriod; private final Duration minDelay; - private final boolean useMinimum; private final Callback cbWork; - private final Runnable cbInitLocked; - private final Runnable cbEndLocked; - private final boolean isDaemonThread; + private final StateCallback cbState; private Thread thread; private volatile Duration sleptDuration = Duration.ZERO; + private volatile Exception workErr = null; /** * Instantiates a new {@link WorkerThread}. @@ -69,7 +113,7 @@ public class WorkerThread { * @param work the actual work {@link Callback} to perform. */ public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work) { - this(minPeriod, minDelay, daemonThread, work, null, null); + this(minPeriod, minDelay, daemonThread, work, null); } /** @@ -78,30 +122,23 @@ public class WorkerThread { * @param minDelay minimum work-loop-delay to throttle execution or {@code null} if unthrottled, see {@link #getSleptDuration()} * @param daemonThread argument for {@link Thread#setDaemon(boolean)} * @param work the actual work {@link Callback} to perform. - * @param init optional initialization {@link Runnable} called at {@link #start()} while locked - * @param end optional release {@link Runnable} called at {@link #stop()} while locked + * @param stateChangeCB optional {@link StateCallback} called at different {@link StateCallback.State} changes while locked */ - public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work, final Runnable init, final Runnable end) { + public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work, final StateCallback stateChangeCB) { + this.state = 0; this.minPeriod = null != minPeriod ? minPeriod : Duration.ZERO; this.minDelay = null != minDelay ? minDelay : Duration.ZERO; - this.useMinimum = this.minPeriod.toMillis() > 0 || this.minDelay.toMillis() > 0; + if( this.minPeriod.toMillis() > 0 || this.minDelay.toMillis() > 0 ) { + set(USE_MINIMUM); + } this.cbWork = work; - this.cbInitLocked = init; - this.cbEndLocked = end; - this.isDaemonThread = daemonThread; + this.cbState = stateChangeCB; + if( daemonThread ) { + set(DAEMON); + } thread = null; } - /** - * Starts execution of a new worker thread if not {@link #isRunning}, i.e. never {@link #start()}'ed or {@link #stop()}'ed. - *

- * Method blocks until the new worker thread has started, {@link #isRunning()} and also {@link #isActive()} - *

- */ - public final synchronized void start() { - start(false); - } - /** * Starts execution of a new worker thread if not {@link #isRunning}, i.e. never {@link #start()}'ed or {@link #stop()}'ed. *

@@ -110,66 +147,79 @@ public class WorkerThread { * @param paused if {@code true}, keeps the new worker thread paused, otherwise {@link #resume()} it. */ public final synchronized void start(final boolean paused) { - if( isRunning ) { + if( isSet(RUNNING) || null != thread || isSet(SHALL_STOP) || isSet(SHALL_PAUSE) ) { + // definite start condition: !isRunning + // subsequent conditions only for consistency/doc: null == thread && !shallStop && !shallPause return; } - shallStop = false; - shallPause = true; + if( paused ) { + set(SHALL_PAUSE); + } thread = new Thread(threadRunnable); - thread.setDaemon(isDaemonThread); + thread.setDaemon(isSet(DAEMON)); thread.start(); try { this.notifyAll(); // wake-up startup-block - while( !isRunning && !shallStop ) { - this.wait(); // wait until started + if( !paused ) { + while( !isSet(RUNNING) && !isSet(ACTIVE) && null != thread && !isSet(SHALL_STOP) ) { + this.wait(); // wait until started and active (not-paused) + } + } else { + while( !isSet(RUNNING) && null != thread && !isSet(SHALL_STOP) ) { + this.wait(); // wait until started + } + while( isSet(RUNNING) && isSet(ACTIVE) && null != thread && !isSet(SHALL_STOP) ) { + this.wait(); // wait until paused + } } } catch (final InterruptedException e) { throw new InterruptedRuntimeException(e); } - if( !paused ) { - resume(); - } } /** * Stops execution of the {@link #start()}'ed worker thread. *

- * Method blocks until worker thread has stopped. + * Method blocks until worker thread has been {@link #isRunning() stopped} if {@code waitUntilDone} is {@code true}. *

*/ - public final synchronized void stop() { - if( isRunning ) { - shallStop = true; + public final synchronized void stop(final boolean waitUntilDone) { + if( isSet(RUNNING) ) { + set(SHALL_STOP); + this.notifyAll(); // wake-up pause-block (opt) if( java.lang.Thread.currentThread() != thread ) { - if( isBlocked && isRunning ) { + if( isSet(BLOCKED | RUNNING) ) { thread.interrupt(); } - try { - this.notifyAll(); // wake-up pause-block (opt) - while( isRunning ) { - this.wait(); // wait until stopped + if( waitUntilDone ) { + try { + while( isSet(RUNNING) ) { + this.wait(); // wait until stopped + } + } catch (final InterruptedException e) { + throw new InterruptedRuntimeException(e); } - } catch (final InterruptedException e) { - throw new InterruptedRuntimeException(e); } } - thread = null; - shallStop = false; - shallPause = true; } } - /** Pauses execution of the {@link #start()}'ed worker thread. */ + /** + * Pauses execution of the {@link #start()}'ed worker thread. + *

+ * Method blocks until worker thread has been {@link #isActive()}'ated if {@code waitUntilDone} is {@code true}. + *

+ */ public final synchronized void pause(final boolean waitUntilDone) { - if( isActive ) { - shallPause = true; + if( isSet(RUNNING | ACTIVE) && !isSet(SHALL_STOP) ) { + set(SHALL_PAUSE); if( java.lang.Thread.currentThread() != thread ) { - if( isBlocked && isActive ) { + if( isSet(BLOCKED | ACTIVE) ) { thread.interrupt(); } if( waitUntilDone ) { try { - while( isActive && isRunning ) { + while( isSet(RUNNING | ACTIVE) ) { this.wait(); // wait until paused } } catch (final InterruptedException e) { @@ -182,12 +232,12 @@ public class WorkerThread { /** Resumes execution of the {@link #pause(boolean)}'ed worker thread. */ public final synchronized void resume() { - if( isRunning && !isActive ) { - shallPause = false; + if( isSet(RUNNING) && !isSet(ACTIVE) && !isSet(SHALL_STOP) ) { + clear(SHALL_PAUSE); + this.notifyAll(); // wake-up pause-block if( java.lang.Thread.currentThread() != thread ) { try { - this.notifyAll(); // wake-up pause-block - while( !isActive && !shallPause && isRunning ) { + while( !isSet(ACTIVE) && !isSet(SHALL_PAUSE) && isSet(RUNNING) ) { this.wait(); // wait until resumed } } catch (final InterruptedException e) { @@ -199,9 +249,21 @@ public class WorkerThread { } /** Returns true if the worker thread has started via {@link #start()} and has not ended, e.g. via {@link #stop()}. It might be {@link #pause(boolean) paused}. */ - public final boolean isRunning() { return isRunning; } - /** Returns true if the worker thread has started via {@link #start()} and has not ended, e.g. via {@link #stop()} and is not {@link #pause(boolean) paused}. */ - public final boolean isActive() { return isActive; } + public final boolean isRunning() { return isSet(RUNNING); } + /** Returns true if the worker thread {@link #isRunning()} and is not {@link #pause(boolean) paused}. */ + public final boolean isActive() { return isSet(ACTIVE); } + /** Returns true if the worker thread {@link #isRunning()} and is {@link #pause(boolean) paused}. */ + public final boolean isPaused() { return isSet(RUNNING) && !isSet(ACTIVE); } + /** Returns true if an exception occured during {@link Callable} work execution. */ + public final boolean hasError() { return null != workErr; } + /** Returns the worker thread if {@link #isRunning()}, otherwise {@code null}. */ + public final Thread getThread() { return thread; } + + /** + * Returns the exception is {@link #hasError()}. + * @param clear if true, clear the exception + */ + public final Exception getError(final boolean clear ) { final Exception e = workErr; if( clear) { workErr = null; } return e; } /** * Returns enforced minimum work-loop-period or {@link Duration#ZERO} for none. @@ -228,10 +290,11 @@ public class WorkerThread { @Override public String toString() { synchronized(this) { - return "Worker[running "+isRunning+", active "+isActive+", blocked "+isBlocked+ - ", shall[pause "+shallPause+", stop "+shallStop+ - "], minDelay "+minDelay.toMillis()+"ms, minPeriod[set "+minPeriod.toMillis()+"ms, sleptDelta "+sleptDuration.toMillis()+ - "ms], daemon "+isDaemonThread+", thread "+thread+"]"; + final int _state = state; + return "Worker[running "+isSet(_state, RUNNING)+", active "+isSet(_state, ACTIVE)+", blocked "+isSet(_state, BLOCKED)+ + ", shall[pause "+isSet(_state, SHALL_PAUSE)+", stop "+isSet(_state, SHALL_STOP)+ + "], min[period "+minPeriod.toMillis()+"ms, delay "+minDelay.toMillis()+"], slept "+sleptDuration.toMillis()+ + "ms, daemon "+isSet(_state, DAEMON)+", thread "+thread+"]"; } } @@ -242,41 +305,90 @@ public class WorkerThread { ct.setName(ct.getName()+"-Worker_"+instanceId.getAndIncrement()); synchronized ( WorkerThread.this ) { - if( null != cbInitLocked ) { - cbInitLocked.run(); + Exception err = null; + if( null != cbState ) { + try { + cbState.run(WorkerThread.this, StateCallback.State.INIT); + } catch (final InterruptedException e) { + // OK + } catch (final Throwable t) { + err = new Exception(t.getClass().getSimpleName()+" while processing init-state "+cbState, t); + } + if( null != err ) { + workErr = err; + clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE); + thread = null; + workErr.printStackTrace(); + WorkerThread.this.notifyAll(); // wake-up ctor() + return; // bail out + } } - isRunning = true; + set(RUNNING | ACTIVE); WorkerThread.this.notifyAll(); // wake-up ctor() } - while( !shallStop ) { - Exception streamErr = null; + while( !isSet(SHALL_STOP) ) { + Exception err = null; try { - if( shallPause ) { + if( isSet(SHALL_PAUSE) ) { synchronized ( WorkerThread.this ) { - while( shallPause && !shallStop ) { - isActive = false; + if( null != cbState ) { + try { + cbState.run(WorkerThread.this, StateCallback.State.PAUSED); + } catch (final InterruptedException e) { + // OK + } catch (final Throwable t) { + err = new Exception(t.getClass().getSimpleName()+" while processing pause-state "+cbState, t); + } + if( null != err ) { + workErr = err; + clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE); + thread = null; + workErr.printStackTrace(); + WorkerThread.this.notifyAll(); // wake-up ctor() + return; // bail out + } + } + while( isSet(SHALL_PAUSE) && !isSet(SHALL_STOP) ) { + clear(ACTIVE); WorkerThread.this.notifyAll(); // wake-up doPause() try { WorkerThread.this.wait(); // wait until resumed } catch (final InterruptedException e) { - if( !shallPause ) { + if( !isSet(SHALL_PAUSE) ) { throw new InterruptedRuntimeException(e); } } } - isActive = true; + if( null != cbState ) { + try { + cbState.run(WorkerThread.this, StateCallback.State.RESUMED); + } catch (final InterruptedException e) { + err = new InterruptedRuntimeException(e.getClass().getSimpleName()+" while processing resume-state"+cbState, e); + } catch (final Throwable t) { + err = new Exception(t.getClass().getSimpleName()+" while processing resume-state "+cbState, t); + } + if( null != err ) { + workErr = err; + clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE); + thread = null; + workErr.printStackTrace(); + WorkerThread.this.notifyAll(); // wake-up ctor() + return; // bail out + } + } + set(ACTIVE); WorkerThread.this.notifyAll(); // wake-up doResume() } } - if( !shallStop ) { + if( !isSet(SHALL_STOP) ) { final Instant t0 = Instant.now(); - isBlocked = true; + set(BLOCKED); { - cbWork.run(); + cbWork.run(WorkerThread.this); } - isBlocked = false; - if( useMinimum ) { + clear(BLOCKED); + if( isSet(USE_MINIMUM) ) { final long minDelayMS = minDelay.toMillis(); final Instant t1 = Instant.now(); final Duration td = Duration.between(t0, t1); @@ -303,32 +415,40 @@ public class WorkerThread { } } } catch (final InterruptedException e) { - if( !isBlocked ) { // !shallStop && !shallPause - streamErr = new InterruptedRuntimeException(e); + if( !isSet(BLOCKED) ) { // !shallStop && !shallPause + err = new InterruptedRuntimeException(e.getClass().getSimpleName()+" while processing work-callback "+cbWork, e); } - isBlocked = false; + clear(BLOCKED); sleptDuration = Duration.ZERO; } catch (final Throwable t) { - streamErr = new Exception(t.getClass().getSimpleName()+" while processing", t); + err = new Exception(t.getClass().getSimpleName()+" while processing work-callback "+cbWork, t); sleptDuration = Duration.ZERO; } finally { - if( null != streamErr ) { + if( null != err ) { // state transition incl. notification synchronized ( WorkerThread.this ) { - shallPause = true; - isActive = false; + workErr = err; + err = null; + set(SHALL_PAUSE); + clear(ACTIVE); WorkerThread.this.notifyAll(); // wake-up potential do*() } - pause(false); } } } synchronized ( WorkerThread.this ) { - if( null != cbEndLocked ) { - cbEndLocked.run(); + if( null != cbState ) { + try { + cbState.run(WorkerThread.this, StateCallback.State.END); + } catch (final InterruptedException e) { + // OK + } catch (final Throwable t) { + workErr = new Exception(t.getClass().getSimpleName()+" while processing end-state "+cbState, t); + workErr.printStackTrace(); + } } - isRunning = false; - isActive = false; + thread = null; + clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE); WorkerThread.this.notifyAll(); // wake-up doStop() } } }; diff --git a/src/junit/com/jogamp/common/util/TestWorkerThread01.java b/src/junit/com/jogamp/common/util/TestWorkerThread01.java index afd41a1..4d7f27e 100644 --- a/src/junit/com/jogamp/common/util/TestWorkerThread01.java +++ b/src/junit/com/jogamp/common/util/TestWorkerThread01.java @@ -48,7 +48,7 @@ public class TestWorkerThread01 extends SingletonJunitCase { static class Action implements WorkerThread.Callback { final Duration sleep; - AtomicInteger counter = new AtomicInteger(0); + final AtomicInteger counter = new AtomicInteger(0); Instant tlast = Instant.now(); volatile Duration td = Duration.ZERO; @@ -57,56 +57,107 @@ public class TestWorkerThread01 extends SingletonJunitCase { } @Override - public void run() throws InterruptedException { + public void run(final WorkerThread self) throws InterruptedException { { java.lang.Thread.sleep(sleep.toMillis()); // java.util.concurrent.locks.LockSupport.parkNanos(sleep.toNanos()); } final Instant t1 = Instant.now(); td = Duration.between(tlast, t1); - System.err.println("action period "+td.toMillis()+"ms, counter "+counter.getAndIncrement()); + final int v = counter.incrementAndGet(); + // System.err.println("action period "+td.toMillis()+"ms, counter "+v+": "+self); tlast = t1; } } + static class StateCB implements WorkerThread.StateCallback { + final AtomicInteger initCounter = new AtomicInteger(0); + final AtomicInteger pausedCounter = new AtomicInteger(0); + final AtomicInteger resumedCounter = new AtomicInteger(0); + final AtomicInteger endCounter = new AtomicInteger(0); + + @Override + public void run(final WorkerThread self, final State cause) throws InterruptedException { + // System.err.println("WT-"+cause+": "+self); + switch( cause ) { + case END: + endCounter.incrementAndGet(); + break; + case INIT: + initCounter.incrementAndGet(); + break; + case PAUSED: + pausedCounter.incrementAndGet(); + break; + case RESUMED: + resumedCounter.incrementAndGet(); + break; + default: + break; + } + } + } static void checkStarted(final WorkerThread wt, final boolean isPaused) { - Assert.assertTrue(wt.isRunning()); - Assert.assertEquals(!isPaused, wt.isActive()); + Assert.assertTrue(wt.toString(), wt.isRunning()); + Assert.assertEquals("isPaused "+isPaused+", "+wt.toString(), !isPaused, wt.isActive()); + Assert.assertNotNull(wt.toString(), wt.getThread()); } static void checkStopped(final WorkerThread wt) { - Assert.assertFalse(wt.isRunning()); - Assert.assertFalse(wt.isActive()); + Assert.assertFalse(wt.toString(), wt.isRunning()); + Assert.assertFalse(wt.toString(), wt.isActive()); + Assert.assertNull(wt.toString(), wt.getThread()); } - static void start(final WorkerThread wt) { - System.err.println("WT Start.0: "+wt); - wt.start(); - System.err.println("WT Start.X: "+wt); + static void start(final WorkerThread wt, final boolean paused) { + // System.err.println("WT Start.0: paused "+paused+", "+wt); + wt.start(paused); + // System.err.println("WT Start.X: "+wt); } - static void stop(final WorkerThread wt) { - System.err.println("WT Stop.0: "+wt); - wt.stop(); - System.err.println("WT Stop.X: "+wt); + static void stop(final WorkerThread wt, final boolean wait) { + // System.err.println("WT Stop.0: wait "+wait+", "+wt); + wt.stop(wait); + // System.err.println("WT Stop.X: wait "+wait+", "+wt); } static void pause(final WorkerThread wt, final boolean wait) { - System.err.println("WT Pause.0: wait "+wait+", "+wt); + // System.err.println("WT Pause.0: wait "+wait+", "+wt); wt.pause(wait); - System.err.println("WT Pause.X: wait "+wait+", "+wt); + // System.err.println("WT Pause.X: wait "+wait+", "+wt); } static void resume(final WorkerThread wt) { - System.err.println("WT Resume.0: "+wt); + // System.err.println("WT Resume.0: "+wt); wt.resume(); - System.err.println("WT Resume.X: "+wt); + // System.err.println("WT Resume.X: "+wt); } - public void testAction(final long periodMS, final long minDelayMS, final long actionMS) throws IOException, InterruptedException, InvocationTargetException { + public void testAction(final boolean startPaused, final long periodMS, final long minDelayMS, final long actionMS) throws IOException, InterruptedException, InvocationTargetException { final Action action = new Action( 0 < actionMS ? Duration.of(actionMS, ChronoUnit.MILLIS) : Duration.ZERO); + final StateCB stateCB = new StateCB(); final WorkerThread wt =new WorkerThread(Duration.of(periodMS, ChronoUnit.MILLIS), - Duration.of(minDelayMS, ChronoUnit.MILLIS), true /* daemonThread */, action); + Duration.of(minDelayMS, ChronoUnit.MILLIS), true /* daemonThread */, action, stateCB); + final long maxPeriodMS = Math.max(minDelayMS+actionMS, Math.max(periodMS, actionMS)); + System.err.println("testAction: startPaused "+startPaused+", maxPeriodMS "+maxPeriodMS+", actionMS "+actionMS+", "+wt); + int counterA = action.counter.get(); checkStopped(wt); - start(wt); - checkStarted(wt, false /* isPaused */); + Assert.assertEquals(0, stateCB.initCounter.get()); + Assert.assertEquals(0, action.counter.get()); + Assert.assertEquals(0, stateCB.pausedCounter.get()); + Assert.assertEquals(0, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + start(wt, startPaused); + checkStarted(wt, startPaused /* isPaused */); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertEquals(startPaused?1:0, stateCB.pausedCounter.get()); + Assert.assertEquals(startPaused?0:0, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + if( startPaused ) { + wt.resume(); + checkStarted(wt, false /* isPaused */); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + } Thread.sleep(maxPeriodMS*3); { final Duration td = action.td; @@ -118,65 +169,365 @@ public class TestWorkerThread01 extends SingletonJunitCase { } checkStarted(wt, false /* isPaused */); - stop(wt); + stop(wt, true); // running -> stop checkStopped(wt); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertEquals(startPaused?1:0, stateCB.pausedCounter.get()); + Assert.assertEquals(startPaused?1:0, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); int counterB = action.counter.get(); Assert.assertTrue(counterB > counterA); counterA = action.counter.get(); checkStopped(wt); - start(wt); - checkStarted(wt, false /* isPaused */); + start(wt, startPaused); // stop -> running + checkStarted(wt, startPaused /* isPaused */); + Assert.assertEquals(2, stateCB.initCounter.get()); + Assert.assertEquals(startPaused?2:0, stateCB.pausedCounter.get()); + Assert.assertEquals(startPaused?1:0, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); + if( startPaused ) { + wt.resume(); + checkStarted(wt, false /* isPaused */); + Assert.assertEquals(2, stateCB.initCounter.get()); + Assert.assertEquals(startPaused?2:0, stateCB.pausedCounter.get()); + Assert.assertEquals(startPaused?2:0, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); + } Thread.sleep(maxPeriodMS*3); checkStarted(wt, false /* isPaused */); - pause(wt, true /* wait */); + pause(wt, true /* wait */); // running -> pause checkStarted(wt, true /* isPaused */); + Assert.assertEquals(2, stateCB.initCounter.get()); + Assert.assertEquals(startPaused?3:1, stateCB.pausedCounter.get()); + Assert.assertEquals(startPaused?2:0, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); counterB = action.counter.get(); Assert.assertTrue(counterB > counterA); + Thread.sleep(maxPeriodMS); counterA = action.counter.get(); Assert.assertTrue(counterB == counterA); - Thread.sleep(maxPeriodMS); - resume(wt); + resume(wt); // pause -> running checkStarted(wt, false /* isPaused */); + Assert.assertEquals(2, stateCB.initCounter.get()); + Assert.assertEquals(startPaused?3:1, stateCB.pausedCounter.get()); + Assert.assertEquals(startPaused?3:1, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); Thread.sleep(maxPeriodMS*3); checkStarted(wt, false /* isPaused */); - stop(wt); - checkStopped(wt); + pause(wt, true /* wait */); // running -> pause + checkStarted(wt, true /* isPaused */); + Assert.assertEquals(2, stateCB.initCounter.get()); + Assert.assertEquals(startPaused?4:2, stateCB.pausedCounter.get()); + Assert.assertEquals(startPaused?3:1, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); counterB = action.counter.get(); Assert.assertTrue(counterB > counterA); + counterA = counterB; + + checkStarted(wt, true /* isPaused */); + stop(wt, true); // pause -> stop + checkStopped(wt); + Assert.assertEquals(2, stateCB.initCounter.get()); + Assert.assertEquals(startPaused?4:2, stateCB.pausedCounter.get()); + Assert.assertEquals(startPaused?4:2, stateCB.resumedCounter.get()); + Assert.assertEquals(2, stateCB.endCounter.get()); + counterB = action.counter.get(); + Assert.assertTrue(counterB == counterA); + + resume(wt); // stop -> stop + checkStopped(wt); + Assert.assertEquals(2, stateCB.initCounter.get()); + Assert.assertEquals(startPaused?4:2, stateCB.pausedCounter.get()); + Assert.assertEquals(startPaused?4:2, stateCB.resumedCounter.get()); + Assert.assertEquals(2, stateCB.endCounter.get()); + + pause(wt, true /* wait */); // stop -> stop + checkStopped(wt); + Assert.assertEquals(2, stateCB.initCounter.get()); + Assert.assertEquals(startPaused?4:2, stateCB.pausedCounter.get()); + Assert.assertEquals(startPaused?4:2, stateCB.resumedCounter.get()); + Assert.assertEquals(2, stateCB.endCounter.get()); } @Test public void test01ZeroAction() throws IOException, InterruptedException, InvocationTargetException { - testAction(16 /* periodMS */, 0 /* minDelayMS */, 0 /* actionMS*/); + testAction(false, 16 /* periodMS */, 0 /* minDelayMS */, 0 /* actionMS*/); + testAction(true, 16 /* periodMS */, 0 /* minDelayMS */, 0 /* actionMS*/); } @Test public void test02MidAction() throws IOException, InterruptedException, InvocationTargetException { - testAction(16 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/); + testAction(false, 16 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/); + testAction(true, 16 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/); } @Test public void test03HeavyAction() throws IOException, InterruptedException, InvocationTargetException { - testAction(16 /* periodMS */, 0 /* minDelayMS */, 20 /* actionMS*/); + testAction(false, 16 /* periodMS */, 0 /* minDelayMS */, 20 /* actionMS*/); + testAction(true, 16 /* periodMS */, 0 /* minDelayMS */, 20 /* actionMS*/); } @Test public void test03ZeroMidAction() throws IOException, InterruptedException, InvocationTargetException { - testAction(0 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/); + testAction(false, 0 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/); + testAction(true, 0 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/); } @Test public void test04ZeroMinDelayMidAction() throws IOException, InterruptedException, InvocationTargetException { - testAction(0 /* periodMS */, 4 /* minDelayMS */, 8 /* actionMS*/); + testAction(false, 0 /* periodMS */, 4 /* minDelayMS */, 8 /* actionMS*/); + testAction(true, 0 /* periodMS */, 4 /* minDelayMS */, 8 /* actionMS*/); } @Test public void test05MinDelayMidAction() throws IOException, InterruptedException, InvocationTargetException { - testAction(8 /* periodMS */, 8 /* minDelayMS */, 8 /* actionMS*/); + testAction(false, 8 /* periodMS */, 8 /* minDelayMS */, 8 /* actionMS*/); + testAction(true, 8 /* periodMS */, 8 /* minDelayMS */, 8 /* actionMS*/); + } + + @Test + public void test10InitEnd01() throws IOException, InterruptedException, InvocationTargetException { + // Issuing stop not in the worker-thread + final AtomicInteger actionLatch = new AtomicInteger(0); + final WorkerThread.Callback action = (final WorkerThread self) -> { + java.lang.Thread.sleep(1); + final boolean v = actionLatch.compareAndSet(0, 1); + // System.err.println("action set "+v+": "+self); + }; + final StateCB stateCB = new StateCB(); + + Assert.assertEquals(0, stateCB.initCounter.get()); + Assert.assertEquals(0, actionLatch.get()); + Assert.assertEquals(0, stateCB.pausedCounter.get()); + Assert.assertEquals(0, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + + final long minPeriodMS = 2; + final long maxPeriodMS = 4; + final WorkerThread wt =new WorkerThread(Duration.of(minPeriodMS, ChronoUnit.MILLIS), + Duration.of(0, ChronoUnit.MILLIS), true /* daemonThread */, + action, stateCB); + Assert.assertEquals(0, stateCB.initCounter.get()); + Assert.assertEquals(0, actionLatch.get()); + Assert.assertEquals(0, stateCB.pausedCounter.get()); + Assert.assertEquals(0, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + checkStopped(wt); + + start(wt, true); + checkStarted(wt, true /* isPaused */); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertEquals(0, actionLatch.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(0, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + + wt.resume(); + checkStarted(wt, false /* isPaused */); + Assert.assertEquals(1, stateCB.initCounter.get()); + // maybe: Assert.assertEquals(1, actionLatch.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + + Thread.sleep(maxPeriodMS); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertEquals(1, actionLatch.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + + checkStarted(wt, false /* isPaused */); + stop(wt, true); + checkStopped(wt); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertEquals(1, actionLatch.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); + + actionLatch.set(0); + Assert.assertEquals(0, actionLatch.get()); + start(wt, false); + checkStarted(wt, false/* isPaused */); + Assert.assertEquals(2, stateCB.initCounter.get()); + // maybe: Assert.assertEquals(1, actionLatch.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); + + Thread.sleep(maxPeriodMS); + Assert.assertEquals(2, stateCB.initCounter.get()); + Assert.assertEquals(1, actionLatch.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); + + checkStarted(wt, false /* isPaused */); + stop(wt, true); + checkStopped(wt); + Assert.assertEquals(2, stateCB.initCounter.get()); + Assert.assertEquals(1, actionLatch.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(2, stateCB.endCounter.get()); + } + + @Test + public void test11InitEnd02() throws IOException, InterruptedException, InvocationTargetException { + // Issuing stop on the worker-thread + final AtomicInteger actionCounter = new AtomicInteger(0); + final WorkerThread.Callback action = (final WorkerThread self) -> { + java.lang.Thread.sleep(1); + final int v = actionCounter.incrementAndGet(); + // System.err.println("action cntr "+v+": "+self); + if( 8 == v ) { + stop(self, true); + } + }; + final StateCB stateCB = new StateCB(); + + Assert.assertEquals(0, stateCB.initCounter.get()); + Assert.assertEquals(0, actionCounter.get()); + Assert.assertEquals(0, stateCB.pausedCounter.get()); + Assert.assertEquals(0, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + + final long minPeriodMS = 2; + final long maxPeriodMS = 16; + final WorkerThread wt =new WorkerThread(Duration.of(minPeriodMS, ChronoUnit.MILLIS), + Duration.of(0, ChronoUnit.MILLIS), true /* daemonThread */, + action, stateCB); + Assert.assertEquals(0, stateCB.initCounter.get()); + Assert.assertEquals(0, actionCounter.get()); + Assert.assertEquals(0, stateCB.pausedCounter.get()); + Assert.assertEquals(0, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + checkStopped(wt); + + start(wt, true); + checkStarted(wt, true /* isPaused */); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertEquals(0, actionCounter.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(0, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + + wt.resume(); + checkStarted(wt, false /* isPaused */); + Assert.assertEquals(1, stateCB.initCounter.get()); + // maybe Assert.assertEquals(1, actionCounter.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + + Thread.sleep(maxPeriodMS); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertTrue(0 < actionCounter.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); + checkStopped(wt); + + actionCounter.set(0); + Assert.assertEquals(0, actionCounter.get()); + start(wt, false); + checkStarted(wt, false/* isPaused */); + Assert.assertEquals(2, stateCB.initCounter.get()); + // maybe: Assert.assertEquals(1, actionLatch.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); + + Thread.sleep(maxPeriodMS); + Assert.assertEquals(2, stateCB.initCounter.get()); + Assert.assertTrue(0 < actionCounter.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(2, stateCB.endCounter.get()); + checkStopped(wt); + } + + @Test + public void test20ExceptionAtWork() throws IOException, InterruptedException, InvocationTargetException { + final AtomicInteger actionCounter = new AtomicInteger(0); + final WorkerThread.Callback action = (final WorkerThread self) -> { + java.lang.Thread.sleep(1); + final int v = actionCounter.incrementAndGet(); + // System.err.println("action cntr "+v+": "+self); + if( 8 == v ) { + throw new RuntimeException("Test exception from worker action: "+self); + } + }; + final StateCB stateCB = new StateCB(); + + Assert.assertEquals(0, stateCB.initCounter.get()); + Assert.assertEquals(0, actionCounter.get()); + Assert.assertEquals(0, stateCB.pausedCounter.get()); + Assert.assertEquals(0, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + + final long minPeriodMS = 2; + final long maxPeriodMS = 16; + final WorkerThread wt =new WorkerThread(Duration.of(minPeriodMS, ChronoUnit.MILLIS), + Duration.of(0, ChronoUnit.MILLIS), true /* daemonThread */, + action, stateCB); + Assert.assertEquals(0, stateCB.initCounter.get()); + Assert.assertEquals(0, actionCounter.get()); + Assert.assertEquals(0, stateCB.pausedCounter.get()); + Assert.assertEquals(0, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + checkStopped(wt); + + start(wt, true); + checkStarted(wt, true /* isPaused */); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertEquals(0, actionCounter.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(0, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + + wt.resume(); + checkStarted(wt, false /* isPaused */); + Assert.assertEquals(1, stateCB.initCounter.get()); + // maybe: Assert.assertEquals(1, actionLatch.get()); + Assert.assertEquals(1, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + + Thread.sleep(maxPeriodMS); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertTrue(0 < actionCounter.get()); + Assert.assertEquals(2, stateCB.pausedCounter.get()); + Assert.assertEquals(1, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + checkStarted(wt, true /* isPaused */); + Assert.assertTrue(wt.hasError()); + Assert.assertNotNull(wt.getError(true)); + final int counterA = actionCounter.get(); + + wt.resume(); + checkStarted(wt, false /* isPaused */); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertTrue(0 < actionCounter.get()); + Assert.assertEquals(2, stateCB.pausedCounter.get()); + Assert.assertEquals(2, stateCB.resumedCounter.get()); + Assert.assertEquals(0, stateCB.endCounter.get()); + Thread.sleep(maxPeriodMS); + + stop(wt, true); + checkStopped(wt); + final int counterB = actionCounter.get(); + Assert.assertTrue(counterB > counterA); + Assert.assertEquals(1, stateCB.initCounter.get()); + Assert.assertTrue(0 < actionCounter.get()); + Assert.assertEquals(2, stateCB.pausedCounter.get()); + Assert.assertEquals(2, stateCB.resumedCounter.get()); + Assert.assertEquals(1, stateCB.endCounter.get()); } public static void main(final String args[]) throws IOException { -- cgit v1.2.3