aboutsummaryrefslogtreecommitdiffstats
path: root/src/java/com
diff options
context:
space:
mode:
authorSven Gothel <[email protected]>2023-09-22 16:06:42 +0200
committerSven Gothel <[email protected]>2023-09-22 16:06:42 +0200
commitfdda2f12a5aa6f931f693b15e1c3cd7dab030882 (patch)
tree531c6a706922c1f8a64ce964767c332c21c49226 /src/java/com
parent7fe177c87184cbe9c170ed708a1db3deeb7e390c (diff)
WorkerThread: Enhanced testing, added optional StateCallback for state changes, using bitfield state (earmarked to be used within GLMediaPlayerImpl etc)
Diffstat (limited to 'src/java/com')
-rw-r--r--src/java/com/jogamp/common/util/WorkerThread.java314
1 files changed, 217 insertions, 97 deletions
diff --git a/src/java/com/jogamp/common/util/WorkerThread.java b/src/java/com/jogamp/common/util/WorkerThread.java
index 52182b6..42466da 100644
--- a/src/java/com/jogamp/common/util/WorkerThread.java
+++ b/src/java/com/jogamp/common/util/WorkerThread.java
@@ -33,33 +33,77 @@ import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * A re-{@link #start()}'able, {@link #pause(boolean)}'able and interrupt'able worker {@link Thread}
- * with an optional minimum execution duration, see {@link #getSleptDuration()}.
+ * A re-{@link #start()}'able, {@link #pause(boolean)}'able and interrupt'able worker {@link #getThread() thread}
+ * with an optional minimum execution duration, see {@link #getSleptDuration()}
+ * executing a {@link Callback#run(WorkerThread) task} periodically.
+ * <p>
+ * Optionally a {@link WorkerThread.StateCallback.State state} {@link StateCallback#run(WorkerThread, WorkerThread.StateCallback.State) task}
+ * can be given for fine grained control.
+ * </p>
+ * <p>
+ * If an exception occurs during execution of the work {@link Callback}, the worker {@link #getThread() thread} is {@link #pause(boolean)}'ed
+ * and {@link #hasError()} as well as {@link #getError(boolean)} can be used to query and clear the state.
+ * User may {@link #resume()} or {@link #stop()} the thread.
+ * </p>
+ * <p>
+ * If an exception occurs during execution of the optional
+ * {@link WorkerThread.StateCallback.State state} {@link StateCallback#run(WorkerThread, WorkerThread.StateCallback.State) task},
+ * the worker {@link #getThread() thread} is {@link #stop()}'ed
+ * and {@link #hasError()} as well as {@link #getError(boolean)} can be used to query and clear the state.
+ * </p>
*/
public class WorkerThread {
/**
- * An interruptible {@link #run()} task.
+ * An interruptible {@link #run() task} periodically executed on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
*/
public interface Callback {
- void run() throws InterruptedException;
+ /**
+ * Task to be periodically executed on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
+ * @param self The {@link WorkerThread} manager
+ * @throws InterruptedException
+ */
+ void run(WorkerThread self) throws InterruptedException;
}
+ /**
+ * An interruptible {@link State} {@link #run() task} on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
+ */
+ public interface StateCallback {
+ /** State change cause. */
+ public static enum State {
+ INIT, PAUSED, RESUMED, END
+ }
+ /**
+ * Task to be executed on {@link State} change on the {@link WorkerThread} {@link WorkerThread#getThread() thread}.
+ * @param self The {@link WorkerThread} manager
+ * @param cause the {@link State} change cause
+ * @throws InterruptedException
+ */
+ void run(WorkerThread self, State cause) throws InterruptedException;
+ }
+
+ private static final int RUNNING = 1 << 0;
+ private static final int ACTIVE = 1 << 1;
+ private static final int BLOCKED = 1 << 2;
+ private static final int SHALL_PAUSE = 1 << 3;
+ private static final int SHALL_STOP = 1 << 4;
+ private static final int USE_MINIMUM = 1 << 5;
+ private static final int DAEMON = 1 << 6;
private static AtomicInteger instanceId = new AtomicInteger(0);
- private volatile boolean isRunning = false;
- private volatile boolean isActive = false;
- private volatile boolean isBlocked = false;
- private volatile boolean shallPause = true;
- private volatile boolean shallStop = false;
+ private volatile int state;
+ private final static boolean isSet(final int state, final int mask) { return mask == ( state & mask ); }
+ private final boolean isSet(final int mask) { return mask == ( state & mask ); }
+ private final void set(final int mask) { state |= mask; }
+ private final void clear(final int mask) { state &= ~mask; }
+
private final Duration minPeriod;
private final Duration minDelay;
- private final boolean useMinimum;
private final Callback cbWork;
- private final Runnable cbInitLocked;
- private final Runnable cbEndLocked;
- private final boolean isDaemonThread;
+ private final StateCallback cbState;
private Thread thread;
private volatile Duration sleptDuration = Duration.ZERO;
+ private volatile Exception workErr = null;
/**
* Instantiates a new {@link WorkerThread}.
@@ -69,7 +113,7 @@ public class WorkerThread {
* @param work the actual work {@link Callback} to perform.
*/
public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work) {
- this(minPeriod, minDelay, daemonThread, work, null, null);
+ this(minPeriod, minDelay, daemonThread, work, null);
}
/**
@@ -78,98 +122,104 @@ public class WorkerThread {
* @param minDelay minimum work-loop-delay to throttle execution or {@code null} if unthrottled, see {@link #getSleptDuration()}
* @param daemonThread argument for {@link Thread#setDaemon(boolean)}
* @param work the actual work {@link Callback} to perform.
- * @param init optional initialization {@link Runnable} called at {@link #start()} while locked
- * @param end optional release {@link Runnable} called at {@link #stop()} while locked
+ * @param stateChangeCB optional {@link StateCallback} called at different {@link StateCallback.State} changes while locked
*/
- public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work, final Runnable init, final Runnable end) {
+ public WorkerThread(final Duration minPeriod, final Duration minDelay, final boolean daemonThread, final Callback work, final StateCallback stateChangeCB) {
+ this.state = 0;
this.minPeriod = null != minPeriod ? minPeriod : Duration.ZERO;
this.minDelay = null != minDelay ? minDelay : Duration.ZERO;
- this.useMinimum = this.minPeriod.toMillis() > 0 || this.minDelay.toMillis() > 0;
+ if( this.minPeriod.toMillis() > 0 || this.minDelay.toMillis() > 0 ) {
+ set(USE_MINIMUM);
+ }
this.cbWork = work;
- this.cbInitLocked = init;
- this.cbEndLocked = end;
- this.isDaemonThread = daemonThread;
+ this.cbState = stateChangeCB;
+ if( daemonThread ) {
+ set(DAEMON);
+ }
thread = null;
}
/**
* Starts execution of a new worker thread if not {@link #isRunning}, i.e. never {@link #start()}'ed or {@link #stop()}'ed.
* <p>
- * Method blocks until the new worker thread has started, {@link #isRunning()} and also {@link #isActive()}
- * </p>
- */
- public final synchronized void start() {
- start(false);
- }
-
- /**
- * Starts execution of a new worker thread if not {@link #isRunning}, i.e. never {@link #start()}'ed or {@link #stop()}'ed.
- * <p>
* Method blocks until the new worker thread has been started and {@link #isRunning()} and {@link #isActive()} if {@code paused == false}.
* </p>
* @param paused if {@code true}, keeps the new worker thread paused, otherwise {@link #resume()} it.
*/
public final synchronized void start(final boolean paused) {
- if( isRunning ) {
+ if( isSet(RUNNING) || null != thread || isSet(SHALL_STOP) || isSet(SHALL_PAUSE) ) {
+ // definite start condition: !isRunning
+ // subsequent conditions only for consistency/doc: null == thread && !shallStop && !shallPause
return;
}
- shallStop = false;
- shallPause = true;
+ if( paused ) {
+ set(SHALL_PAUSE);
+ }
thread = new Thread(threadRunnable);
- thread.setDaemon(isDaemonThread);
+ thread.setDaemon(isSet(DAEMON));
thread.start();
try {
this.notifyAll(); // wake-up startup-block
- while( !isRunning && !shallStop ) {
- this.wait(); // wait until started
+ if( !paused ) {
+ while( !isSet(RUNNING) && !isSet(ACTIVE) && null != thread && !isSet(SHALL_STOP) ) {
+ this.wait(); // wait until started and active (not-paused)
+ }
+ } else {
+ while( !isSet(RUNNING) && null != thread && !isSet(SHALL_STOP) ) {
+ this.wait(); // wait until started
+ }
+ while( isSet(RUNNING) && isSet(ACTIVE) && null != thread && !isSet(SHALL_STOP) ) {
+ this.wait(); // wait until paused
+ }
}
} catch (final InterruptedException e) {
throw new InterruptedRuntimeException(e);
}
- if( !paused ) {
- resume();
- }
}
/**
* Stops execution of the {@link #start()}'ed worker thread.
* <p>
- * Method blocks until worker thread has stopped.
+ * Method blocks until worker thread has been {@link #isRunning() stopped} if {@code waitUntilDone} is {@code true}.
* </p>
*/
- public final synchronized void stop() {
- if( isRunning ) {
- shallStop = true;
+ public final synchronized void stop(final boolean waitUntilDone) {
+ if( isSet(RUNNING) ) {
+ set(SHALL_STOP);
+ this.notifyAll(); // wake-up pause-block (opt)
if( java.lang.Thread.currentThread() != thread ) {
- if( isBlocked && isRunning ) {
+ if( isSet(BLOCKED | RUNNING) ) {
thread.interrupt();
}
- try {
- this.notifyAll(); // wake-up pause-block (opt)
- while( isRunning ) {
- this.wait(); // wait until stopped
+ if( waitUntilDone ) {
+ try {
+ while( isSet(RUNNING) ) {
+ this.wait(); // wait until stopped
+ }
+ } catch (final InterruptedException e) {
+ throw new InterruptedRuntimeException(e);
}
- } catch (final InterruptedException e) {
- throw new InterruptedRuntimeException(e);
}
}
- thread = null;
- shallStop = false;
- shallPause = true;
}
}
- /** Pauses execution of the {@link #start()}'ed worker thread. */
+ /**
+ * Pauses execution of the {@link #start()}'ed worker thread.
+ * <p>
+ * Method blocks until worker thread has been {@link #isActive()}'ated if {@code waitUntilDone} is {@code true}.
+ * </p>
+ */
public final synchronized void pause(final boolean waitUntilDone) {
- if( isActive ) {
- shallPause = true;
+ if( isSet(RUNNING | ACTIVE) && !isSet(SHALL_STOP) ) {
+ set(SHALL_PAUSE);
if( java.lang.Thread.currentThread() != thread ) {
- if( isBlocked && isActive ) {
+ if( isSet(BLOCKED | ACTIVE) ) {
thread.interrupt();
}
if( waitUntilDone ) {
try {
- while( isActive && isRunning ) {
+ while( isSet(RUNNING | ACTIVE) ) {
this.wait(); // wait until paused
}
} catch (final InterruptedException e) {
@@ -182,12 +232,12 @@ public class WorkerThread {
/** Resumes execution of the {@link #pause(boolean)}'ed worker thread. */
public final synchronized void resume() {
- if( isRunning && !isActive ) {
- shallPause = false;
+ if( isSet(RUNNING) && !isSet(ACTIVE) && !isSet(SHALL_STOP) ) {
+ clear(SHALL_PAUSE);
+ this.notifyAll(); // wake-up pause-block
if( java.lang.Thread.currentThread() != thread ) {
try {
- this.notifyAll(); // wake-up pause-block
- while( !isActive && !shallPause && isRunning ) {
+ while( !isSet(ACTIVE) && !isSet(SHALL_PAUSE) && isSet(RUNNING) ) {
this.wait(); // wait until resumed
}
} catch (final InterruptedException e) {
@@ -199,9 +249,21 @@ public class WorkerThread {
}
/** Returns true if the worker thread has started via {@link #start()} and has not ended, e.g. via {@link #stop()}. It might be {@link #pause(boolean) paused}. */
- public final boolean isRunning() { return isRunning; }
- /** Returns true if the worker thread has started via {@link #start()} and has not ended, e.g. via {@link #stop()} and is not {@link #pause(boolean) paused}. */
- public final boolean isActive() { return isActive; }
+ public final boolean isRunning() { return isSet(RUNNING); }
+ /** Returns true if the worker thread {@link #isRunning()} and is not {@link #pause(boolean) paused}. */
+ public final boolean isActive() { return isSet(ACTIVE); }
+ /** Returns true if the worker thread {@link #isRunning()} and is {@link #pause(boolean) paused}. */
+ public final boolean isPaused() { return isSet(RUNNING) && !isSet(ACTIVE); }
+ /** Returns true if an exception occured during {@link Callable} work execution. */
+ public final boolean hasError() { return null != workErr; }
+ /** Returns the worker thread if {@link #isRunning()}, otherwise {@code null}. */
+ public final Thread getThread() { return thread; }
+
+ /**
+ * Returns the exception is {@link #hasError()}.
+ * @param clear if true, clear the exception
+ */
+ public final Exception getError(final boolean clear ) { final Exception e = workErr; if( clear) { workErr = null; } return e; }
/**
* Returns enforced minimum work-loop-period or {@link Duration#ZERO} for none.
@@ -228,10 +290,11 @@ public class WorkerThread {
@Override
public String toString() {
synchronized(this) {
- return "Worker[running "+isRunning+", active "+isActive+", blocked "+isBlocked+
- ", shall[pause "+shallPause+", stop "+shallStop+
- "], minDelay "+minDelay.toMillis()+"ms, minPeriod[set "+minPeriod.toMillis()+"ms, sleptDelta "+sleptDuration.toMillis()+
- "ms], daemon "+isDaemonThread+", thread "+thread+"]";
+ final int _state = state;
+ return "Worker[running "+isSet(_state, RUNNING)+", active "+isSet(_state, ACTIVE)+", blocked "+isSet(_state, BLOCKED)+
+ ", shall[pause "+isSet(_state, SHALL_PAUSE)+", stop "+isSet(_state, SHALL_STOP)+
+ "], min[period "+minPeriod.toMillis()+"ms, delay "+minDelay.toMillis()+"], slept "+sleptDuration.toMillis()+
+ "ms, daemon "+isSet(_state, DAEMON)+", thread "+thread+"]";
}
}
@@ -242,41 +305,90 @@ public class WorkerThread {
ct.setName(ct.getName()+"-Worker_"+instanceId.getAndIncrement());
synchronized ( WorkerThread.this ) {
- if( null != cbInitLocked ) {
- cbInitLocked.run();
+ Exception err = null;
+ if( null != cbState ) {
+ try {
+ cbState.run(WorkerThread.this, StateCallback.State.INIT);
+ } catch (final InterruptedException e) {
+ // OK
+ } catch (final Throwable t) {
+ err = new Exception(t.getClass().getSimpleName()+" while processing init-state "+cbState, t);
+ }
+ if( null != err ) {
+ workErr = err;
+ clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
+ thread = null;
+ workErr.printStackTrace();
+ WorkerThread.this.notifyAll(); // wake-up ctor()
+ return; // bail out
+ }
}
- isRunning = true;
+ set(RUNNING | ACTIVE);
WorkerThread.this.notifyAll(); // wake-up ctor()
}
- while( !shallStop ) {
- Exception streamErr = null;
+ while( !isSet(SHALL_STOP) ) {
+ Exception err = null;
try {
- if( shallPause ) {
+ if( isSet(SHALL_PAUSE) ) {
synchronized ( WorkerThread.this ) {
- while( shallPause && !shallStop ) {
- isActive = false;
+ if( null != cbState ) {
+ try {
+ cbState.run(WorkerThread.this, StateCallback.State.PAUSED);
+ } catch (final InterruptedException e) {
+ // OK
+ } catch (final Throwable t) {
+ err = new Exception(t.getClass().getSimpleName()+" while processing pause-state "+cbState, t);
+ }
+ if( null != err ) {
+ workErr = err;
+ clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
+ thread = null;
+ workErr.printStackTrace();
+ WorkerThread.this.notifyAll(); // wake-up ctor()
+ return; // bail out
+ }
+ }
+ while( isSet(SHALL_PAUSE) && !isSet(SHALL_STOP) ) {
+ clear(ACTIVE);
WorkerThread.this.notifyAll(); // wake-up doPause()
try {
WorkerThread.this.wait(); // wait until resumed
} catch (final InterruptedException e) {
- if( !shallPause ) {
+ if( !isSet(SHALL_PAUSE) ) {
throw new InterruptedRuntimeException(e);
}
}
}
- isActive = true;
+ if( null != cbState ) {
+ try {
+ cbState.run(WorkerThread.this, StateCallback.State.RESUMED);
+ } catch (final InterruptedException e) {
+ err = new InterruptedRuntimeException(e.getClass().getSimpleName()+" while processing resume-state"+cbState, e);
+ } catch (final Throwable t) {
+ err = new Exception(t.getClass().getSimpleName()+" while processing resume-state "+cbState, t);
+ }
+ if( null != err ) {
+ workErr = err;
+ clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
+ thread = null;
+ workErr.printStackTrace();
+ WorkerThread.this.notifyAll(); // wake-up ctor()
+ return; // bail out
+ }
+ }
+ set(ACTIVE);
WorkerThread.this.notifyAll(); // wake-up doResume()
}
}
- if( !shallStop ) {
+ if( !isSet(SHALL_STOP) ) {
final Instant t0 = Instant.now();
- isBlocked = true;
+ set(BLOCKED);
{
- cbWork.run();
+ cbWork.run(WorkerThread.this);
}
- isBlocked = false;
- if( useMinimum ) {
+ clear(BLOCKED);
+ if( isSet(USE_MINIMUM) ) {
final long minDelayMS = minDelay.toMillis();
final Instant t1 = Instant.now();
final Duration td = Duration.between(t0, t1);
@@ -303,32 +415,40 @@ public class WorkerThread {
}
}
} catch (final InterruptedException e) {
- if( !isBlocked ) { // !shallStop && !shallPause
- streamErr = new InterruptedRuntimeException(e);
+ if( !isSet(BLOCKED) ) { // !shallStop && !shallPause
+ err = new InterruptedRuntimeException(e.getClass().getSimpleName()+" while processing work-callback "+cbWork, e);
}
- isBlocked = false;
+ clear(BLOCKED);
sleptDuration = Duration.ZERO;
} catch (final Throwable t) {
- streamErr = new Exception(t.getClass().getSimpleName()+" while processing", t);
+ err = new Exception(t.getClass().getSimpleName()+" while processing work-callback "+cbWork, t);
sleptDuration = Duration.ZERO;
} finally {
- if( null != streamErr ) {
+ if( null != err ) {
// state transition incl. notification
synchronized ( WorkerThread.this ) {
- shallPause = true;
- isActive = false;
+ workErr = err;
+ err = null;
+ set(SHALL_PAUSE);
+ clear(ACTIVE);
WorkerThread.this.notifyAll(); // wake-up potential do*()
}
- pause(false);
}
}
}
synchronized ( WorkerThread.this ) {
- if( null != cbEndLocked ) {
- cbEndLocked.run();
+ if( null != cbState ) {
+ try {
+ cbState.run(WorkerThread.this, StateCallback.State.END);
+ } catch (final InterruptedException e) {
+ // OK
+ } catch (final Throwable t) {
+ workErr = new Exception(t.getClass().getSimpleName()+" while processing end-state "+cbState, t);
+ workErr.printStackTrace();
+ }
}
- isRunning = false;
- isActive = false;
+ thread = null;
+ clear(RUNNING | ACTIVE | SHALL_STOP | SHALL_PAUSE);
WorkerThread.this.notifyAll(); // wake-up doStop()
}
} };