From fedfc22ba3a3583a3ecf4b55f7f8a822045b690c Mon Sep 17 00:00:00 2001 From: Sven Gothel Date: Wed, 20 Sep 2023 00:03:54 +0200 Subject: Add WorkerThread: A re-start'able, pause'able and interrupt'able worker thread with an optional minimum execution duration --- make/scripts/runtest.sh | 3 +- src/java/com/jogamp/common/util/WorkerThread.java | 315 +++++++++++++++++++++ .../com/jogamp/common/util/TestWorkerThread01.java | 175 ++++++++++++ 3 files changed, 492 insertions(+), 1 deletion(-) create mode 100644 src/java/com/jogamp/common/util/WorkerThread.java create mode 100644 src/junit/com/jogamp/common/util/TestWorkerThread01.java diff --git a/make/scripts/runtest.sh b/make/scripts/runtest.sh index a4c1d55..786c30a 100755 --- a/make/scripts/runtest.sh +++ b/make/scripts/runtest.sh @@ -119,6 +119,7 @@ function onetest() { #onetest com.jogamp.common.util.TestLongIntHashMap 2>&1 | tee -a $LOG #onetest com.jogamp.common.util.TestPlatform01 2>&1 | tee -a $LOG #onetest com.jogamp.common.util.TestRunnableTask01 2>&1 | tee -a $LOG +onetest com.jogamp.common.util.TestWorkerThread01 2>&1 | tee -a $LOG #onetest com.jogamp.common.util.TestIOUtil01 2>&1 | tee -a $LOG #onetest com.jogamp.common.util.TestTempJarCache 2>&1 | tee -a $LOG #onetest com.jogamp.common.util.TestJarUtil 2>&1 | tee -a $LOG @@ -150,7 +151,7 @@ function onetest() { #onetest com.jogamp.common.os.TestElfReader01 $* 2>&1 | tee -a $LOG #onetest com.jogamp.gluegen.test.junit.internals.TestType 2>&1 | tee -a $LOG -onetest com.jogamp.gluegen.test.junit.generation.TestJCPP $* 2>&1 | tee -a $LOG +#onetest com.jogamp.gluegen.test.junit.generation.TestJCPP $* 2>&1 | tee -a $LOG #onetest com.jogamp.gluegen.test.junit.generation.TestCParser $* 2>&1 | tee -a $LOG #onetest com.jogamp.gluegen.jcpp.CppReaderTest 2>&1 | tee -a $LOG #onetest com.jogamp.gluegen.jcpp.ErrorTest 2>&1 | tee -a $LOG diff --git a/src/java/com/jogamp/common/util/WorkerThread.java b/src/java/com/jogamp/common/util/WorkerThread.java new file mode 100644 index 0000000..996fd08 --- /dev/null +++ b/src/java/com/jogamp/common/util/WorkerThread.java @@ -0,0 +1,315 @@ +/** + * Copyright 2023 JogAmp Community. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are those of the + * authors and should not be interpreted as representing official policies, either expressed + * or implied, of JogAmp Community. + */ +package com.jogamp.common.util; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A re-{@link #start()}'able, {@link #pause(boolean)}'able and interrupt'able worker {@link Thread} + * with an optional minimum execution duration, see {@link #getSleptDuration()}. + */ +public class WorkerThread { + /** + * An interruptible {@link #run()} task. + */ + public interface Callback { + void run() throws InterruptedException; + } + + private static AtomicInteger instanceId = new AtomicInteger(0); + private volatile boolean isRunning = false; + private volatile boolean isActive = false; + private volatile boolean isBlocked = false; + + private volatile boolean shallPause = true; + private volatile boolean shallStop = false; + private Exception streamErr = null; + private final Duration minPeriod; + private final boolean useMinPeriod; + private final Callback cbWork; + private final Runnable cbInitLocked; + private final Runnable cbEndLocked; + private final boolean isDaemonThread; + private Thread thread; + private volatile Duration sleptDuration = Duration.ZERO; + + /** + * Instantiates a new {@link WorkerThread}. + * @param minPeriod minimum work-loop-period to throttle execution or {@code null} if unthrottled, see {@link #getSleptDuration()} + * @param daemonThread argument for {@link Thread#setDaemon(boolean)} + * @param work the actual work {@link Callback} to perform. + */ + public WorkerThread(final Duration minPeriod, final boolean daemonThread, final Callback work) { + this(minPeriod, daemonThread, work, null, null); + } + + /** + * Instantiates a new {@link WorkerThread}. + * @param minPeriod minimum work-loop-period to throttle execution or {@code null} if unthrottled, see {@link #getSleptDuration()} + * @param daemonThread argument for {@link Thread#setDaemon(boolean)} + * @param work the actual work {@link Callback} to perform. + * @param init optional initialization {@link Runnable} called at {@link #start()} while locked + * @param end optional release {@link Runnable} called at {@link #stop()} while locked + */ + public WorkerThread(final Duration minPeriod, final boolean daemonThread, final Callback work, final Runnable init, final Runnable end) { + this.minPeriod = null != minPeriod ? minPeriod : Duration.ZERO; + this.useMinPeriod = this.minPeriod.toMillis() > 0; + this.cbWork = work; + this.cbInitLocked = init; + this.cbEndLocked = end; + this.isDaemonThread = daemonThread; + thread = null; + } + + /** + * Starts execution of a new worker thread if not {@link #isRunning}, i.e. never {@link #start()}'ed or {@link #stop()}'ed. + *

+ * Method blocks until the new worker thread has started, {@link #isRunning()} and also {@link #isActive()} + *

+ */ + public final synchronized void start() { + start(false); + } + + /** + * Starts execution of a new worker thread if not {@link #isRunning}, i.e. never {@link #start()}'ed or {@link #stop()}'ed. + *

+ * Method blocks until the new worker thread has been started and {@link #isRunning()} and {@link #isActive()} if {@code paused == false}. + *

+ * @param paused if {@code true}, keeps the new worker thread paused, otherwise {@link #resume()} it. + */ + public final synchronized void start(final boolean paused) { + if( isRunning ) { + return; + } + shallStop = false; + shallPause = true; + thread = new Thread(threadRunnable); + thread.setDaemon(isDaemonThread); + thread.start(); + try { + this.notifyAll(); // wake-up startup-block + while( !isRunning && !shallStop ) { + this.wait(); // wait until started + } + } catch (final InterruptedException e) { + throw new InterruptedRuntimeException(e); + } + if( !paused ) { + resume(); + } + } + + /** + * Stops execution of the {@link #start()}'ed worker thread. + *

+ * Method blocks until worker thread has stopped. + *

+ */ + public final synchronized void stop() { + if( isRunning ) { + shallStop = true; + if( java.lang.Thread.currentThread() != thread ) { + if( isBlocked && isRunning ) { + thread.interrupt(); + } + try { + this.notifyAll(); // wake-up pause-block (opt) + while( isRunning ) { + this.wait(); // wait until stopped + } + } catch (final InterruptedException e) { + throw new InterruptedRuntimeException(e); + } + } + thread = null; + shallStop = false; + shallPause = true; + } + } + + /** Pauses execution of the {@link #start()}'ed worker thread. */ + public final synchronized void pause(final boolean waitUntilDone) { + if( isActive ) { + shallPause = true; + if( java.lang.Thread.currentThread() != thread ) { + if( isBlocked && isActive ) { + thread.interrupt(); + } + if( waitUntilDone ) { + try { + while( isActive && isRunning ) { + this.wait(); // wait until paused + } + } catch (final InterruptedException e) { + throw new InterruptedRuntimeException(e); + } + } + } + } + } + + /** Resumes execution of the {@link #pause(boolean)}'ed worker thread. */ + public final synchronized void resume() { + if( isRunning && !isActive ) { + shallPause = false; + if( java.lang.Thread.currentThread() != thread ) { + try { + this.notifyAll(); // wake-up pause-block + while( !isActive && !shallPause && isRunning ) { + this.wait(); // wait until resumed + } + } catch (final InterruptedException e) { + pause(false); + throw new InterruptedRuntimeException(e); + } + } + } + } + + /** Returns true if the worker thread has started via {@link #start()} and has not ended, e.g. via {@link #stop()}. It might be {@link #pause(boolean) paused}. */ + public final boolean isRunning() { return isRunning; } + /** Returns true if the worker thread has started via {@link #start()} and has not ended, e.g. via {@link #stop()} and is not {@link #pause(boolean) paused}. */ + public final boolean isActive() { return isActive; } + + /** + * Returns enforced minimum work-loop-period or {@link Duration#ZERO} for none. + * @see #getSleptDuration() + */ + public final Duration getMinPeriod() { return minPeriod; } + + /** + * Returns the slept {@link Duration} delta of {@link #getMinPeriod()} and consumed {@link Callback#run()} duration. + *

+ * Returns {@link Duration#ZERO zero} for {@link Duration#ZERO zero} {@link #getMinPeriod()} or exceeding {@link Callback#run()} duration. + *

+ */ + public final Duration getSleptDuration() { return sleptDuration; } + + @Override + public String toString() { + synchronized(this) { + return "Worker[running "+isRunning+", active "+isActive+", blocked "+isBlocked+ + ", shall[pause "+shallPause+", stop "+shallStop+ + "], minPeriod[set "+minPeriod.toMillis()+"ms, sleptDelta "+sleptDuration.toMillis()+ + "ms], daemon "+isDaemonThread+", thread "+thread+"]"; + } + } + + private final Runnable threadRunnable = new Runnable() { + @Override + public final void run() { + final Thread ct = Thread.currentThread(); + ct.setName(ct.getName()+"-Worker_"+instanceId.getAndIncrement()); + + synchronized ( WorkerThread.this ) { + if( null != cbInitLocked ) { + cbInitLocked.run(); + } + isRunning = true; + WorkerThread.this.notifyAll(); // wake-up ctor() + } + + while( !shallStop ) { + try { + if( shallPause ) { + synchronized ( WorkerThread.this ) { + while( shallPause && !shallStop ) { + isActive = false; + WorkerThread.this.notifyAll(); // wake-up doPause() + try { + WorkerThread.this.wait(); // wait until resumed + } catch (final InterruptedException e) { + if( !shallPause ) { + throw new InterruptedRuntimeException(e); + } + } + } + isActive = true; + WorkerThread.this.notifyAll(); // wake-up doResume() + } + } + if( !shallStop ) { + final Instant t0 = Instant.now(); + isBlocked = true; + { + cbWork.run(); + } + isBlocked = false; + if( useMinPeriod ) { + final Instant t1 = Instant.now(); + final Duration td = Duration.between(t0, t1); + if( minPeriod.compareTo(td) > 0 ) { + final Duration sleepMinPeriodDelta = minPeriod.minus(td); + final long tdMinMS = sleepMinPeriodDelta.toMillis(); + if( tdMinMS > 0 ) { + sleptDuration = sleepMinPeriodDelta; + java.lang.Thread.sleep( tdMinMS ); + } else { + sleptDuration = Duration.ZERO; + } + // java.util.concurrent.locks.LockSupport.parkNanos(tdMin.toNanos()); + } else { + sleptDuration = Duration.ZERO; + } + } + } + } catch (final InterruptedException e) { + if( !isBlocked ) { // !shallStop && !shallPause + streamErr = new InterruptedRuntimeException(e); + } + isBlocked = false; + sleptDuration = Duration.ZERO; + } catch (final Throwable t) { + streamErr = new Exception(t.getClass().getSimpleName()+" while processing", t); + sleptDuration = Duration.ZERO; + } finally { + if( null != streamErr ) { + // state transition incl. notification + synchronized ( WorkerThread.this ) { + shallPause = true; + isActive = false; + WorkerThread.this.notifyAll(); // wake-up potential do*() + } + pause(false); + } + } + } + synchronized ( WorkerThread.this ) { + if( null != cbEndLocked ) { + cbEndLocked.run(); + } + isRunning = false; + isActive = false; + WorkerThread.this.notifyAll(); // wake-up doStop() + } + } }; + +} diff --git a/src/junit/com/jogamp/common/util/TestWorkerThread01.java b/src/junit/com/jogamp/common/util/TestWorkerThread01.java new file mode 100644 index 0000000..9ac7d75 --- /dev/null +++ b/src/junit/com/jogamp/common/util/TestWorkerThread01.java @@ -0,0 +1,175 @@ +/** + * Copyright 2023 JogAmp Community. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are those of the + * authors and should not be interpreted as representing official policies, either expressed + * or implied, of JogAmp Community. + */ + +package com.jogamp.common.util; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; + +import com.jogamp.junit.util.SingletonJunitCase; + +import org.junit.FixMethodOrder; +import org.junit.runners.MethodSorters; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class TestWorkerThread01 extends SingletonJunitCase { + + static class Action implements WorkerThread.Callback { + final Duration sleep; + AtomicInteger counter = new AtomicInteger(0); + Instant tlast = Instant.now(); + volatile Duration td = Duration.ZERO; + + Action(final Duration sleep) { + this.sleep = sleep; + } + + @Override + public void run() throws InterruptedException { + { + java.lang.Thread.sleep(sleep.toMillis()); + // java.util.concurrent.locks.LockSupport.parkNanos(sleep.toNanos()); + } + final Instant t1 = Instant.now(); + td = Duration.between(tlast, t1); + System.err.println("action period "+td.toMillis()+"ms, counter "+counter.getAndIncrement()); + tlast = t1; + } + } + + static void checkStarted(final WorkerThread wt, final boolean isPaused) { + Assert.assertTrue(wt.isRunning()); + Assert.assertEquals(!isPaused, wt.isActive()); + } + static void checkStopped(final WorkerThread wt) { + Assert.assertFalse(wt.isRunning()); + Assert.assertFalse(wt.isActive()); + } + static void start(final WorkerThread wt) { + System.err.println("WT Start.0: "+wt); + wt.start(); + System.err.println("WT Start.X: "+wt); + } + static void stop(final WorkerThread wt) { + System.err.println("WT Stop.0: "+wt); + wt.stop(); + System.err.println("WT Stop.X: "+wt); + } + static void pause(final WorkerThread wt, final boolean wait) { + System.err.println("WT Pause.0: wait "+wait+", "+wt); + wt.pause(wait); + System.err.println("WT Pause.X: wait "+wait+", "+wt); + } + static void resume(final WorkerThread wt) { + System.err.println("WT Resume.0: "+wt); + wt.resume(); + System.err.println("WT Resume.X: "+wt); + } + + public void testAction(final long periodMS, final long actionMS) throws IOException, InterruptedException, InvocationTargetException { + final Action action = new Action( 0 < actionMS ? Duration.of(actionMS, ChronoUnit.MILLIS) : Duration.ZERO); + final WorkerThread wt =new WorkerThread(Duration.of(periodMS, ChronoUnit.MILLIS), true /* daemonThread */, action); + final long maxPeriodMS = Math.max(periodMS, actionMS); + int counterA = action.counter.get(); + checkStopped(wt); + start(wt); + checkStarted(wt, false /* isPaused */); + Thread.sleep(maxPeriodMS*3); + { + final Duration td = action.td; + final Duration wt_slept = wt.getSleptDuration(); + final long actionMS_d = td.minus( wt_slept ).toMillis() - actionMS; + System.err.println("actionMS_d "+actionMS_d+" = td "+td.toMillis()+"ms - wt_slept "+wt_slept.toMillis()+"ms - actionMS "+actionMS+"ms"); + Assert.assertTrue(Math.abs(actionMS_d) < 4); + } + + checkStarted(wt, false /* isPaused */); + stop(wt); + checkStopped(wt); + int counterB = action.counter.get(); + Assert.assertTrue(counterB > counterA); + + counterA = action.counter.get(); + checkStopped(wt); + start(wt); + checkStarted(wt, false /* isPaused */); + Thread.sleep(maxPeriodMS*3); + + checkStarted(wt, false /* isPaused */); + pause(wt, true /* wait */); + checkStarted(wt, true /* isPaused */); + counterB = action.counter.get(); + Assert.assertTrue(counterB > counterA); + + counterA = action.counter.get(); + Assert.assertTrue(counterB == counterA); + Thread.sleep(maxPeriodMS); + resume(wt); + checkStarted(wt, false /* isPaused */); + Thread.sleep(maxPeriodMS*3); + + checkStarted(wt, false /* isPaused */); + stop(wt); + checkStopped(wt); + counterB = action.counter.get(); + Assert.assertTrue(counterB > counterA); + } + + @Test + public void test01ZeroAction() throws IOException, InterruptedException, InvocationTargetException { + testAction(16 /* periodMS */, 0 /* actionMS*/); + } + + @Test + public void test02MidAction() throws IOException, InterruptedException, InvocationTargetException { + testAction(16 /* periodMS */, 8 /* actionMS*/); + } + + @Test + public void test03HeavyAction() throws IOException, InterruptedException, InvocationTargetException { + testAction(16 /* periodMS */, 20 /* actionMS*/); + } + + @Test + public void test03ZeroMidAction() throws IOException, InterruptedException, InvocationTargetException { + testAction(0 /* periodMS */, 8 /* actionMS*/); + } + + public static void main(final String args[]) throws IOException { + final String tstname = TestWorkerThread01.class.getName(); + org.junit.runner.JUnitCore.main(tstname); + } + +} -- cgit v1.2.3