diff options
Diffstat (limited to 'LibOVR/Src/Kernel/OVR_ThreadCommandQueue.cpp')
-rw-r--r-- | LibOVR/Src/Kernel/OVR_ThreadCommandQueue.cpp | 401 |
1 files changed, 0 insertions, 401 deletions
diff --git a/LibOVR/Src/Kernel/OVR_ThreadCommandQueue.cpp b/LibOVR/Src/Kernel/OVR_ThreadCommandQueue.cpp deleted file mode 100644 index 90ba3cc..0000000 --- a/LibOVR/Src/Kernel/OVR_ThreadCommandQueue.cpp +++ /dev/null @@ -1,401 +0,0 @@ -/************************************************************************************ - -PublicHeader: None -Filename : OVR_ThreadCommandQueue.cpp -Content : Command queue for operations executed on a thread -Created : October 29, 2012 - -Copyright : Copyright 2014 Oculus VR, LLC All Rights reserved. - -Licensed under the Oculus VR Rift SDK License Version 3.2 (the "License"); -you may not use the Oculus VR Rift SDK except in compliance with the License, -which is provided at the time of installation or download, or which -otherwise accompanies this software in either electronic or hard copy form. - -You may obtain a copy of the License at - -http://www.oculusvr.com/licenses/LICENSE-3.2 - -Unless required by applicable law or agreed to in writing, the Oculus VR SDK -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -************************************************************************************/ - -#include "OVR_ThreadCommandQueue.h" - -namespace OVR { - - -//------------------------------------------------------------------------ -// ***** CircularBuffer - -// CircularBuffer is a FIFO buffer implemented in a single block of memory, -// which allows writing and reading variable-size data chucks. Write fails -// if buffer is full. - -class CircularBuffer -{ - enum { - AlignSize = 16, - AlignMask = AlignSize - 1 - }; - - uint8_t* pBuffer; - size_t Size; - size_t Tail; // Byte offset of next item to be popped. - size_t Head; // Byte offset of where next push will take place. - size_t End; // When Head < Tail, this is used instead of Size. - - inline size_t roundUpSize(size_t size) - { return (size + AlignMask) & ~(size_t)AlignMask; } - -public: - - CircularBuffer(size_t size) - : Size(size), Tail(0), Head(0), End(0) - { - pBuffer = (uint8_t*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize); - } - ~CircularBuffer() - { - // For ThreadCommands, we must consume everything before shutdown. - OVR_ASSERT(IsEmpty()); - OVR_FREE_ALIGNED(pBuffer); - } - - bool IsEmpty() const { return (Head == Tail); } - - // Allocates a state block of specified size and advances pointers, - // returning 0 if buffer is full. - uint8_t* Write(size_t size); - - // Returns a pointer to next available data block; 0 if none available. - uint8_t* ReadBegin() - { return (Head != Tail) ? (pBuffer + Tail) : 0; } - // Consumes data of specified size; this must match size passed to Write. - void ReadEnd(size_t size); -}; - - -// Allocates a state block of specified size and advances pointers, -// returning 0 if buffer is full. -uint8_t* CircularBuffer::Write(size_t size) -{ - uint8_t* p = 0; - - size = roundUpSize(size); - // Since this is circular buffer, always allow at least one item. - OVR_ASSERT(size < Size/2); - - if (Head >= Tail) - { - OVR_ASSERT(End == 0); - - if (size <= (Size - Head)) - { - p = pBuffer + Head; - Head += size; - } - else if (size < Tail) - { - p = pBuffer; - End = Head; - Head = size; - OVR_ASSERT(Head != Tail); - } - } - else - { - OVR_ASSERT(End != 0); - - if ((Tail - Head) > size) - { - p = pBuffer + Head; - Head += size; - OVR_ASSERT(Head != Tail); - } - } - - return p; -} - -void CircularBuffer::ReadEnd(size_t size) -{ - OVR_ASSERT(Head != Tail); - size = roundUpSize(size); - - Tail += size; - if (Tail == End) - { - Tail = End = 0; - } - else if (Tail == Head) - { - OVR_ASSERT(End == 0); - Tail = Head = 0; - } -} - - -//------------------------------------------------------------------------------------- -// ***** ThreadCommand - -ThreadCommand::PopBuffer::~PopBuffer() -{ - if (Size) { - Destruct<ThreadCommand>(toCommand()); - } -} - -void ThreadCommand::PopBuffer::InitFromBuffer(void* data) -{ - ThreadCommand* cmd = (ThreadCommand*)data; - OVR_ASSERT(cmd->Size <= MaxSize); - - if (Size) { - Destruct<ThreadCommand>(toCommand()); - } - Size = cmd->Size; - memcpy(Buffer, (void*)cmd, Size); -} - -void ThreadCommand::PopBuffer::Execute() -{ - ThreadCommand* command = toCommand(); - OVR_ASSERT(command); - command->Execute(); - if (NeedsWait()) { - GetEvent()->PulseEvent(); - } -} - -//------------------------------------------------------------------------------------- - -class ThreadCommandQueueImpl : public NewOverrideBase -{ - typedef ThreadCommand::NotifyEvent NotifyEvent; - friend class ThreadCommandQueue; - -public: - - ThreadCommandQueueImpl(ThreadCommandQueue* queue) : - pQueue(queue), - ExitEnqueued(false), - ExitProcessed(false), - CommandBuffer(2048), - PullThreadId(0) - { - } - ~ThreadCommandQueueImpl(); - - - bool PushCommand(const ThreadCommand& command); - bool PopCommand(ThreadCommand::PopBuffer* popBuffer); - - - // ExitCommand is used by notify us that Thread is shutting down. - struct ExitCommand : public ThreadCommand - { - ThreadCommandQueueImpl* pImpl; - - ExitCommand(ThreadCommandQueueImpl* impl, bool wait) - : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { } - - virtual void Execute() const - { - Lock::Locker lock(&pImpl->QueueLock); - pImpl->ExitProcessed = true; - } - virtual ThreadCommand* CopyConstruct(void* p) const - { return Construct<ExitCommand>(p, *this); } - }; - - - NotifyEvent* AllocNotifyEvent_NTS() - { - NotifyEvent* p = AvailableEvents.GetFirst(); - - if (!AvailableEvents.IsNull(p)) - p->RemoveNode(); - else - p = new NotifyEvent; - return p; - } - - void FreeNotifyEvent_NTS(NotifyEvent* p) - { - AvailableEvents.PushBack(p); - } - - void FreeNotifyEvents_NTS() - { - while(!AvailableEvents.IsEmpty()) - { - NotifyEvent* p = AvailableEvents.GetFirst(); - p->RemoveNode(); - delete p; - } - } - - ThreadCommandQueue* pQueue; - Lock QueueLock; - volatile bool ExitEnqueued; - volatile bool ExitProcessed; - List<NotifyEvent> AvailableEvents; - List<NotifyEvent> BlockedProducers; - CircularBuffer CommandBuffer; - - // The pull thread id is set to the last thread that pulled commands. - // Since this thread command queue is designed for a single thread, - // reentrant behavior that would cause a dead-lock for messages that - // wait for completion can be avoided by simply comparing the - // thread id of the last pull. - OVR::ThreadId PullThreadId; -}; - -ThreadCommandQueueImpl::~ThreadCommandQueueImpl() -{ - Lock::Locker lock(&QueueLock); - OVR_ASSERT(BlockedProducers.IsEmpty()); - FreeNotifyEvents_NTS(); -} - -bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command) -{ - if (command.NeedsWait() && PullThreadId == OVR::GetCurrentThreadId()) - { - command.Execute(); - return true; - } - - ThreadCommand::NotifyEvent* completeEvent = 0; - ThreadCommand::NotifyEvent* queueAvailableEvent = 0; - - // Repeat writing command into buffer until it is available. - for (;;) { - { // Lock Scope - Lock::Locker lock(&QueueLock); - - if (queueAvailableEvent) { - FreeNotifyEvent_NTS(queueAvailableEvent); - queueAvailableEvent = 0; - } - - // Don't allow any commands after PushExitCommand() is called. - if (ExitEnqueued && !command.ExitFlag) { - return false; - } - - bool bufferWasEmpty = CommandBuffer.IsEmpty(); - uint8_t* buffer = CommandBuffer.Write(command.GetSize()); - - if (buffer) { - ThreadCommand* c = command.CopyConstruct(buffer); - - if (c->NeedsWait()) { - completeEvent = c->pEvent = AllocNotifyEvent_NTS(); - } - - // Signal-waker consumer when we add data to buffer. - if (bufferWasEmpty) { - pQueue->OnPushNonEmpty_Locked(); - } - - break; - } - - queueAvailableEvent = AllocNotifyEvent_NTS(); - BlockedProducers.PushBack(queueAvailableEvent); - } // Lock Scope - - queueAvailableEvent->Wait(); - } // Intentional infinite loop - - // Command was enqueued, wait if necessary. - if (completeEvent) { - completeEvent->Wait(); - Lock::Locker lock(&QueueLock); - FreeNotifyEvent_NTS(completeEvent); - } - - return true; -} - - -// Pops the next command from the thread queue, if any is available. -bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer) -{ - PullThreadId = OVR::GetCurrentThreadId(); - - Lock::Locker lock(&QueueLock); - - uint8_t* buffer = CommandBuffer.ReadBegin(); - if (!buffer) - { - // Notify thread while in lock scope, enabling initialization of wait. - pQueue->OnPopEmpty_Locked(); - return false; - } - - popBuffer->InitFromBuffer(buffer); - CommandBuffer.ReadEnd(popBuffer->GetSize()); - - if (!BlockedProducers.IsEmpty()) - { - ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst(); - queueAvailableEvent->RemoveNode(); - queueAvailableEvent->PulseEvent(); - // Event is freed later by waiter. - } - return true; -} - - -//------------------------------------------------------------------------------------- - -ThreadCommandQueue::ThreadCommandQueue() -{ - pImpl = new ThreadCommandQueueImpl(this); -} -ThreadCommandQueue::~ThreadCommandQueue() -{ - delete pImpl; -} - -bool ThreadCommandQueue::PushCommand(const ThreadCommand& command) -{ - return pImpl->PushCommand(command); -} - -bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer) -{ - return pImpl->PopCommand(popBuffer); -} - -void ThreadCommandQueue::PushExitCommand(bool wait) -{ - // Exit is processed in two stages: - // - First, ExitEnqueued flag is set to block further commands from queuing up. - // - Second, the actual exit call is processed on the consumer thread, flushing - // any prior commands. - // IsExiting() only returns true after exit has flushed. - { - Lock::Locker lock(&pImpl->QueueLock); - if (pImpl->ExitEnqueued) - return; - pImpl->ExitEnqueued = true; - } - - PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait)); -} - -bool ThreadCommandQueue::IsExiting() const -{ - return pImpl->ExitProcessed; -} - - -} // namespace OVR |