From 0d758babf8d20632fc30ff56b0cdaeba8f5cf4ea Mon Sep 17 00:00:00 2001 From: Tadas Baltrusaitis Date: Wed, 30 May 2018 20:13:47 +0100 Subject: [PATCH] A different way of threading output. --- .../Utilities/include/RecorderOpenFace.h | 18 +- lib/local/Utilities/src/RecorderOpenFace.cpp | 225 +++++++++--------- lib/local/Utilities/src/SequenceCapture.cpp | 1 - 3 files changed, 119 insertions(+), 125 deletions(-) diff --git a/lib/local/Utilities/include/RecorderOpenFace.h b/lib/local/Utilities/include/RecorderOpenFace.h index fdcd6a74..bbfb6c29 100644 --- a/lib/local/Utilities/include/RecorderOpenFace.h +++ b/lib/local/Utilities/include/RecorderOpenFace.h @@ -49,6 +49,9 @@ #include #include +// Threading includes +#include + namespace Utilities { @@ -113,16 +116,6 @@ namespace Utilities private: - // Used to keep track if the recording is still going (for the writing threads) - bool recording; - - // For keeping track of tasks - tbb::task_group writing_threads; - - // A thread that will write video output, so that the rest of the application does not block on it - void VideoWritingTask(); - void AlignedImageWritingTask(); - // Blocking copy, assignment and move operators, as it does not make sense to save to the same location RecorderOpenFace & operator= (const RecorderOpenFace& other); RecorderOpenFace & operator= (const RecorderOpenFace&& other); @@ -188,6 +181,11 @@ namespace Utilities cv::Mat aligned_face; tbb::concurrent_bounded_queue > aligned_face_queue; + std::thread video_writing_thread; + std::thread aligned_writing_thread; + + bool closed = false; + }; } #endif \ No newline at end of file diff --git a/lib/local/Utilities/src/RecorderOpenFace.cpp b/lib/local/Utilities/src/RecorderOpenFace.cpp index 90f258d5..11d77bd1 100644 --- a/lib/local/Utilities/src/RecorderOpenFace.cpp +++ b/lib/local/Utilities/src/RecorderOpenFace.cpp @@ -80,9 +80,62 @@ void CreateDirectory(std::string output_path) } } +void VideoWritingTask(tbb::concurrent_bounded_queue > *writing_queue, bool is_sequence, cv::VideoWriter *video_writer) +{ + + std::pair tracked_data; + + while (true) + { + writing_queue->pop(tracked_data); + + // Indicate that the thread should complete + if (tracked_data.second.empty()) + { + break; + } + + if (is_sequence) + { + if (video_writer->isOpened()) + { + video_writer->write(tracked_data.second); + } + } + else + { + bool out_success = cv::imwrite(tracked_data.first, tracked_data.second); + if (!out_success) + { + WARN_STREAM("Could not output tracked image"); + } + } + } +} + +void AlignedImageWritingTask(tbb::concurrent_bounded_queue > *writing_queue) +{ + std::pair tracked_data; + + while (true) + { + writing_queue->pop(tracked_data); + + // Empty frame indicates termination + if (tracked_data.second.empty()) + break; + + bool write_success = cv::imwrite(tracked_data.first, tracked_data.second); + + if (!write_success) + { + WARN_STREAM("Could not output similarity aligned image image"); + } + } +} + void RecorderOpenFace::PrepareRecording(const std::string& in_filename) { - recording = true; // Construct the directories required for the output CreateDirectory(record_root); @@ -148,9 +201,6 @@ void RecorderOpenFace::PrepareRecording(const std::string& in_filename) metadata_file << "Output image:" << this->media_filename << endl; this->media_filename = (path(record_root) / this->media_filename).string(); } - - // Start the video and image writing thread - writing_threads.run([&] {VideoWritingTask(); }); } // Prepare image recording @@ -159,14 +209,11 @@ void RecorderOpenFace::PrepareRecording(const std::string& in_filename) aligned_output_directory = out_name + "_aligned"; metadata_file << "Output aligned directory:" << this->aligned_output_directory << endl; this->aligned_output_directory = (path(record_root) / this->aligned_output_directory).string(); - CreateDirectory(aligned_output_directory); - - // Start the video and image writing thread - writing_threads.run([&] {AlignedImageWritingTask(); }); + CreateDirectory(aligned_output_directory); } this->frame_number = 0; - + closed = false; } RecorderOpenFace::RecorderOpenFace(const std::string in_filename, const RecorderOpenFaceParameters& parameters, std::vector& arguments):video_writer(), params(parameters) @@ -229,7 +276,7 @@ RecorderOpenFace::RecorderOpenFace(const std::string in_filename, const Recorder } -RecorderOpenFace::RecorderOpenFace(const std::string in_filename, const RecorderOpenFaceParameters& parameters, std::string output_directory):video_writer(), params(parameters) +RecorderOpenFace::RecorderOpenFace(const std::string in_filename, const RecorderOpenFaceParameters& parameters, std::string output_directory):video_writer(), params(parameters), closed(false) { // From the filename, strip out the name without directory and extension if (boost::filesystem::is_directory(in_filename)) @@ -260,87 +307,7 @@ void RecorderOpenFace::SetObservationVisualization(const cv::Mat &vis_track) { if (params.outputTracked()) { - // Initialize the video writer if it has not been opened yet - if(params.isSequence() && !video_writer.isOpened()) - { - std::string output_codec = params.outputCodec(); - try - { - video_writer.open(media_filename, CV_FOURCC(output_codec[0], output_codec[1], output_codec[2], output_codec[3]), params.outputFps(), vis_track.size(), true); - - if (!video_writer.isOpened()) - { - WARN_STREAM("Could not open VideoWriter, OUTPUT FILE WILL NOT BE WRITTEN."); - } - } - catch (cv::Exception e) - { - WARN_STREAM("Could not open VideoWriter, OUTPUT FILE WILL NOT BE WRITTEN. Currently using codec " << output_codec << ", try using an other one (-oc option)"); - } - - } vis_to_out = vis_track; - - } - -} - -void RecorderOpenFace::AlignedImageWritingTask() -{ - std::cout << "Aligned writing thread has started" << std::endl; - - while (recording || !aligned_face_queue.empty()) - { - std::pair tracked_data; - - while (aligned_face_queue.try_pop(tracked_data)) - { - bool write_success = cv::imwrite(tracked_data.first, tracked_data.second); - - if (!write_success) - { - WARN_STREAM("Could not output similarity aligned image image"); - } - } - std::cout << "sleeping while waiting to write, aligned face queue size" << aligned_face_queue.size() << std::endl; - - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - -} - -void RecorderOpenFace::VideoWritingTask() -{ - std::cout << "Video writing thread has started" << std::endl; - - while(recording || !vis_to_out_queue.empty()) - { - - std::pair tracked_data; - - while (vis_to_out_queue.try_pop(tracked_data)) - { - if (params.isSequence()) - { - if (video_writer.isOpened()) - { - video_writer.write(tracked_data.second); - } - } - else - { - bool out_success = cv::imwrite(tracked_data.first, tracked_data.second); - if (!out_success) - { - WARN_STREAM("Could not output tracked image"); - } - } - } - - std::cout << "sleeping while waiting to write, video queue size" << vis_to_out_queue.size() << std::endl; - - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } } @@ -397,10 +364,16 @@ void RecorderOpenFace::WriteObservation() // Write aligned faces if (params.outputAlignedFaces()) { - if (frame_number == 1) + // To support both video and image input + if ((face_id == 0 && frame_number == 0) || (face_id == 0 && frame_number == 1)) { + int capacity = (1024 * 1024 * ALIGNED_QUEUE_CAPACITY) / (aligned_face.size().width *aligned_face.size().height * aligned_face.channels()); aligned_face_queue.set_capacity(capacity); + + // Start the alignment output thread + aligned_writing_thread = std::thread(&AlignedImageWritingTask, &aligned_face_queue); + } char name[100]; @@ -420,10 +393,7 @@ void RecorderOpenFace::WriteObservation() if(params.outputBadAligned() || landmark_detection_success) { - while (!aligned_face_queue.try_push(std::pair(out_file, aligned_face))) - { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } + aligned_face_queue.push(std::pair(out_file, aligned_face)); } // Clear the image @@ -437,12 +407,35 @@ void RecorderOpenFace::WriteObservationTracked() { if (params.outputTracked()) { - - if (frame_number == 1) + // To support both video and image input + if ((!params.isSequence() && frame_number == 0) || (params.isSequence() && frame_number == 1)) { // Set up the queue for video writing based on output size int capacity = (1024 * 1024 * TRACKED_QUEUE_CAPACITY) / (vis_to_out.size().width * vis_to_out.size().height * vis_to_out.channels()); vis_to_out_queue.set_capacity(capacity); + + // Initialize the video writer if it has not been opened yet + if (params.isSequence()) + { + std::string output_codec = params.outputCodec(); + try + { + video_writer.open(media_filename, CV_FOURCC(output_codec[0], output_codec[1], output_codec[2], output_codec[3]), params.outputFps(), vis_to_out.size(), true); + + if (!video_writer.isOpened()) + { + WARN_STREAM("Could not open VideoWriter, OUTPUT FILE WILL NOT BE WRITTEN."); + } + } + catch (cv::Exception e) + { + WARN_STREAM("Could not open VideoWriter, OUTPUT FILE WILL NOT BE WRITTEN. Currently using codec " << output_codec << ", try using an other one (-oc option)"); + } + } + + // Start the video and tracked image writing thread + video_writing_thread = std::thread(&VideoWritingTask, &vis_to_out_queue, params.isSequence(), &video_writer); + } if (vis_to_out.empty()) @@ -452,19 +445,13 @@ void RecorderOpenFace::WriteObservationTracked() if (params.isSequence()) { - while (!vis_to_out_queue.try_push(std::pair("", vis_to_out))) - { - std::this_thread::sleep_for(std::chrono::milliseconds(20)); - } - + vis_to_out_queue.push(std::pair("", vis_to_out)); } else { - while (!vis_to_out_queue.try_push(std::pair(media_filename, vis_to_out))) - { - std::this_thread::sleep_for(std::chrono::milliseconds(20)); - } + vis_to_out_queue.push(std::pair(media_filename, vis_to_out)); } + // Clear the output vis_to_out = cv::Mat(); } @@ -535,16 +522,26 @@ RecorderOpenFace::~RecorderOpenFace() void RecorderOpenFace::Close() { - recording = false; - - // Make sure the recording threads complete - writing_threads.wait(); + if(!closed) + { - hog_recorder.Close(); - csv_recorder.Close(); - video_writer.release(); - metadata_file.close(); + // Insert terminating frames to the queues + vis_to_out_queue.push(std::pair("", cv::Mat())); + aligned_face_queue.push(std::pair("", cv::Mat())); + // Make sure the recording threads complete + if (video_writing_thread.joinable()) + video_writing_thread.join(); + + if (aligned_writing_thread.joinable()) + aligned_writing_thread.join(); + + hog_recorder.Close(); + csv_recorder.Close(); + video_writer.release(); + metadata_file.close(); + closed = true; + } } diff --git a/lib/local/Utilities/src/SequenceCapture.cpp b/lib/local/Utilities/src/SequenceCapture.cpp index ba397f51..507f7f63 100644 --- a/lib/local/Utilities/src/SequenceCapture.cpp +++ b/lib/local/Utilities/src/SequenceCapture.cpp @@ -269,7 +269,6 @@ bool SequenceCapture::OpenWebcam(int device, int image_width, int image_height, start_time = cv::getTickCount(); capturing = true; - //capture_threads.run([&] {CaptureThread(); }); return true;