From 7f526780994f44793fe56a426c01fcebfb64a75a Mon Sep 17 00:00:00 2001 From: Chris Robinson Date: Sun, 14 Apr 2019 01:03:36 -0700 Subject: Use a custom message queue for the WASAPI backend --- Alc/backends/wasapi.cpp | 423 ++++++++++++++++++------------------------------ 1 file changed, 160 insertions(+), 263 deletions(-) (limited to 'Alc/backends/wasapi.cpp') diff --git a/Alc/backends/wasapi.cpp b/Alc/backends/wasapi.cpp index 8f7c9f47..9dfbd247 100644 --- a/Alc/backends/wasapi.cpp +++ b/Alc/backends/wasapi.cpp @@ -40,12 +40,16 @@ #include #endif +#include +#include #include #include #include #include +#include #include #include +#include #include "alMain.h" #include "alu.h" @@ -136,48 +140,6 @@ al::vector PlaybackDevices; al::vector CaptureDevices; -HANDLE ThreadHdl; -DWORD ThreadID; - -struct ThreadRequest { - HANDLE FinishedEvt; - HRESULT result; -}; - - -#define WM_USER_First (WM_USER+0) -#define WM_USER_OpenDevice (WM_USER+0) -#define WM_USER_ResetDevice (WM_USER+1) -#define WM_USER_StartDevice (WM_USER+2) -#define WM_USER_StopDevice (WM_USER+3) -#define WM_USER_CloseDevice (WM_USER+4) -#define WM_USER_Enumerate (WM_USER+5) -#define WM_USER_Last (WM_USER+5) - -constexpr char MessageStr[WM_USER_Last+1-WM_USER][20] = { - "Open Device", - "Reset Device", - "Start Device", - "Stop Device", - "Close Device", - "Enumerate Devices", -}; - -inline void ReturnMsgResponse(ThreadRequest *req, HRESULT res) -{ - req->result = res; - SetEvent(req->FinishedEvt); -} - -HRESULT WaitForResponse(ThreadRequest *req) -{ - if(WaitForSingleObject(req->FinishedEvt, INFINITE) == WAIT_OBJECT_0) - return req->result; - ERR("Message response error: %lu\n", GetLastError()); - return E_FAIL; -} - - using NameGUIDPair = std::pair; NameGUIDPair get_device_name_and_guid(IMMDevice *device) { @@ -338,6 +300,31 @@ HRESULT probe_devices(IMMDeviceEnumerator *devenum, EDataFlow flowdir, al::vecto } +enum class MsgType : unsigned int { + OpenDevice, + ResetDevice, + StartDevice, + StopDevice, + CloseDevice, + EnumeratePlayback, + EnumerateCapture, + QuitThread, + + Count +}; + +constexpr char MessageStr[static_cast(MsgType::Count)][20]{ + "Open Device", + "Reset Device", + "Start Device", + "Stop Device", + "Close Device", + "Enumerate Playback", + "Enumerate Capture", + "Quit" +}; + + /* Proxy interface used by the message handler. */ struct WasapiProxy { virtual HRESULT openProxy() = 0; @@ -347,132 +334,153 @@ struct WasapiProxy { virtual HRESULT startProxy() = 0; virtual void stopProxy() = 0; - static DWORD CALLBACK messageHandler(void *ptr); + struct Msg { + MsgType mType; + WasapiProxy *mProxy; + std::promise mPromise; + }; + static std::deque mMsgQueue; + static std::mutex mMsgQueueLock; + static std::condition_variable mMsgQueueCond; + + std::future pushMessage(MsgType type) + { + std::promise promise; + std::future future{promise.get_future()}; + { std::lock_guard _{mMsgQueueLock}; + mMsgQueue.emplace_back(Msg{type, this, std::move(promise)}); + } + mMsgQueueCond.notify_one(); + return future; + } + + static std::future pushMessageStatic(MsgType type) + { + std::promise promise; + std::future future{promise.get_future()}; + { std::lock_guard _{mMsgQueueLock}; + mMsgQueue.emplace_back(Msg{type, nullptr, std::move(promise)}); + } + mMsgQueueCond.notify_one(); + return future; + } + + static bool popMessage(Msg &msg) + { + std::unique_lock lock{mMsgQueueLock}; + while(mMsgQueue.empty()) + mMsgQueueCond.wait(lock); + msg = std::move(mMsgQueue.front()); + mMsgQueue.pop_front(); + return msg.mType != MsgType::QuitThread; + } + + static int messageHandler(std::promise *promise); static constexpr inline const char *CurrentPrefix() noexcept { return "WasapiProxy::"; } }; +std::deque WasapiProxy::mMsgQueue; +std::mutex WasapiProxy::mMsgQueueLock; +std::condition_variable WasapiProxy::mMsgQueueCond; -DWORD WasapiProxy::messageHandler(void *ptr) +int WasapiProxy::messageHandler(std::promise *promise) { - auto req = static_cast(ptr); - TRACE("Starting message thread\n"); HRESULT cohr = CoInitializeEx(nullptr, COINIT_MULTITHREADED); if(FAILED(cohr)) { WARN("Failed to initialize COM: 0x%08lx\n", cohr); - ReturnMsgResponse(req, cohr); + promise->set_value(cohr); return 0; } + void *ptr{}; HRESULT hr{CoCreateInstance(CLSID_MMDeviceEnumerator, nullptr, CLSCTX_INPROC_SERVER, IID_IMMDeviceEnumerator, &ptr)}; if(FAILED(hr)) { WARN("Failed to create IMMDeviceEnumerator instance: 0x%08lx\n", hr); + promise->set_value(hr); CoUninitialize(); - ReturnMsgResponse(req, hr); return 0; } auto Enumerator = static_cast(ptr); Enumerator->Release(); Enumerator = nullptr; - CoUninitialize(); - /* HACK: Force Windows to create a message queue for this thread before - * returning success, otherwise PostThreadMessage may fail if it gets - * called before GetMessage. - */ - MSG msg; - PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE); - TRACE("Message thread initialization complete\n"); - ReturnMsgResponse(req, S_OK); + promise->set_value(S_OK); + promise = nullptr; TRACE("Starting message loop\n"); ALuint deviceCount{0}; - while(GetMessage(&msg, nullptr, WM_USER_First, WM_USER_Last)) + Msg msg; + while(popMessage(msg)) { - TRACE("Got message \"%s\" (0x%04x, lparam=%p, wparam=%p)\n", - (msg.message >= WM_USER && msg.message <= WM_USER_Last) ? - MessageStr[msg.message-WM_USER] : "Unknown", - msg.message, (void*)msg.lParam, (void*)msg.wParam - ); + TRACE("Got message \"%s\" (0x%04x, this=%p)\n", + MessageStr[static_cast(msg.mType)], static_cast(msg.mType), + msg.mProxy); - WasapiProxy *proxy{nullptr}; - switch(msg.message) + switch(msg.mType) { - case WM_USER_OpenDevice: - req = reinterpret_cast(msg.wParam); - proxy = reinterpret_cast(msg.lParam); - + case MsgType::OpenDevice: hr = cohr = S_OK; if(++deviceCount == 1) hr = cohr = CoInitializeEx(nullptr, COINIT_MULTITHREADED); if(SUCCEEDED(hr)) - hr = proxy->openProxy(); + hr = msg.mProxy->openProxy(); + msg.mPromise.set_value(hr); + if(FAILED(hr)) { if(--deviceCount == 0 && SUCCEEDED(cohr)) CoUninitialize(); } - - ReturnMsgResponse(req, hr); continue; - case WM_USER_ResetDevice: - req = reinterpret_cast(msg.wParam); - proxy = reinterpret_cast(msg.lParam); - - hr = proxy->resetProxy(); - ReturnMsgResponse(req, hr); + case MsgType::ResetDevice: + hr = msg.mProxy->resetProxy(); + msg.mPromise.set_value(hr); continue; - case WM_USER_StartDevice: - req = reinterpret_cast(msg.wParam); - proxy = reinterpret_cast(msg.lParam); - - hr = proxy->startProxy(); - ReturnMsgResponse(req, hr); + case MsgType::StartDevice: + hr = msg.mProxy->startProxy(); + msg.mPromise.set_value(hr); continue; - case WM_USER_StopDevice: - req = reinterpret_cast(msg.wParam); - proxy = reinterpret_cast(msg.lParam); - - proxy->stopProxy(); - ReturnMsgResponse(req, S_OK); + case MsgType::StopDevice: + msg.mProxy->stopProxy(); + msg.mPromise.set_value(S_OK); continue; - case WM_USER_CloseDevice: - req = reinterpret_cast(msg.wParam); - proxy = reinterpret_cast(msg.lParam); + case MsgType::CloseDevice: + msg.mProxy->closeProxy(); + msg.mPromise.set_value(S_OK); - proxy->closeProxy(); if(--deviceCount == 0) CoUninitialize(); - - ReturnMsgResponse(req, S_OK); continue; - case WM_USER_Enumerate: - req = reinterpret_cast(msg.wParam); - + case MsgType::EnumeratePlayback: + case MsgType::EnumerateCapture: hr = cohr = S_OK; if(++deviceCount == 1) hr = cohr = CoInitializeEx(nullptr, COINIT_MULTITHREADED); if(SUCCEEDED(hr)) hr = CoCreateInstance(CLSID_MMDeviceEnumerator, nullptr, CLSCTX_INPROC_SERVER, IID_IMMDeviceEnumerator, &ptr); - if(SUCCEEDED(hr)) + if(FAILED(hr)) + msg.mPromise.set_value(hr); + else { Enumerator = static_cast(ptr); - if(msg.lParam == static_cast(DevProbe::Playback)) + if(msg.mType == MsgType::EnumeratePlayback) hr = probe_devices(Enumerator, eRender, PlaybackDevices); - else if(msg.lParam == static_cast(DevProbe::Capture)) + else if(msg.mType == MsgType::EnumerateCapture) hr = probe_devices(Enumerator, eCapture, CaptureDevices); + msg.mPromise.set_value(hr); Enumerator->Release(); Enumerator = nullptr; @@ -480,12 +488,11 @@ DWORD WasapiProxy::messageHandler(void *ptr) if(--deviceCount == 0 && SUCCEEDED(cohr)) CoUninitialize(); - - ReturnMsgResponse(req, hr); continue; default: - ERR("Unexpected message: %u\n", msg.message); + ERR("Unexpected message: %u\n", static_cast(msg.mType)); + msg.mPromise.set_value(E_FAIL); continue; } } @@ -521,8 +528,6 @@ struct WasapiPlayback final : public BackendBase, WasapiProxy { IAudioRenderClient *mRender{nullptr}; HANDLE mNotifyEvent{nullptr}; - HANDLE mMsgEvent{nullptr}; - std::atomic mPadding{0u}; std::atomic mKillNow{true}; @@ -534,23 +539,11 @@ struct WasapiPlayback final : public BackendBase, WasapiProxy { WasapiPlayback::~WasapiPlayback() { - if(mMsgEvent) - { - ThreadRequest req{ mMsgEvent, 0 }; - auto proxy = static_cast(this); - if(PostThreadMessage(ThreadID, WM_USER_CloseDevice, (WPARAM)&req, (LPARAM)proxy)) - (void)WaitForResponse(&req); - - CloseHandle(mMsgEvent); - mMsgEvent = nullptr; - } + pushMessage(MsgType::CloseDevice).wait(); if(mNotifyEvent != nullptr) CloseHandle(mNotifyEvent); mNotifyEvent = nullptr; - if(mMsgEvent != nullptr) - CloseHandle(mMsgEvent); - mMsgEvent = nullptr; } @@ -659,10 +652,9 @@ ALCenum WasapiPlayback::open(const ALCchar *name) HRESULT hr{S_OK}; mNotifyEvent = CreateEventW(nullptr, FALSE, FALSE, nullptr); - mMsgEvent = CreateEventW(nullptr, FALSE, FALSE, nullptr); - if(mNotifyEvent == nullptr || mMsgEvent == nullptr) + if(mNotifyEvent == nullptr) { - ERR("Failed to create message events: %lu\n", GetLastError()); + ERR("Failed to create notify events: %lu\n", GetLastError()); hr = E_FAIL; } @@ -671,11 +663,7 @@ ALCenum WasapiPlayback::open(const ALCchar *name) if(name) { if(PlaybackDevices.empty()) - { - ThreadRequest req = { mMsgEvent, 0 }; - if(PostThreadMessage(ThreadID, WM_USER_Enumerate, (WPARAM)&req, static_cast(DevProbe::Playback))) - (void)WaitForResponse(&req); - } + pushMessage(MsgType::EnumeratePlayback).wait(); hr = E_FAIL; auto iter = std::find_if(PlaybackDevices.cbegin(), PlaybackDevices.cend(), @@ -702,25 +690,13 @@ ALCenum WasapiPlayback::open(const ALCchar *name) } if(SUCCEEDED(hr)) - { - ThreadRequest req{ mMsgEvent, 0 }; - auto proxy = static_cast(this); - - hr = E_FAIL; - if(PostThreadMessage(ThreadID, WM_USER_OpenDevice, (WPARAM)&req, (LPARAM)proxy)) - hr = WaitForResponse(&req); - else - ERR("Failed to post thread message: %lu\n", GetLastError()); - } + hr = pushMessage(MsgType::OpenDevice).get(); if(FAILED(hr)) { if(mNotifyEvent != nullptr) CloseHandle(mNotifyEvent); mNotifyEvent = nullptr; - if(mMsgEvent != nullptr) - CloseHandle(mMsgEvent); - mMsgEvent = nullptr; mDevId.clear(); @@ -777,12 +753,7 @@ void WasapiPlayback::closeProxy() ALCboolean WasapiPlayback::reset() { - ThreadRequest req{ mMsgEvent, 0 }; - auto proxy = static_cast(this); - HRESULT hr{E_FAIL}; - if(PostThreadMessage(ThreadID, WM_USER_ResetDevice, (WPARAM)&req, (LPARAM)proxy)) - hr = WaitForResponse(&req); - + HRESULT hr{pushMessage(MsgType::ResetDevice).get()}; return SUCCEEDED(hr) ? ALC_TRUE : ALC_FALSE; } @@ -1044,12 +1015,7 @@ HRESULT WasapiPlayback::resetProxy() ALCboolean WasapiPlayback::start() { - ThreadRequest req{ mMsgEvent, 0 }; - auto proxy = static_cast(this); - HRESULT hr{E_FAIL}; - if(PostThreadMessage(ThreadID, WM_USER_StartDevice, (WPARAM)&req, (LPARAM)proxy)) - hr = WaitForResponse(&req); - + HRESULT hr{pushMessage(MsgType::StartDevice).get()}; return SUCCEEDED(hr) ? ALC_TRUE : ALC_FALSE; } @@ -1089,12 +1055,7 @@ HRESULT WasapiPlayback::startProxy() void WasapiPlayback::stop() -{ - ThreadRequest req{ mMsgEvent, 0 }; - auto proxy = static_cast(this); - if(PostThreadMessage(ThreadID, WM_USER_StopDevice, (WPARAM)&req, (LPARAM)proxy)) - (void)WaitForResponse(&req); -} +{ pushMessage(MsgType::StopDevice).wait(); } void WasapiPlayback::stopProxy() { @@ -1150,8 +1111,6 @@ struct WasapiCapture final : public BackendBase, WasapiProxy { IAudioCaptureClient *mCapture{nullptr}; HANDLE mNotifyEvent{nullptr}; - HANDLE mMsgEvent{nullptr}; - ChannelConverterPtr mChannelConv; SampleConverterPtr mSampleConv; RingBufferPtr mRing; @@ -1165,16 +1124,7 @@ struct WasapiCapture final : public BackendBase, WasapiProxy { WasapiCapture::~WasapiCapture() { - if(mMsgEvent) - { - ThreadRequest req{ mMsgEvent, 0 }; - auto proxy = static_cast(this); - if(PostThreadMessage(ThreadID, WM_USER_CloseDevice, (WPARAM)&req, (LPARAM)proxy)) - (void)WaitForResponse(&req); - - CloseHandle(mMsgEvent); - mMsgEvent = nullptr; - } + pushMessage(MsgType::CloseDevice).wait(); if(mNotifyEvent != nullptr) CloseHandle(mNotifyEvent); @@ -1279,10 +1229,9 @@ ALCenum WasapiCapture::open(const ALCchar *name) HRESULT hr{S_OK}; mNotifyEvent = CreateEventW(nullptr, FALSE, FALSE, nullptr); - mMsgEvent = CreateEventW(nullptr, FALSE, FALSE, nullptr); - if(mNotifyEvent == nullptr || mMsgEvent == nullptr) + if(mNotifyEvent == nullptr) { - ERR("Failed to create message events: %lu\n", GetLastError()); + ERR("Failed to create notify event: %lu\n", GetLastError()); hr = E_FAIL; } @@ -1291,11 +1240,7 @@ ALCenum WasapiCapture::open(const ALCchar *name) if(name) { if(CaptureDevices.empty()) - { - ThreadRequest req{ mMsgEvent, 0 }; - if(PostThreadMessage(ThreadID, WM_USER_Enumerate, (WPARAM)&req, static_cast(DevProbe::Capture))) - (void)WaitForResponse(&req); - } + pushMessage(MsgType::EnumerateCapture).wait(); hr = E_FAIL; auto iter = std::find_if(CaptureDevices.cbegin(), CaptureDevices.cend(), @@ -1322,46 +1267,26 @@ ALCenum WasapiCapture::open(const ALCchar *name) } if(SUCCEEDED(hr)) - { - ThreadRequest req{ mMsgEvent, 0 }; - auto proxy = static_cast(this); - hr = E_FAIL; - if(PostThreadMessage(ThreadID, WM_USER_OpenDevice, (WPARAM)&req, (LPARAM)proxy)) - hr = WaitForResponse(&req); - else - ERR("Failed to post thread message: %lu\n", GetLastError()); - } + hr = pushMessage(MsgType::OpenDevice).get(); if(FAILED(hr)) { if(mNotifyEvent != nullptr) CloseHandle(mNotifyEvent); mNotifyEvent = nullptr; - if(mMsgEvent != nullptr) - CloseHandle(mMsgEvent); - mMsgEvent = nullptr; mDevId.clear(); ERR("Device init failed: 0x%08lx\n", hr); return ALC_INVALID_VALUE; } - else - { - ThreadRequest req{ mMsgEvent, 0 }; - auto proxy = static_cast(this); - hr = E_FAIL; - if(PostThreadMessage(ThreadID, WM_USER_ResetDevice, (WPARAM)&req, (LPARAM)proxy)) - hr = WaitForResponse(&req); - else - ERR("Failed to post thread message: %lu\n", GetLastError()); - if(FAILED(hr)) - { - if(hr == E_OUTOFMEMORY) - return ALC_OUT_OF_MEMORY; - return ALC_INVALID_VALUE; - } + hr = pushMessage(MsgType::ResetDevice).get(); + if(FAILED(hr)) + { + if(hr == E_OUTOFMEMORY) + return ALC_OUT_OF_MEMORY; + return ALC_INVALID_VALUE; } return ALC_NO_ERROR; @@ -1645,12 +1570,7 @@ HRESULT WasapiCapture::resetProxy() ALCboolean WasapiCapture::start() { - ThreadRequest req{ mMsgEvent, 0 }; - auto proxy = static_cast(this); - HRESULT hr{E_FAIL}; - if(PostThreadMessage(ThreadID, WM_USER_StartDevice, (WPARAM)&req, (LPARAM)proxy)) - hr = WaitForResponse(&req); - + HRESULT hr{pushMessage(MsgType::StartDevice).get()}; return SUCCEEDED(hr) ? ALC_TRUE : ALC_FALSE; } @@ -1693,12 +1613,7 @@ HRESULT WasapiCapture::startProxy() void WasapiCapture::stop() -{ - ThreadRequest req{ mMsgEvent, 0 }; - auto proxy = static_cast(this); - if(PostThreadMessage(ThreadID, WM_USER_StopDevice, (WPARAM)&req, (LPARAM)proxy)) - (void)WaitForResponse(&req); -} +{ pushMessage(MsgType::StopDevice).wait(); } void WasapiCapture::stopProxy() { @@ -1729,22 +1644,17 @@ ALCenum WasapiCapture::captureSamples(void *buffer, ALCuint samples) bool WasapiBackendFactory::init() { - static HRESULT InitResult; + static HRESULT InitResult{E_FAIL}; - if(!ThreadHdl) + if(FAILED(InitResult)) try { - ThreadRequest req; - InitResult = E_FAIL; + std::promise promise; + auto future = promise.get_future(); - req.FinishedEvt = CreateEventW(nullptr, FALSE, FALSE, nullptr); - if(req.FinishedEvt == nullptr) - ERR("Failed to create event: %lu\n", GetLastError()); - else - { - ThreadHdl = CreateThread(nullptr, 0, &WasapiProxy::messageHandler, &req, 0, &ThreadID); - if(ThreadHdl != nullptr) InitResult = WaitForResponse(&req); - CloseHandle(req.FinishedEvt); - } + std::thread{&WasapiProxy::messageHandler, &promise}.detach(); + InitResult = future.get(); + } + catch(...) { } return SUCCEEDED(InitResult) ? ALC_TRUE : ALC_FALSE; @@ -1755,13 +1665,8 @@ void WasapiBackendFactory::deinit() PlaybackDevices.clear(); CaptureDevices.clear(); - if(ThreadHdl) - { - TRACE("Sending WM_QUIT to Thread %04lx\n", ThreadID); - PostThreadMessage(ThreadID, WM_QUIT, 0, 0); - CloseHandle(ThreadHdl); - ThreadHdl = nullptr; - } + TRACE("Sending quit message\n"); + WasapiProxy::pushMessageStatic(MsgType::QuitThread); } bool WasapiBackendFactory::querySupport(BackendType type) @@ -1769,35 +1674,27 @@ bool WasapiBackendFactory::querySupport(BackendType type) void WasapiBackendFactory::probe(DevProbe type, std::string *outnames) { - ThreadRequest req{ nullptr, 0 }; - - req.FinishedEvt = CreateEventW(nullptr, FALSE, FALSE, nullptr); - if(req.FinishedEvt == nullptr) - ERR("Failed to create event: %lu\n", GetLastError()); - else + auto add_device = [outnames](const DevMap &entry) -> void { - auto add_device = [outnames](const DevMap &entry) -> void - { - /* +1 to also append the null char (to ensure a null-separated list - * and double-null terminated list). - */ - outnames->append(entry.name.c_str(), entry.name.length()+1); - }; - HRESULT hr = E_FAIL; - if(PostThreadMessage(ThreadID, WM_USER_Enumerate, (WPARAM)&req, static_cast(type))) - hr = WaitForResponse(&req); - if(SUCCEEDED(hr)) switch(type) - { - case DevProbe::Playback: + /* +1 to also append the null char (to ensure a null-separated list and + * double-null terminated list). + */ + outnames->append(entry.name.c_str(), entry.name.length()+1); + }; + HRESULT hr{}; + switch(type) + { + case DevProbe::Playback: + hr = WasapiProxy::pushMessageStatic(MsgType::EnumeratePlayback).get(); + if(SUCCEEDED(hr)) std::for_each(PlaybackDevices.cbegin(), PlaybackDevices.cend(), add_device); - break; + break; - case DevProbe::Capture: + case DevProbe::Capture: + hr = WasapiProxy::pushMessageStatic(MsgType::EnumerateCapture).get(); + if(SUCCEEDED(hr)) std::for_each(CaptureDevices.cbegin(), CaptureDevices.cend(), add_device); - break; - } - CloseHandle(req.FinishedEvt); - req.FinishedEvt = nullptr; + break; } } -- cgit v1.2.3