From 0136df1e6261d7da7b5a46366431250ba2ad771a Mon Sep 17 00:00:00 2001 From: Chris Robinson Date: Sat, 30 Dec 2017 20:34:33 -0800 Subject: Improve the alffplay queue for FFmpeg's send/receive API The packet handling thread now calls avcodec_send_packet to give compressed data to libavcodec, while the audio/video threads call avcodec_receive_frame to handle decoded frames. The packet thread still maintains local queues for each stream to avoid starving an A/V thread when the other doesn't want another frame yet. --- examples/alffplay.cpp | 291 ++++++++++++++++++++++++-------------------------- 1 file changed, 139 insertions(+), 152 deletions(-) diff --git a/examples/alffplay.cpp b/examples/alffplay.cpp index 10c00553..7c6b9f6e 100644 --- a/examples/alffplay.cpp +++ b/examples/alffplay.cpp @@ -69,24 +69,44 @@ enum { }; -struct PacketQueue { +class PacketQueue { std::deque mPackets; - std::atomic mTotalSize; - std::atomic mFinished; - std::mutex mMutex; - std::condition_variable mCond; + size_t mTotalSize; - PacketQueue() : mTotalSize(0), mFinished(false) - { } - ~PacketQueue() - { clear(); } +public: + PacketQueue() : mTotalSize(0) { } + ~PacketQueue() { clear(); } - int put(const AVPacket *pkt); - int peek(AVPacket *pkt, std::atomic &quit_var); - void pop(); + bool empty() const noexcept { return mPackets.empty(); } + size_t totalSize() const noexcept { return mTotalSize; } - void clear(); - void finish(); + void put(const AVPacket *pkt) + { + mPackets.push_back(AVPacket{}); + if(av_packet_ref(&mPackets.back(), pkt) != 0) + mPackets.pop_back(); + else + mTotalSize += mPackets.back().size; + } + + AVPacket *front() noexcept + { return &mPackets.front(); } + + void pop() + { + AVPacket *pkt = &mPackets.front(); + mTotalSize -= pkt->size; + av_packet_unref(pkt); + mPackets.pop_front(); + } + + void clear() + { + for(AVPacket &pkt : mPackets) + av_packet_unref(&pkt); + mPackets.clear(); + mTotalSize = 0; + } }; @@ -98,7 +118,8 @@ struct AudioState { AVStream *mStream; AVCodecContext *mCodecCtx; - PacketQueue mQueue; + std::mutex mQueueMtx; + std::condition_variable mQueueCond; /* Used for clock difference average computation */ struct { @@ -174,7 +195,8 @@ struct VideoState { AVStream *mStream; AVCodecContext *mCodecCtx; - PacketQueue mQueue; + std::mutex mQueueMtx; + std::condition_variable mQueueCond; double mClock; double mFrameTimer; @@ -241,7 +263,6 @@ struct VideoState { struct MovieState { AVFormatContext *mFormatCtx; - int mVideoStream, mAudioStream; int mAVSyncType; @@ -259,9 +280,9 @@ struct MovieState { std::string mFilename; MovieState(std::string fname) - : mFormatCtx(nullptr), mVideoStream(0), mAudioStream(0) - , mAVSyncType(DEFAULT_AV_SYNC_TYPE), mExternalClockBase(0), mQuit(false) - , mAudio(this), mVideo(this), mFilename(std::move(fname)) + : mFormatCtx(nullptr), mAVSyncType(DEFAULT_AV_SYNC_TYPE) + , mExternalClockBase(0), mQuit(false), mAudio(this), mVideo(this) + , mFilename(std::move(fname)) { } ~MovieState() { @@ -284,68 +305,6 @@ struct MovieState { }; -int PacketQueue::put(const AVPacket *pkt) -{ - std::unique_lock lock(mMutex); - mPackets.push_back(AVPacket{}); - if(av_packet_ref(&mPackets.back(), pkt) != 0) - { - mPackets.pop_back(); - return -1; - } - mTotalSize += mPackets.back().size; - lock.unlock(); - - mCond.notify_one(); - return 0; -} - -int PacketQueue::peek(AVPacket *pkt, std::atomic &quit_var) -{ - std::unique_lock lock(mMutex); - while(!quit_var.load()) - { - if(!mPackets.empty()) - { - if(av_packet_ref(pkt, &mPackets.front()) != 0) - return -1; - return 1; - } - - if(mFinished.load()) - return 0; - mCond.wait(lock); - } - return -1; -} - -void PacketQueue::pop() -{ - std::unique_lock lock(mMutex); - AVPacket *pkt = &mPackets.front(); - mTotalSize -= pkt->size; - av_packet_unref(pkt); - mPackets.pop_front(); -} - -void PacketQueue::clear() -{ - std::unique_lock lock(mMutex); - std::for_each(mPackets.begin(), mPackets.end(), - [](AVPacket &pkt) { av_packet_unref(&pkt); } - ); - mPackets.clear(); - mTotalSize = 0; -} -void PacketQueue::finish() -{ - std::unique_lock lock(mMutex); - mFinished = true; - lock.unlock(); - mCond.notify_all(); -} - - double AudioState::getClock() { double pts; @@ -440,29 +399,18 @@ int AudioState::decodeFrame() { while(!mMovie->mQuit.load()) { - while(!mMovie->mQuit.load()) - { - /* Get the next packet */ - AVPacket pkt{}; - if(mQueue.peek(&pkt, mMovie->mQuit) <= 0) - return -1; - - int ret = avcodec_send_packet(mCodecCtx, &pkt); - if(ret != AVERROR(EAGAIN)) - { - if(ret < 0) - std::cerr<< "Failed to send encoded packet: 0x"< lock(mQueueMtx); int ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame); if(ret == AVERROR(EAGAIN)) - continue; - if(ret == AVERROR_EOF || ret < 0) + { + do { + mQueueCond.wait(lock); + ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame); + } while(ret == AVERROR(EAGAIN)); + } + lock.unlock(); + if(ret == AVERROR_EOF) break; + if(ret < 0) { std::cerr<< "Failed to decode frame: "<mQuit) { - while(!mMovie->mQuit) - { - AVPacket packet{}; - if(mQueue.peek(&packet, mMovie->mQuit) <= 0) - goto finish; - - int ret = avcodec_send_packet(mCodecCtx, &packet); - if(ret != AVERROR(EAGAIN)) - { - if(ret < 0) - std::cerr<< "Failed to send encoded packet: 0x"< lock(mQueueMtx); /* Decode video frame */ int ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame); if(ret == AVERROR(EAGAIN)) - continue; + { + do { + mQueueCond.wait(lock); + ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame); + } while(ret == AVERROR(EAGAIN)); + } + lock.unlock(); + if(ret == AVERROR_EOF) + break; if(ret < 0) { std::cerr<< "Failed to decode frame: "<codec_type) { case AVMEDIA_TYPE_AUDIO: - mAudioStream = stream_index; mAudio.mStream = mFormatCtx->streams[stream_index]; mAudio.mCodecCtx = avctx; @@ -1267,7 +1204,6 @@ int MovieState::streamComponentOpen(int stream_index) break; case AVMEDIA_TYPE_VIDEO: - mVideoStream = stream_index; mVideo.mStream = mFormatCtx->streams[stream_index]; mVideo.mCodecCtx = avctx; @@ -1280,10 +1216,10 @@ int MovieState::streamComponentOpen(int stream_index) default: avcodec_free_context(&avctx); - break; + return -1; } - return 0; + return stream_index; } int MovieState::parse_handler() @@ -1291,9 +1227,6 @@ int MovieState::parse_handler() int video_index = -1; int audio_index = -1; - mVideoStream = -1; - mAudioStream = -1; - /* Dump information about file onto standard error */ av_dump_format(mFormatCtx, 0, mFilename.c_str(), 0); @@ -1309,39 +1242,93 @@ int MovieState::parse_handler() * components time to start without needing to skip ahead. */ mExternalClockBase = av_gettime() + 50000; - if(audio_index >= 0) - streamComponentOpen(audio_index); - if(video_index >= 0) - streamComponentOpen(video_index); + if(audio_index >= 0) audio_index = streamComponentOpen(audio_index); + if(video_index >= 0) video_index = streamComponentOpen(video_index); - if(mVideoStream < 0 && mAudioStream < 0) + if(video_index < 0 && audio_index < 0) { std::cerr<< mFilename<<": could not open codecs" <= MAX_QUEUE_SIZE) - { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - continue; - } + PacketQueue audio_queue, video_queue; + bool input_finished = false; + /* Main packet reading/dispatching loop */ + while(!mQuit.load(std::memory_order_relaxed) && !input_finished) + { AVPacket packet; if(av_read_frame(mFormatCtx, &packet) < 0) - break; + input_finished = true; + else + { + /* Copy the packet into the queue it's meant for. */ + if(packet.stream_index == video_index) + video_queue.put(&packet); + else if(packet.stream_index == audio_index) + audio_queue.put(&packet); + av_packet_unref(&packet); + } - /* Copy the packet in the queue it's meant for. */ - if(packet.stream_index == mVideoStream) - mVideo.mQueue.put(&packet); - else if(packet.stream_index == mAudioStream) - mAudio.mQueue.put(&packet); - av_packet_unref(&packet); + do { + /* Send whatever queued packets we have. */ + bool sent; + do { + sent = false; + if(!audio_queue.empty()) + { + std::unique_lock lock(mAudio.mQueueMtx); + int ret = avcodec_send_packet(mAudio.mCodecCtx, audio_queue.front()); + if(ret != AVERROR(EAGAIN)) + { + lock.unlock(); + mAudio.mQueueCond.notify_one(); + audio_queue.pop(); + sent = true; + } + } + if(!video_queue.empty()) + { + std::unique_lock lock(mVideo.mQueueMtx); + int ret = avcodec_send_packet(mVideo.mCodecCtx, video_queue.front()); + if(ret != AVERROR(EAGAIN)) + { + lock.unlock(); + mVideo.mQueueCond.notify_one(); + video_queue.pop(); + sent = true; + } + } + } while(sent); + /* If the queues are completely empty, or it's not full and there's + * more input to read, go get more. + */ + size_t queue_size = audio_queue.totalSize() + video_queue.totalSize(); + if(queue_size == 0 || (queue_size < MAX_QUEUE_SIZE && !input_finished)) + break; + /* Nothing to send or get for now, wait a bit and try again. */ + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } while(!mQuit.load(std::memory_order_relaxed)); + } + /* Pass a null packet to finish the send buffers (the receive functions + * will get AVERROR_EOF when emptied). + */ + if(mVideo.mCodecCtx != nullptr) + { + { std::lock_guard lock(mVideo.mQueueMtx); + avcodec_send_packet(mVideo.mCodecCtx, nullptr); + } + mVideo.mQueueCond.notify_one(); + } + if(mAudio.mCodecCtx != nullptr) + { + { std::lock_guard lock(mAudio.mQueueMtx); + avcodec_send_packet(mAudio.mCodecCtx, nullptr); + } + mAudio.mQueueCond.notify_one(); } - mVideo.mQueue.finish(); - mAudio.mQueue.finish(); + video_queue.clear(); + audio_queue.clear(); /* all done - wait for it */ if(mVideoThread.joinable()) -- cgit v1.2.3