aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris Robinson <[email protected]>2017-12-30 20:34:33 -0800
committerChris Robinson <[email protected]>2017-12-30 20:34:33 -0800
commit0136df1e6261d7da7b5a46366431250ba2ad771a (patch)
tree04313eae01b1136ed3eb8d13f909a0f95964e9fa
parent98731bb574c77b291f86195a2c2fb1f6d00fe7a4 (diff)
Improve the alffplay queue for FFmpeg's send/receive API
The packet handling thread now calls avcodec_send_packet to give compressed data to libavcodec, while the audio/video threads call avcodec_receive_frame to handle decoded frames. The packet thread still maintains local queues for each stream to avoid starving an A/V thread when the other doesn't want another frame yet.
-rw-r--r--examples/alffplay.cpp291
1 files changed, 139 insertions, 152 deletions
diff --git a/examples/alffplay.cpp b/examples/alffplay.cpp
index 10c00553..7c6b9f6e 100644
--- a/examples/alffplay.cpp
+++ b/examples/alffplay.cpp
@@ -69,24 +69,44 @@ enum {
};
-struct PacketQueue {
+class PacketQueue {
std::deque<AVPacket> mPackets;
- std::atomic<int> mTotalSize;
- std::atomic<bool> mFinished;
- std::mutex mMutex;
- std::condition_variable mCond;
+ size_t mTotalSize;
- PacketQueue() : mTotalSize(0), mFinished(false)
- { }
- ~PacketQueue()
- { clear(); }
+public:
+ PacketQueue() : mTotalSize(0) { }
+ ~PacketQueue() { clear(); }
- int put(const AVPacket *pkt);
- int peek(AVPacket *pkt, std::atomic<bool> &quit_var);
- void pop();
+ bool empty() const noexcept { return mPackets.empty(); }
+ size_t totalSize() const noexcept { return mTotalSize; }
- void clear();
- void finish();
+ void put(const AVPacket *pkt)
+ {
+ mPackets.push_back(AVPacket{});
+ if(av_packet_ref(&mPackets.back(), pkt) != 0)
+ mPackets.pop_back();
+ else
+ mTotalSize += mPackets.back().size;
+ }
+
+ AVPacket *front() noexcept
+ { return &mPackets.front(); }
+
+ void pop()
+ {
+ AVPacket *pkt = &mPackets.front();
+ mTotalSize -= pkt->size;
+ av_packet_unref(pkt);
+ mPackets.pop_front();
+ }
+
+ void clear()
+ {
+ for(AVPacket &pkt : mPackets)
+ av_packet_unref(&pkt);
+ mPackets.clear();
+ mTotalSize = 0;
+ }
};
@@ -98,7 +118,8 @@ struct AudioState {
AVStream *mStream;
AVCodecContext *mCodecCtx;
- PacketQueue mQueue;
+ std::mutex mQueueMtx;
+ std::condition_variable mQueueCond;
/* Used for clock difference average computation */
struct {
@@ -174,7 +195,8 @@ struct VideoState {
AVStream *mStream;
AVCodecContext *mCodecCtx;
- PacketQueue mQueue;
+ std::mutex mQueueMtx;
+ std::condition_variable mQueueCond;
double mClock;
double mFrameTimer;
@@ -241,7 +263,6 @@ struct VideoState {
struct MovieState {
AVFormatContext *mFormatCtx;
- int mVideoStream, mAudioStream;
int mAVSyncType;
@@ -259,9 +280,9 @@ struct MovieState {
std::string mFilename;
MovieState(std::string fname)
- : mFormatCtx(nullptr), mVideoStream(0), mAudioStream(0)
- , mAVSyncType(DEFAULT_AV_SYNC_TYPE), mExternalClockBase(0), mQuit(false)
- , mAudio(this), mVideo(this), mFilename(std::move(fname))
+ : mFormatCtx(nullptr), mAVSyncType(DEFAULT_AV_SYNC_TYPE)
+ , mExternalClockBase(0), mQuit(false), mAudio(this), mVideo(this)
+ , mFilename(std::move(fname))
{ }
~MovieState()
{
@@ -284,68 +305,6 @@ struct MovieState {
};
-int PacketQueue::put(const AVPacket *pkt)
-{
- std::unique_lock<std::mutex> lock(mMutex);
- mPackets.push_back(AVPacket{});
- if(av_packet_ref(&mPackets.back(), pkt) != 0)
- {
- mPackets.pop_back();
- return -1;
- }
- mTotalSize += mPackets.back().size;
- lock.unlock();
-
- mCond.notify_one();
- return 0;
-}
-
-int PacketQueue::peek(AVPacket *pkt, std::atomic<bool> &quit_var)
-{
- std::unique_lock<std::mutex> lock(mMutex);
- while(!quit_var.load())
- {
- if(!mPackets.empty())
- {
- if(av_packet_ref(pkt, &mPackets.front()) != 0)
- return -1;
- return 1;
- }
-
- if(mFinished.load())
- return 0;
- mCond.wait(lock);
- }
- return -1;
-}
-
-void PacketQueue::pop()
-{
- std::unique_lock<std::mutex> lock(mMutex);
- AVPacket *pkt = &mPackets.front();
- mTotalSize -= pkt->size;
- av_packet_unref(pkt);
- mPackets.pop_front();
-}
-
-void PacketQueue::clear()
-{
- std::unique_lock<std::mutex> lock(mMutex);
- std::for_each(mPackets.begin(), mPackets.end(),
- [](AVPacket &pkt) { av_packet_unref(&pkt); }
- );
- mPackets.clear();
- mTotalSize = 0;
-}
-void PacketQueue::finish()
-{
- std::unique_lock<std::mutex> lock(mMutex);
- mFinished = true;
- lock.unlock();
- mCond.notify_all();
-}
-
-
double AudioState::getClock()
{
double pts;
@@ -440,29 +399,18 @@ int AudioState::decodeFrame()
{
while(!mMovie->mQuit.load())
{
- while(!mMovie->mQuit.load())
- {
- /* Get the next packet */
- AVPacket pkt{};
- if(mQueue.peek(&pkt, mMovie->mQuit) <= 0)
- return -1;
-
- int ret = avcodec_send_packet(mCodecCtx, &pkt);
- if(ret != AVERROR(EAGAIN))
- {
- if(ret < 0)
- std::cerr<< "Failed to send encoded packet: 0x"<<std::hex<<ret<<std::dec <<std::endl;
- mQueue.pop();
- }
- av_packet_unref(&pkt);
- if(ret == 0 || ret == AVERROR(EAGAIN))
- break;
- }
-
+ std::unique_lock<std::mutex> lock(mQueueMtx);
int ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
if(ret == AVERROR(EAGAIN))
- continue;
- if(ret == AVERROR_EOF || ret < 0)
+ {
+ do {
+ mQueueCond.wait(lock);
+ ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
+ } while(ret == AVERROR(EAGAIN));
+ }
+ lock.unlock();
+ if(ret == AVERROR_EOF) break;
+ if(ret < 0)
{
std::cerr<< "Failed to decode frame: "<<ret <<std::endl;
return 0;
@@ -1110,28 +1058,19 @@ int VideoState::handler()
mDecodedFrame = av_frame_alloc();
while(!mMovie->mQuit)
{
- while(!mMovie->mQuit)
- {
- AVPacket packet{};
- if(mQueue.peek(&packet, mMovie->mQuit) <= 0)
- goto finish;
-
- int ret = avcodec_send_packet(mCodecCtx, &packet);
- if(ret != AVERROR(EAGAIN))
- {
- if(ret < 0)
- std::cerr<< "Failed to send encoded packet: 0x"<<std::hex<<ret<<std::dec <<std::endl;
- mQueue.pop();
- }
- av_packet_unref(&packet);
- if(ret == 0 || ret == AVERROR(EAGAIN))
- break;
- }
-
+ std::unique_lock<std::mutex> lock(mQueueMtx);
/* Decode video frame */
int ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
if(ret == AVERROR(EAGAIN))
- continue;
+ {
+ do {
+ mQueueCond.wait(lock);
+ ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
+ } while(ret == AVERROR(EAGAIN));
+ }
+ lock.unlock();
+ if(ret == AVERROR_EOF)
+ break;
if(ret < 0)
{
std::cerr<< "Failed to decode frame: "<<ret <<std::endl;
@@ -1145,7 +1084,6 @@ int VideoState::handler()
break;
av_frame_unref(mDecodedFrame);
}
-finish:
mEOS = true;
av_frame_free(&mDecodedFrame);
@@ -1254,7 +1192,6 @@ int MovieState::streamComponentOpen(int stream_index)
switch(avctx->codec_type)
{
case AVMEDIA_TYPE_AUDIO:
- mAudioStream = stream_index;
mAudio.mStream = mFormatCtx->streams[stream_index];
mAudio.mCodecCtx = avctx;
@@ -1267,7 +1204,6 @@ int MovieState::streamComponentOpen(int stream_index)
break;
case AVMEDIA_TYPE_VIDEO:
- mVideoStream = stream_index;
mVideo.mStream = mFormatCtx->streams[stream_index];
mVideo.mCodecCtx = avctx;
@@ -1280,10 +1216,10 @@ int MovieState::streamComponentOpen(int stream_index)
default:
avcodec_free_context(&avctx);
- break;
+ return -1;
}
- return 0;
+ return stream_index;
}
int MovieState::parse_handler()
@@ -1291,9 +1227,6 @@ int MovieState::parse_handler()
int video_index = -1;
int audio_index = -1;
- mVideoStream = -1;
- mAudioStream = -1;
-
/* Dump information about file onto standard error */
av_dump_format(mFormatCtx, 0, mFilename.c_str(), 0);
@@ -1309,39 +1242,93 @@ int MovieState::parse_handler()
* components time to start without needing to skip ahead.
*/
mExternalClockBase = av_gettime() + 50000;
- if(audio_index >= 0)
- streamComponentOpen(audio_index);
- if(video_index >= 0)
- streamComponentOpen(video_index);
+ if(audio_index >= 0) audio_index = streamComponentOpen(audio_index);
+ if(video_index >= 0) video_index = streamComponentOpen(video_index);
- if(mVideoStream < 0 && mAudioStream < 0)
+ if(video_index < 0 && audio_index < 0)
{
std::cerr<< mFilename<<": could not open codecs" <<std::endl;
mQuit = true;
}
- /* Main packet handling loop */
- while(!mQuit.load())
- {
- if(mAudio.mQueue.mTotalSize + mVideo.mQueue.mTotalSize >= MAX_QUEUE_SIZE)
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- continue;
- }
+ PacketQueue audio_queue, video_queue;
+ bool input_finished = false;
+ /* Main packet reading/dispatching loop */
+ while(!mQuit.load(std::memory_order_relaxed) && !input_finished)
+ {
AVPacket packet;
if(av_read_frame(mFormatCtx, &packet) < 0)
- break;
+ input_finished = true;
+ else
+ {
+ /* Copy the packet into the queue it's meant for. */
+ if(packet.stream_index == video_index)
+ video_queue.put(&packet);
+ else if(packet.stream_index == audio_index)
+ audio_queue.put(&packet);
+ av_packet_unref(&packet);
+ }
- /* Copy the packet in the queue it's meant for. */
- if(packet.stream_index == mVideoStream)
- mVideo.mQueue.put(&packet);
- else if(packet.stream_index == mAudioStream)
- mAudio.mQueue.put(&packet);
- av_packet_unref(&packet);
+ do {
+ /* Send whatever queued packets we have. */
+ bool sent;
+ do {
+ sent = false;
+ if(!audio_queue.empty())
+ {
+ std::unique_lock<std::mutex> lock(mAudio.mQueueMtx);
+ int ret = avcodec_send_packet(mAudio.mCodecCtx, audio_queue.front());
+ if(ret != AVERROR(EAGAIN))
+ {
+ lock.unlock();
+ mAudio.mQueueCond.notify_one();
+ audio_queue.pop();
+ sent = true;
+ }
+ }
+ if(!video_queue.empty())
+ {
+ std::unique_lock<std::mutex> lock(mVideo.mQueueMtx);
+ int ret = avcodec_send_packet(mVideo.mCodecCtx, video_queue.front());
+ if(ret != AVERROR(EAGAIN))
+ {
+ lock.unlock();
+ mVideo.mQueueCond.notify_one();
+ video_queue.pop();
+ sent = true;
+ }
+ }
+ } while(sent);
+ /* If the queues are completely empty, or it's not full and there's
+ * more input to read, go get more.
+ */
+ size_t queue_size = audio_queue.totalSize() + video_queue.totalSize();
+ if(queue_size == 0 || (queue_size < MAX_QUEUE_SIZE && !input_finished))
+ break;
+ /* Nothing to send or get for now, wait a bit and try again. */
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ } while(!mQuit.load(std::memory_order_relaxed));
+ }
+ /* Pass a null packet to finish the send buffers (the receive functions
+ * will get AVERROR_EOF when emptied).
+ */
+ if(mVideo.mCodecCtx != nullptr)
+ {
+ { std::lock_guard<std::mutex> lock(mVideo.mQueueMtx);
+ avcodec_send_packet(mVideo.mCodecCtx, nullptr);
+ }
+ mVideo.mQueueCond.notify_one();
+ }
+ if(mAudio.mCodecCtx != nullptr)
+ {
+ { std::lock_guard<std::mutex> lock(mAudio.mQueueMtx);
+ avcodec_send_packet(mAudio.mCodecCtx, nullptr);
+ }
+ mAudio.mQueueCond.notify_one();
}
- mVideo.mQueue.finish();
- mAudio.mQueue.finish();
+ video_queue.clear();
+ audio_queue.clear();
/* all done - wait for it */
if(mVideoThread.joinable())