aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/java/com/jogamp/common/util/WorkerThread.java314
-rw-r--r--src/junit/com/jogamp/common/util/TestWorkerThread01.java425
2 files changed, 605 insertions, 134 deletions
diff --git a/src/java/com/jogamp/common/util/WorkerThread.java b/src/java/com/jogamp/common/util/WorkerThread.java
index 52182b6..42466da 100644
--- a/src/java/com/jogamp/common/util/WorkerThread.java
+++ b/src/java/com/jogamp/common/util/WorkerThread.java
@@ -33,33 +33,77 @@ import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * A re-{@link #start()}'able, {@link #pause(boolean)}'able and interrupt'able worker {@link Thread}
- * with an optional minimum execution duration, see {@link #getSleptDuration()}.
+ * A re-{@link #start()}'able, {@link #pause(boolean)}'able and interrupt'able worker {@link #getThread() thread}
+ * with an optional minimum execution duration, see {@link #getSleptDuration()}
+ * executing a {@link Callback#run(WorkerThread) task} periodically.
+ * <p>
+ * Optionally a {@link WorkerThread.StateCallback.State state} {@link StateCallback#run(WorkerThread, WorkerThread.StateCallback.State) task}
+ * can be given for fine grained control.
+ * </p>
+ * <p>
+ * If an exception occurs during execution of the work {@link Callback}, the worker {@link #getThread() thread} is {@link #pause(boolean)}'ed
+ * and {@link #hasError()} as well as {@link #getError(boolean)} can be used to query and clear the state.
+ * User may {@link #resume()} or {@link #stop()} the thread.
+ * </p>
+ * <p>
+ * If an exception occurs during execution of the optional
+ * {@link WorkerThread.StateCallback.State state} {@link StateCallback#run(WorkerThread, WorkerThread.StateCallback.State) task},
+ * the worker {@link #getThread() thread} is {@link #stop()}'ed
+ * and {@link #hasError()} as well as {@link #getError(boolean)} can be used to query and clear the state.
+ * </p>
*/
public class WorkerThread {
/**
- * An interruptible {@link #run()} task.
+ * An interruptible {@link #run() task} periodically executed on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
*/
public interface Callback {
- void run() throws InterruptedException;
+ /**
+ * Task to be periodically executed on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
+ * @param self The {@link WorkerThread} manager
+ * @throws InterruptedException
+ */
+ void run(WorkerThread self) throws InterruptedException;
}
+ /**
+ * An interruptible {@link State} {@link #run() task} on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
+ */
+ public interface StateCallback {
+ /** State change cause. */
+ public static enum State {
+ INIT, PAUSED, RESUMED, END
+ }
+ /**
+ * Task to be executed on {@link State} change on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
+ * @param self The {@link WorkerThread} manager
+ * @param cause the {@link State} change cause
+ * @throws InterruptedException
+ */
+ void run(WorkerThread self, State cause) throws InterruptedException;
+ }
+
+ private static final int RUNNING = 1 << 0;
+ private static final int ACTIVE = 1 << 1;
+ private static final int BLOCKED = 1 << 2;
+ private static final int SHALL_PAUSE = 1 << 3;
+ private static final int SHALL_STOP = 1 << 4;
+ private static final int USE_MINIMUM = 1 << 5;
+ private static final int DAEMON = 1 << 6;
private static AtomicInteger instanceId = new AtomicInteger(0);
- private volatile boolean isRunning = false;
- private volatile boolean isActive = false;
- private volatile boolean isBlocked = false;
- private volatile boolean shallPause = true;
- private volatile boolean shallStop = false;
+ private volatile int state;
+ private final static boolean isSet(final int state, final int mask) { return mask == ( state & mask ); }
+ private final boolean isSet(final int mask) { return mask == ( state & mask ); }
+ private final void set(final int mask) { state |= mask; }
+ private final void clear(final int mask) { state &= ~mask; }
+
private final Duration minPeriod;
private final Duration minDelay;
- private final boolean useMinimum;
private final Callback cbWork;
- private final Runnable cbInitLocked;
- private final Runnable cbEndLocked;
- private final boolean isDaemonThread;
+ private final StateCallback cbState;
private Thread thread;
private volatile Duration sleptDuration = Duration.ZERO;
+ private volatile Exception workErr = null;
/**
* Instantiates a new {@link WorkerThread}.
@@ -69,7 +113,7 @@ public class WorkerThread {
* @param work the actual work {@link Callback} to perform.
*/
public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work) {
- this(minPeriod, minDelay, daemonThread, work, null, null);
+ this(minPeriod, minDelay, daemonThread, work, null);
}
/**
@@ -78,98 +122,104 @@ public class WorkerThread {
* @param minDelay minimum work-loop-delay to throttle execution or {@code null} if unthrottled, see {@link #getSleptDuration()}
* @param daemonThread argument for {@link Thread#setDaemon(boolean)}
* @param work the actual work {@link Callback} to perform.
- * @param init optional initialization {@link Runnable} called at {@link #start()} while locked
- * @param end optional release {@link Runnable} called at {@link #stop()} while locked
+ * @param stateChangeCB optional {@link StateCallback} called at different {@link StateCallback.State} changes while locked
*/
- public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work, final Runnable init, final Runnable end) {
+ public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work, final StateCallback stateChangeCB) {
+ this.state = 0;
this.minPeriod = null != minPeriod ? minPeriod : Duration.ZERO;
this.minDelay = null != minDelay ? minDelay : Duration.ZERO;
- this.useMinimum = this.minPeriod.toMillis() > 0 || this.minDelay.toMillis() > 0;
+ if( this.minPeriod.toMillis() > 0 || this.minDelay.toMillis() > 0 ) {
+ set(USE_MINIMUM);
+ }
this.cbWork = work;
- this.cbInitLocked = init;
- this.cbEndLocked = end;
- this.isDaemonThread = daemonThread;
+ this.cbState = stateChangeCB;
+ if( daemonThread ) {
+ set(DAEMON);
+ }
thread = null;
}
/**
* Starts execution of a new worker thread if not {@link #isRunning}, i.e. never {@link #start()}'ed or {@link #stop()}'ed.
* <p>
- * Method blocks until the new worker thread has started, {@link #isRunning()} and also {@link #isActive()}
- * </p>
- */
- public final synchronized void start() {
- start(false);
- }
-
- /**
- * Starts execution of a new worker thread if not {@link #isRunning}, i.e. never {@link #start()}'ed or {@link #stop()}'ed.
- * <p>
* Method blocks until the new worker thread has been started and {@link #isRunning()} and {@link #isActive()} if {@code paused == false}.
* </p>
* @param paused if {@code true}, keeps the new worker thread paused, otherwise {@link #resume()} it.
*/
public final synchronized void start(final boolean paused) {
- if( isRunning ) {
+ if( isSet(RUNNING) || null != thread || isSet(SHALL_STOP) || isSet(SHALL_PAUSE) ) {
+ // definite start condition: !isRunning
+ // subsequent conditions only for consistency/doc: null == thread && !shallStop && !shallPause
return;
}
- shallStop = false;
- shallPause = true;
+ if( paused ) {
+ set(SHALL_PAUSE);
+ }
thread = new Thread(threadRunnable);
- thread.setDaemon(isDaemonThread);
+ thread.setDaemon(isSet(DAEMON));
thread.start();
try {
this.notifyAll(); // wake-up startup-block
- while( !isRunning && !shallStop ) {
- this.wait(); // wait until started
+ if( !paused ) {
+ while( !isSet(RUNNING) && !isSet(ACTIVE) && null != thread && !isSet(SHALL_STOP) ) {
+ this.wait(); // wait until started and active (not-paused)
+ }
+ } else {
+ while( !isSet(RUNNING) && null != thread && !isSet(SHALL_STOP) ) {
+ this.wait(); // wait until started
+ }
+ while( isSet(RUNNING) && isSet(ACTIVE) && null != thread && !isSet(SHALL_STOP) ) {
+ this.wait(); // wait until paused
+ }
}
} catch (final InterruptedException e) {
throw new InterruptedRuntimeException(e);
}
- if( !paused ) {
- resume();
- }
}
/**
* Stops execution of the {@link #start()}'ed worker thread.
* <p>
- * Method blocks until worker thread has stopped.
+ * Method blocks until worker thread has been {@link #isRunning() stopped} if {@code waitUntilDone} is {@code true}.
* </p>
*/
- public final synchronized void stop() {
- if( isRunning ) {
- shallStop = true;
+ public final synchronized void stop(final boolean waitUntilDone) {
+ if( isSet(RUNNING) ) {
+ set(SHALL_STOP);
+ this.notifyAll(); // wake-up pause-block (opt)
if( java.lang.Thread.currentThread() != thread ) {
- if( isBlocked && isRunning ) {
+ if( isSet(BLOCKED | RUNNING) ) {
thread.interrupt();
}
- try {
- this.notifyAll(); // wake-up pause-block (opt)
- while( isRunning ) {
- this.wait(); // wait until stopped
+ if( waitUntilDone ) {
+ try {
+ while( isSet(RUNNING) ) {
+ this.wait(); // wait until stopped
+ }
+ } catch (final InterruptedException e) {
+ throw new InterruptedRuntimeException(e);
}
- } catch (final InterruptedException e) {
- throw new InterruptedRuntimeException(e);
}
}
- thread = null;
- shallStop = false;
- shallPause = true;
}
}
- /** Pauses execution of the {@link #start()}'ed worker thread. */
+ /**
+ * Pauses execution of the {@link #start()}'ed worker thread.
+ * <p>
+ * Method blocks until worker thread has been {@link #isActive()}'ated if {@code waitUntilDone} is {@code true}.
+ * </p>
+ */
public final synchronized void pause(final boolean waitUntilDone) {
- if( isActive ) {
- shallPause = true;
+ if( isSet(RUNNING | ACTIVE) && !isSet(SHALL_STOP) ) {
+ set(SHALL_PAUSE);
if( java.lang.Thread.currentThread() != thread ) {
- if( isBlocked && isActive ) {
+ if( isSet(BLOCKED | ACTIVE) ) {
thread.interrupt();
}
if( waitUntilDone ) {
try {
- while( isActive && isRunning ) {
+ while( isSet(RUNNING | ACTIVE) ) {
this.wait(); // wait until paused
}
} catch (final InterruptedException e) {
@@ -182,12 +232,12 @@ public class WorkerThread {
/** Resumes execution of the {@link #pause(boolean)}'ed worker thread. */
public final synchronized void resume() {
- if( isRunning && !isActive ) {
- shallPause = false;
+ if( isSet(RUNNING) && !isSet(ACTIVE) && !isSet(SHALL_STOP) ) {
+ clear(SHALL_PAUSE);
+ this.notifyAll(); // wake-up pause-block
if( java.lang.Thread.currentThread() != thread ) {
try {
- this.notifyAll(); // wake-up pause-block
- while( !isActive && !shallPause && isRunning ) {
+ while( !isSet(ACTIVE) && !isSet(SHALL_PAUSE) && isSet(RUNNING) ) {
this.wait(); // wait until resumed
}
} catch (final InterruptedException e) {
@@ -199,9 +249,21 @@ public class WorkerThread {
}
/** Returns true if the worker thread has started via {@link #start()} and has not ended, e.g. via {@link #stop()}. It might be {@link #pause(boolean) paused}. */
- public final boolean isRunning() { return isRunning; }
- /** Returns true if the worker thread has started via {@link #start()} and has not ended, e.g. via {@link #stop()} and is not {@link #pause(boolean) paused}. */
- public final boolean isActive() { return isActive; }
+ public final boolean isRunning() { return isSet(RUNNING); }
+ /** Returns true if the worker thread {@link #isRunning()} and is not {@link #pause(boolean) paused}. */
+ public final boolean isActive() { return isSet(ACTIVE); }
+ /** Returns true if the worker thread {@link #isRunning()} and is {@link #pause(boolean) paused}. */
+ public final boolean isPaused() { return isSet(RUNNING) && !isSet(ACTIVE); }
+ /** Returns true if an exception occured during {@link Callable} work execution. */
+ public final boolean hasError() { return null != workErr; }
+ /** Returns the worker thread if {@link #isRunning()}, otherwise {@code null}. */
+ public final Thread getThread() { return thread; }
+
+ /**
+ * Returns the exception is {@link #hasError()}.
+ * @param clear if true, clear the exception
+ */
+ public final Exception getError(final boolean clear ) { final Exception e = workErr; if( clear) { workErr = null; } return e; }
/**
* Returns enforced minimum work-loop-period or {@link Duration#ZERO} for none.
@@ -228,10 +290,11 @@ public class WorkerThread {
@Override
public String toString() {
synchronized(this) {
- return "Worker[running "+isRunning+", active "+isActive+", blocked "+isBlocked+
- ", shall[pause "+shallPause+", stop "+shallStop+
- "], minDelay "+minDelay.toMillis()+"ms, minPeriod[set "+minPeriod.toMillis()+"ms, sleptDelta "+sleptDuration.toMillis()+
- "ms], daemon "+isDaemonThread+", thread "+thread+"]";
+ final int _state = state;
+ return "Worker[running "+isSet(_state, RUNNING)+", active "+isSet(_state, ACTIVE)+", blocked "+isSet(_state, BLOCKED)+
+ ", shall[pause "+isSet(_state, SHALL_PAUSE)+", stop "+isSet(_state, SHALL_STOP)+
+ "], min[period "+minPeriod.toMillis()+"ms, delay "+minDelay.toMillis()+"], slept "+sleptDuration.toMillis()+
+ "ms, daemon "+isSet(_state, DAEMON)+", thread "+thread+"]";
}
}
@@ -242,41 +305,90 @@ public class WorkerThread {
ct.setName(ct.getName()+"-Worker_"+instanceId.getAndIncrement());
synchronized ( WorkerThread.this ) {
- if( null != cbInitLocked ) {
- cbInitLocked.run();
+ Exception err = null;
+ if( null != cbState ) {
+ try {
+ cbState.run(WorkerThread.this, StateCallback.State.INIT);
+ } catch (final InterruptedException e) {
+ // OK
+ } catch (final Throwable t) {
+ err = new Exception(t.getClass().getSimpleName()+" while processing init-state "+cbState, t);
+ }
+ if( null != err ) {
+ workErr = err;
+ clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
+ thread = null;
+ workErr.printStackTrace();
+ WorkerThread.this.notifyAll(); // wake-up ctor()
+ return; // bail out
+ }
}
- isRunning = true;
+ set(RUNNING | ACTIVE);
WorkerThread.this.notifyAll(); // wake-up ctor()
}
- while( !shallStop ) {
- Exception streamErr = null;
+ while( !isSet(SHALL_STOP) ) {
+ Exception err = null;
try {
- if( shallPause ) {
+ if( isSet(SHALL_PAUSE) ) {
synchronized ( WorkerThread.this ) {
- while( shallPause && !shallStop ) {
- isActive = false;
+ if( null != cbState ) {
+ try {
+ cbState.run(WorkerThread.this, StateCallback.State.PAUSED);
+ } catch (final InterruptedException e) {
+ // OK
+ } catch (final Throwable t) {
+ err = new Exception(t.getClass().getSimpleName()+" while processing pause-state "+cbState, t);
+ }
+ if( null != err ) {
+ workErr = err;
+ clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
+ thread = null;
+ workErr.printStackTrace();
+ WorkerThread.this.notifyAll(); // wake-up ctor()
+ return; // bail out
+ }
+ }
+ while( isSet(SHALL_PAUSE) && !isSet(SHALL_STOP) ) {
+ clear(ACTIVE);
WorkerThread.this.notifyAll(); // wake-up doPause()
try {
WorkerThread.this.wait(); // wait until resumed
} catch (final InterruptedException e) {
- if( !shallPause ) {
+ if( !isSet(SHALL_PAUSE) ) {
throw new InterruptedRuntimeException(e);
}
}
}
- isActive = true;
+ if( null != cbState ) {
+ try {
+ cbState.run(WorkerThread.this, StateCallback.State.RESUMED);
+ } catch (final InterruptedException e) {
+ err = new InterruptedRuntimeException(e.getClass().getSimpleName()+" while processing resume-state"+cbState, e);
+ } catch (final Throwable t) {
+ err = new Exception(t.getClass().getSimpleName()+" while processing resume-state "+cbState, t);
+ }
+ if( null != err ) {
+ workErr = err;
+ clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
+ thread = null;
+ workErr.printStackTrace();
+ WorkerThread.this.notifyAll(); // wake-up ctor()
+ return; // bail out
+ }
+ }
+ set(ACTIVE);
WorkerThread.this.notifyAll(); // wake-up doResume()
}
}
- if( !shallStop ) {
+ if( !isSet(SHALL_STOP) ) {
final Instant t0 = Instant.now();
- isBlocked = true;
+ set(BLOCKED);
{
- cbWork.run();
+ cbWork.run(WorkerThread.this);
}
- isBlocked = false;
- if( useMinimum ) {
+ clear(BLOCKED);
+ if( isSet(USE_MINIMUM) ) {
final long minDelayMS = minDelay.toMillis();
final Instant t1 = Instant.now();
final Duration td = Duration.between(t0, t1);
@@ -303,32 +415,40 @@ public class WorkerThread {
}
}
} catch (final InterruptedException e) {
- if( !isBlocked ) { // !shallStop && !shallPause
- streamErr = new InterruptedRuntimeException(e);
+ if( !isSet(BLOCKED) ) { // !shallStop && !shallPause
+ err = new InterruptedRuntimeException(e.getClass().getSimpleName()+" while processing work-callback "+cbWork, e);
}
- isBlocked = false;
+ clear(BLOCKED);
sleptDuration = Duration.ZERO;
} catch (final Throwable t) {
- streamErr = new Exception(t.getClass().getSimpleName()+" while processing", t);
+ err = new Exception(t.getClass().getSimpleName()+" while processing work-callback "+cbWork, t);
sleptDuration = Duration.ZERO;
} finally {
- if( null != streamErr ) {
+ if( null != err ) {
// state transition incl. notification
synchronized ( WorkerThread.this ) {
- shallPause = true;
- isActive = false;
+ workErr = err;
+ err = null;
+ set(SHALL_PAUSE);
+ clear(ACTIVE);
WorkerThread.this.notifyAll(); // wake-up potential do*()
}
- pause(false);
}
}
}
synchronized ( WorkerThread.this ) {
- if( null != cbEndLocked ) {
- cbEndLocked.run();
+ if( null != cbState ) {
+ try {
+ cbState.run(WorkerThread.this, StateCallback.State.END);
+ } catch (final InterruptedException e) {
+ // OK
+ } catch (final Throwable t) {
+ workErr = new Exception(t.getClass().getSimpleName()+" while processing end-state "+cbState, t);
+ workErr.printStackTrace();
+ }
}
- isRunning = false;
- isActive = false;
+ thread = null;
+ clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
WorkerThread.this.notifyAll(); // wake-up doStop()
}
} };
diff --git a/src/junit/com/jogamp/common/util/TestWorkerThread01.java b/src/junit/com/jogamp/common/util/TestWorkerThread01.java
index afd41a1..4d7f27e 100644
--- a/src/junit/com/jogamp/common/util/TestWorkerThread01.java
+++ b/src/junit/com/jogamp/common/util/TestWorkerThread01.java
@@ -48,7 +48,7 @@ public class TestWorkerThread01 extends SingletonJunitCase {
static class Action implements WorkerThread.Callback {
final Duration sleep;
- AtomicInteger counter = new AtomicInteger(0);
+ final AtomicInteger counter = new AtomicInteger(0);
Instant tlast = Instant.now();
volatile Duration td = Duration.ZERO;
@@ -57,56 +57,107 @@ public class TestWorkerThread01 extends SingletonJunitCase {
}
@Override
- public void run() throws InterruptedException {
+ public void run(final WorkerThread self) throws InterruptedException {
{
java.lang.Thread.sleep(sleep.toMillis());
// java.util.concurrent.locks.LockSupport.parkNanos(sleep.toNanos());
}
final Instant t1 = Instant.now();
td = Duration.between(tlast, t1);
- System.err.println("action period "+td.toMillis()+"ms, counter "+counter.getAndIncrement());
+ final int v = counter.incrementAndGet();
+ // System.err.println("action period "+td.toMillis()+"ms, counter "+v+": "+self);
tlast = t1;
}
}
+ static class StateCB implements WorkerThread.StateCallback {
+ final AtomicInteger initCounter = new AtomicInteger(0);
+ final AtomicInteger pausedCounter = new AtomicInteger(0);
+ final AtomicInteger resumedCounter = new AtomicInteger(0);
+ final AtomicInteger endCounter = new AtomicInteger(0);
+
+ @Override
+ public void run(final WorkerThread self, final State cause) throws InterruptedException {
+ // System.err.println("WT-"+cause+": "+self);
+ switch( cause ) {
+ case END:
+ endCounter.incrementAndGet();
+ break;
+ case INIT:
+ initCounter.incrementAndGet();
+ break;
+ case PAUSED:
+ pausedCounter.incrementAndGet();
+ break;
+ case RESUMED:
+ resumedCounter.incrementAndGet();
+ break;
+ default:
+ break;
+ }
+ }
+ }
static void checkStarted(final WorkerThread wt, final boolean isPaused) {
- Assert.assertTrue(wt.isRunning());
- Assert.assertEquals(!isPaused, wt.isActive());
+ Assert.assertTrue(wt.toString(), wt.isRunning());
+ Assert.assertEquals("isPaused "+isPaused+", "+wt.toString(), !isPaused, wt.isActive());
+ Assert.assertNotNull(wt.toString(), wt.getThread());
}
static void checkStopped(final WorkerThread wt) {
- Assert.assertFalse(wt.isRunning());
- Assert.assertFalse(wt.isActive());
+ Assert.assertFalse(wt.toString(), wt.isRunning());
+ Assert.assertFalse(wt.toString(), wt.isActive());
+ Assert.assertNull(wt.toString(), wt.getThread());
}
- static void start(final WorkerThread wt) {
- System.err.println("WT Start.0: "+wt);
- wt.start();
- System.err.println("WT Start.X: "+wt);
+ static void start(final WorkerThread wt, final boolean paused) {
+ // System.err.println("WT Start.0: paused "+paused+", "+wt);
+ wt.start(paused);
+ // System.err.println("WT Start.X: "+wt);
}
- static void stop(final WorkerThread wt) {
- System.err.println("WT Stop.0: "+wt);
- wt.stop();
- System.err.println("WT Stop.X: "+wt);
+ static void stop(final WorkerThread wt, final boolean wait) {
+ // System.err.println("WT Stop.0: wait "+wait+", "+wt);
+ wt.stop(wait);
+ // System.err.println("WT Stop.X: wait "+wait+", "+wt);
}
static void pause(final WorkerThread wt, final boolean wait) {
- System.err.println("WT Pause.0: wait "+wait+", "+wt);
+ // System.err.println("WT Pause.0: wait "+wait+", "+wt);
wt.pause(wait);
- System.err.println("WT Pause.X: wait "+wait+", "+wt);
+ // System.err.println("WT Pause.X: wait "+wait+", "+wt);
}
static void resume(final WorkerThread wt) {
- System.err.println("WT Resume.0: "+wt);
+ // System.err.println("WT Resume.0: "+wt);
wt.resume();
- System.err.println("WT Resume.X: "+wt);
+ // System.err.println("WT Resume.X: "+wt);
}
- public void testAction(final long periodMS, final long minDelayMS, final long actionMS) throws IOException, InterruptedException, InvocationTargetException {
+ public void testAction(final boolean startPaused, final long periodMS, final long minDelayMS, final long actionMS) throws IOException, InterruptedException, InvocationTargetException {
final Action action = new Action( 0 < actionMS ? Duration.of(actionMS, ChronoUnit.MILLIS) : Duration.ZERO);
+ final StateCB stateCB = new StateCB();
final WorkerThread wt =new WorkerThread(Duration.of(periodMS, ChronoUnit.MILLIS),
- Duration.of(minDelayMS, ChronoUnit.MILLIS), true /* daemonThread */, action);
+ Duration.of(minDelayMS, ChronoUnit.MILLIS), true /* daemonThread */, action, stateCB);
+
final long maxPeriodMS = Math.max(minDelayMS+actionMS, Math.max(periodMS, actionMS));
+ System.err.println("testAction: startPaused "+startPaused+", maxPeriodMS "+maxPeriodMS+", actionMS "+actionMS+", "+wt);
+
int counterA = action.counter.get();
checkStopped(wt);
- start(wt);
- checkStarted(wt, false /* isPaused */);
+ Assert.assertEquals(0, stateCB.initCounter.get());
+ Assert.assertEquals(0, action.counter.get());
+ Assert.assertEquals(0, stateCB.pausedCounter.get());
+ Assert.assertEquals(0, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+ start(wt, startPaused);
+ checkStarted(wt, startPaused /* isPaused */);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertEquals(startPaused?1:0, stateCB.pausedCounter.get());
+ Assert.assertEquals(startPaused?0:0, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+ if( startPaused ) {
+ wt.resume();
+ checkStarted(wt, false /* isPaused */);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+ }
Thread.sleep(maxPeriodMS*3);
{
final Duration td = action.td;
@@ -118,65 +169,365 @@ public class TestWorkerThread01 extends SingletonJunitCase {
}
checkStarted(wt, false /* isPaused */);
- stop(wt);
+ stop(wt, true); // running -> stop
checkStopped(wt);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertEquals(startPaused?1:0, stateCB.pausedCounter.get());
+ Assert.assertEquals(startPaused?1:0, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
int counterB = action.counter.get();
Assert.assertTrue(counterB > counterA);
counterA = action.counter.get();
checkStopped(wt);
- start(wt);
- checkStarted(wt, false /* isPaused */);
+ start(wt, startPaused); // stop -> running
+ checkStarted(wt, startPaused /* isPaused */);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ Assert.assertEquals(startPaused?2:0, stateCB.pausedCounter.get());
+ Assert.assertEquals(startPaused?1:0, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
+ if( startPaused ) {
+ wt.resume();
+ checkStarted(wt, false /* isPaused */);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ Assert.assertEquals(startPaused?2:0, stateCB.pausedCounter.get());
+ Assert.assertEquals(startPaused?2:0, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
+ }
Thread.sleep(maxPeriodMS*3);
checkStarted(wt, false /* isPaused */);
- pause(wt, true /* wait */);
+ pause(wt, true /* wait */); // running -> pause
checkStarted(wt, true /* isPaused */);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ Assert.assertEquals(startPaused?3:1, stateCB.pausedCounter.get());
+ Assert.assertEquals(startPaused?2:0, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
counterB = action.counter.get();
Assert.assertTrue(counterB > counterA);
+ Thread.sleep(maxPeriodMS);
counterA = action.counter.get();
Assert.assertTrue(counterB == counterA);
- Thread.sleep(maxPeriodMS);
- resume(wt);
+ resume(wt); // pause -> running
checkStarted(wt, false /* isPaused */);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ Assert.assertEquals(startPaused?3:1, stateCB.pausedCounter.get());
+ Assert.assertEquals(startPaused?3:1, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
Thread.sleep(maxPeriodMS*3);
checkStarted(wt, false /* isPaused */);
- stop(wt);
- checkStopped(wt);
+ pause(wt, true /* wait */); // running -> pause
+ checkStarted(wt, true /* isPaused */);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ Assert.assertEquals(startPaused?4:2, stateCB.pausedCounter.get());
+ Assert.assertEquals(startPaused?3:1, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
counterB = action.counter.get();
Assert.assertTrue(counterB > counterA);
+ counterA = counterB;
+
+ checkStarted(wt, true /* isPaused */);
+ stop(wt, true); // pause -> stop
+ checkStopped(wt);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ Assert.assertEquals(startPaused?4:2, stateCB.pausedCounter.get());
+ Assert.assertEquals(startPaused?4:2, stateCB.resumedCounter.get());
+ Assert.assertEquals(2, stateCB.endCounter.get());
+ counterB = action.counter.get();
+ Assert.assertTrue(counterB == counterA);
+
+ resume(wt); // stop -> stop
+ checkStopped(wt);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ Assert.assertEquals(startPaused?4:2, stateCB.pausedCounter.get());
+ Assert.assertEquals(startPaused?4:2, stateCB.resumedCounter.get());
+ Assert.assertEquals(2, stateCB.endCounter.get());
+
+ pause(wt, true /* wait */); // stop -> stop
+ checkStopped(wt);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ Assert.assertEquals(startPaused?4:2, stateCB.pausedCounter.get());
+ Assert.assertEquals(startPaused?4:2, stateCB.resumedCounter.get());
+ Assert.assertEquals(2, stateCB.endCounter.get());
}
@Test
public void test01ZeroAction() throws IOException, InterruptedException, InvocationTargetException {
- testAction(16 /* periodMS */, 0 /* minDelayMS */, 0 /* actionMS*/);
+ testAction(false, 16 /* periodMS */, 0 /* minDelayMS */, 0 /* actionMS*/);
+ testAction(true, 16 /* periodMS */, 0 /* minDelayMS */, 0 /* actionMS*/);
}
@Test
public void test02MidAction() throws IOException, InterruptedException, InvocationTargetException {
- testAction(16 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/);
+ testAction(false, 16 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/);
+ testAction(true, 16 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/);
}
@Test
public void test03HeavyAction() throws IOException, InterruptedException, InvocationTargetException {
- testAction(16 /* periodMS */, 0 /* minDelayMS */, 20 /* actionMS*/);
+ testAction(false, 16 /* periodMS */, 0 /* minDelayMS */, 20 /* actionMS*/);
+ testAction(true, 16 /* periodMS */, 0 /* minDelayMS */, 20 /* actionMS*/);
}
@Test
public void test03ZeroMidAction() throws IOException, InterruptedException, InvocationTargetException {
- testAction(0 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/);
+ testAction(false, 0 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/);
+ testAction(true, 0 /* periodMS */, 0 /* minDelayMS */, 8 /* actionMS*/);
}
@Test
public void test04ZeroMinDelayMidAction() throws IOException, InterruptedException, InvocationTargetException {
- testAction(0 /* periodMS */, 4 /* minDelayMS */, 8 /* actionMS*/);
+ testAction(false, 0 /* periodMS */, 4 /* minDelayMS */, 8 /* actionMS*/);
+ testAction(true, 0 /* periodMS */, 4 /* minDelayMS */, 8 /* actionMS*/);
}
@Test
public void test05MinDelayMidAction() throws IOException, InterruptedException, InvocationTargetException {
- testAction(8 /* periodMS */, 8 /* minDelayMS */, 8 /* actionMS*/);
+ testAction(false, 8 /* periodMS */, 8 /* minDelayMS */, 8 /* actionMS*/);
+ testAction(true, 8 /* periodMS */, 8 /* minDelayMS */, 8 /* actionMS*/);
+ }
+
+ @Test
+ public void test10InitEnd01() throws IOException, InterruptedException, InvocationTargetException {
+ // Issuing stop not in the worker-thread
+ final AtomicInteger actionLatch = new AtomicInteger(0);
+ final WorkerThread.Callback action = (final WorkerThread self) -> {
+ java.lang.Thread.sleep(1);
+ final boolean v = actionLatch.compareAndSet(0, 1);
+ // System.err.println("action set "+v+": "+self);
+ };
+ final StateCB stateCB = new StateCB();
+
+ Assert.assertEquals(0, stateCB.initCounter.get());
+ Assert.assertEquals(0, actionLatch.get());
+ Assert.assertEquals(0, stateCB.pausedCounter.get());
+ Assert.assertEquals(0, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+
+ final long minPeriodMS = 2;
+ final long maxPeriodMS = 4;
+ final WorkerThread wt =new WorkerThread(Duration.of(minPeriodMS, ChronoUnit.MILLIS),
+ Duration.of(0, ChronoUnit.MILLIS), true /* daemonThread */,
+ action, stateCB);
+ Assert.assertEquals(0, stateCB.initCounter.get());
+ Assert.assertEquals(0, actionLatch.get());
+ Assert.assertEquals(0, stateCB.pausedCounter.get());
+ Assert.assertEquals(0, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+ checkStopped(wt);
+
+ start(wt, true);
+ checkStarted(wt, true /* isPaused */);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertEquals(0, actionLatch.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(0, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+
+ wt.resume();
+ checkStarted(wt, false /* isPaused */);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ // maybe: Assert.assertEquals(1, actionLatch.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+
+ Thread.sleep(maxPeriodMS);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertEquals(1, actionLatch.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+
+ checkStarted(wt, false /* isPaused */);
+ stop(wt, true);
+ checkStopped(wt);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertEquals(1, actionLatch.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
+
+ actionLatch.set(0);
+ Assert.assertEquals(0, actionLatch.get());
+ start(wt, false);
+ checkStarted(wt, false/* isPaused */);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ // maybe: Assert.assertEquals(1, actionLatch.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
+
+ Thread.sleep(maxPeriodMS);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ Assert.assertEquals(1, actionLatch.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
+
+ checkStarted(wt, false /* isPaused */);
+ stop(wt, true);
+ checkStopped(wt);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ Assert.assertEquals(1, actionLatch.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(2, stateCB.endCounter.get());
+ }
+
+ @Test
+ public void test11InitEnd02() throws IOException, InterruptedException, InvocationTargetException {
+ // Issuing stop on the worker-thread
+ final AtomicInteger actionCounter = new AtomicInteger(0);
+ final WorkerThread.Callback action = (final WorkerThread self) -> {
+ java.lang.Thread.sleep(1);
+ final int v = actionCounter.incrementAndGet();
+ // System.err.println("action cntr "+v+": "+self);
+ if( 8 == v ) {
+ stop(self, true);
+ }
+ };
+ final StateCB stateCB = new StateCB();
+
+ Assert.assertEquals(0, stateCB.initCounter.get());
+ Assert.assertEquals(0, actionCounter.get());
+ Assert.assertEquals(0, stateCB.pausedCounter.get());
+ Assert.assertEquals(0, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+
+ final long minPeriodMS = 2;
+ final long maxPeriodMS = 16;
+ final WorkerThread wt =new WorkerThread(Duration.of(minPeriodMS, ChronoUnit.MILLIS),
+ Duration.of(0, ChronoUnit.MILLIS), true /* daemonThread */,
+ action, stateCB);
+ Assert.assertEquals(0, stateCB.initCounter.get());
+ Assert.assertEquals(0, actionCounter.get());
+ Assert.assertEquals(0, stateCB.pausedCounter.get());
+ Assert.assertEquals(0, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+ checkStopped(wt);
+
+ start(wt, true);
+ checkStarted(wt, true /* isPaused */);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertEquals(0, actionCounter.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(0, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+
+ wt.resume();
+ checkStarted(wt, false /* isPaused */);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ // maybe Assert.assertEquals(1, actionCounter.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+
+ Thread.sleep(maxPeriodMS);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertTrue(0 < actionCounter.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
+ checkStopped(wt);
+
+ actionCounter.set(0);
+ Assert.assertEquals(0, actionCounter.get());
+ start(wt, false);
+ checkStarted(wt, false/* isPaused */);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ // maybe: Assert.assertEquals(1, actionLatch.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
+
+ Thread.sleep(maxPeriodMS);
+ Assert.assertEquals(2, stateCB.initCounter.get());
+ Assert.assertTrue(0 < actionCounter.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(2, stateCB.endCounter.get());
+ checkStopped(wt);
+ }
+
+ @Test
+ public void test20ExceptionAtWork() throws IOException, InterruptedException, InvocationTargetException {
+ final AtomicInteger actionCounter = new AtomicInteger(0);
+ final WorkerThread.Callback action = (final WorkerThread self) -> {
+ java.lang.Thread.sleep(1);
+ final int v = actionCounter.incrementAndGet();
+ // System.err.println("action cntr "+v+": "+self);
+ if( 8 == v ) {
+ throw new RuntimeException("Test exception from worker action: "+self);
+ }
+ };
+ final StateCB stateCB = new StateCB();
+
+ Assert.assertEquals(0, stateCB.initCounter.get());
+ Assert.assertEquals(0, actionCounter.get());
+ Assert.assertEquals(0, stateCB.pausedCounter.get());
+ Assert.assertEquals(0, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+
+ final long minPeriodMS = 2;
+ final long maxPeriodMS = 16;
+ final WorkerThread wt =new WorkerThread(Duration.of(minPeriodMS, ChronoUnit.MILLIS),
+ Duration.of(0, ChronoUnit.MILLIS), true /* daemonThread */,
+ action, stateCB);
+ Assert.assertEquals(0, stateCB.initCounter.get());
+ Assert.assertEquals(0, actionCounter.get());
+ Assert.assertEquals(0, stateCB.pausedCounter.get());
+ Assert.assertEquals(0, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+ checkStopped(wt);
+
+ start(wt, true);
+ checkStarted(wt, true /* isPaused */);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertEquals(0, actionCounter.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(0, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+
+ wt.resume();
+ checkStarted(wt, false /* isPaused */);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ // maybe: Assert.assertEquals(1, actionLatch.get());
+ Assert.assertEquals(1, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+
+ Thread.sleep(maxPeriodMS);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertTrue(0 < actionCounter.get());
+ Assert.assertEquals(2, stateCB.pausedCounter.get());
+ Assert.assertEquals(1, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+ checkStarted(wt, true /* isPaused */);
+ Assert.assertTrue(wt.hasError());
+ Assert.assertNotNull(wt.getError(true));
+ final int counterA = actionCounter.get();
+
+ wt.resume();
+ checkStarted(wt, false /* isPaused */);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertTrue(0 < actionCounter.get());
+ Assert.assertEquals(2, stateCB.pausedCounter.get());
+ Assert.assertEquals(2, stateCB.resumedCounter.get());
+ Assert.assertEquals(0, stateCB.endCounter.get());
+ Thread.sleep(maxPeriodMS);
+
+ stop(wt, true);
+ checkStopped(wt);
+ final int counterB = actionCounter.get();
+ Assert.assertTrue(counterB > counterA);
+ Assert.assertEquals(1, stateCB.initCounter.get());
+ Assert.assertTrue(0 < actionCounter.get());
+ Assert.assertEquals(2, stateCB.pausedCounter.get());
+ Assert.assertEquals(2, stateCB.resumedCounter.get());
+ Assert.assertEquals(1, stateCB.endCounter.get());
}
public static void main(final String args[]) throws IOException {