A different way of threading output.

This commit is contained in:
Tadas Baltrusaitis
2018-05-30 20:13:47 +01:00
parent 2a371a46d8
commit 0d758babf8
3 changed files with 119 additions and 125 deletions

View File

@@ -49,6 +49,9 @@
#include <opencv2/core/core.hpp>
#include <opencv2/highgui/highgui.hpp>
// Threading includes
#include <thread>
namespace Utilities
{
@@ -113,16 +116,6 @@ namespace Utilities
private:
// Used to keep track if the recording is still going (for the writing threads)
bool recording;
// For keeping track of tasks
tbb::task_group writing_threads;
// A thread that will write video output, so that the rest of the application does not block on it
void VideoWritingTask();
void AlignedImageWritingTask();
// Blocking copy, assignment and move operators, as it does not make sense to save to the same location
RecorderOpenFace & operator= (const RecorderOpenFace& other);
RecorderOpenFace & operator= (const RecorderOpenFace&& other);
@@ -188,6 +181,11 @@ namespace Utilities
cv::Mat aligned_face;
tbb::concurrent_bounded_queue<std::pair<std::string, cv::Mat> > aligned_face_queue;
std::thread video_writing_thread;
std::thread aligned_writing_thread;
bool closed = false;
};
}
#endif

View File

@@ -80,9 +80,62 @@ void CreateDirectory(std::string output_path)
}
}
void VideoWritingTask(tbb::concurrent_bounded_queue<std::pair<std::string, cv::Mat> > *writing_queue, bool is_sequence, cv::VideoWriter *video_writer)
{
std::pair<std::string, cv::Mat> tracked_data;
while (true)
{
writing_queue->pop(tracked_data);
// Indicate that the thread should complete
if (tracked_data.second.empty())
{
break;
}
if (is_sequence)
{
if (video_writer->isOpened())
{
video_writer->write(tracked_data.second);
}
}
else
{
bool out_success = cv::imwrite(tracked_data.first, tracked_data.second);
if (!out_success)
{
WARN_STREAM("Could not output tracked image");
}
}
}
}
void AlignedImageWritingTask(tbb::concurrent_bounded_queue<std::pair<std::string, cv::Mat> > *writing_queue)
{
std::pair<std::string, cv::Mat> tracked_data;
while (true)
{
writing_queue->pop(tracked_data);
// Empty frame indicates termination
if (tracked_data.second.empty())
break;
bool write_success = cv::imwrite(tracked_data.first, tracked_data.second);
if (!write_success)
{
WARN_STREAM("Could not output similarity aligned image image");
}
}
}
void RecorderOpenFace::PrepareRecording(const std::string& in_filename)
{
recording = true;
// Construct the directories required for the output
CreateDirectory(record_root);
@@ -148,9 +201,6 @@ void RecorderOpenFace::PrepareRecording(const std::string& in_filename)
metadata_file << "Output image:" << this->media_filename << endl;
this->media_filename = (path(record_root) / this->media_filename).string();
}
// Start the video and image writing thread
writing_threads.run([&] {VideoWritingTask(); });
}
// Prepare image recording
@@ -159,14 +209,11 @@ void RecorderOpenFace::PrepareRecording(const std::string& in_filename)
aligned_output_directory = out_name + "_aligned";
metadata_file << "Output aligned directory:" << this->aligned_output_directory << endl;
this->aligned_output_directory = (path(record_root) / this->aligned_output_directory).string();
CreateDirectory(aligned_output_directory);
// Start the video and image writing thread
writing_threads.run([&] {AlignedImageWritingTask(); });
CreateDirectory(aligned_output_directory);
}
this->frame_number = 0;
closed = false;
}
RecorderOpenFace::RecorderOpenFace(const std::string in_filename, const RecorderOpenFaceParameters& parameters, std::vector<std::string>& arguments):video_writer(), params(parameters)
@@ -229,7 +276,7 @@ RecorderOpenFace::RecorderOpenFace(const std::string in_filename, const Recorder
}
RecorderOpenFace::RecorderOpenFace(const std::string in_filename, const RecorderOpenFaceParameters& parameters, std::string output_directory):video_writer(), params(parameters)
RecorderOpenFace::RecorderOpenFace(const std::string in_filename, const RecorderOpenFaceParameters& parameters, std::string output_directory):video_writer(), params(parameters), closed(false)
{
// From the filename, strip out the name without directory and extension
if (boost::filesystem::is_directory(in_filename))
@@ -260,87 +307,7 @@ void RecorderOpenFace::SetObservationVisualization(const cv::Mat &vis_track)
{
if (params.outputTracked())
{
// Initialize the video writer if it has not been opened yet
if(params.isSequence() && !video_writer.isOpened())
{
std::string output_codec = params.outputCodec();
try
{
video_writer.open(media_filename, CV_FOURCC(output_codec[0], output_codec[1], output_codec[2], output_codec[3]), params.outputFps(), vis_track.size(), true);
if (!video_writer.isOpened())
{
WARN_STREAM("Could not open VideoWriter, OUTPUT FILE WILL NOT BE WRITTEN.");
}
}
catch (cv::Exception e)
{
WARN_STREAM("Could not open VideoWriter, OUTPUT FILE WILL NOT BE WRITTEN. Currently using codec " << output_codec << ", try using an other one (-oc option)");
}
}
vis_to_out = vis_track;
}
}
void RecorderOpenFace::AlignedImageWritingTask()
{
std::cout << "Aligned writing thread has started" << std::endl;
while (recording || !aligned_face_queue.empty())
{
std::pair<std::string, cv::Mat> tracked_data;
while (aligned_face_queue.try_pop(tracked_data))
{
bool write_success = cv::imwrite(tracked_data.first, tracked_data.second);
if (!write_success)
{
WARN_STREAM("Could not output similarity aligned image image");
}
}
std::cout << "sleeping while waiting to write, aligned face queue size" << aligned_face_queue.size() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void RecorderOpenFace::VideoWritingTask()
{
std::cout << "Video writing thread has started" << std::endl;
while(recording || !vis_to_out_queue.empty())
{
std::pair<std::string, cv::Mat> tracked_data;
while (vis_to_out_queue.try_pop(tracked_data))
{
if (params.isSequence())
{
if (video_writer.isOpened())
{
video_writer.write(tracked_data.second);
}
}
else
{
bool out_success = cv::imwrite(tracked_data.first, tracked_data.second);
if (!out_success)
{
WARN_STREAM("Could not output tracked image");
}
}
}
std::cout << "sleeping while waiting to write, video queue size" << vis_to_out_queue.size() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
@@ -397,10 +364,16 @@ void RecorderOpenFace::WriteObservation()
// Write aligned faces
if (params.outputAlignedFaces())
{
if (frame_number == 1)
// To support both video and image input
if ((face_id == 0 && frame_number == 0) || (face_id == 0 && frame_number == 1))
{
int capacity = (1024 * 1024 * ALIGNED_QUEUE_CAPACITY) / (aligned_face.size().width *aligned_face.size().height * aligned_face.channels());
aligned_face_queue.set_capacity(capacity);
// Start the alignment output thread
aligned_writing_thread = std::thread(&AlignedImageWritingTask, &aligned_face_queue);
}
char name[100];
@@ -420,10 +393,7 @@ void RecorderOpenFace::WriteObservation()
if(params.outputBadAligned() || landmark_detection_success)
{
while (!aligned_face_queue.try_push(std::pair<std::string, cv::Mat>(out_file, aligned_face)))
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
aligned_face_queue.push(std::pair<std::string, cv::Mat>(out_file, aligned_face));
}
// Clear the image
@@ -437,12 +407,35 @@ void RecorderOpenFace::WriteObservationTracked()
{
if (params.outputTracked())
{
if (frame_number == 1)
// To support both video and image input
if ((!params.isSequence() && frame_number == 0) || (params.isSequence() && frame_number == 1))
{
// Set up the queue for video writing based on output size
int capacity = (1024 * 1024 * TRACKED_QUEUE_CAPACITY) / (vis_to_out.size().width * vis_to_out.size().height * vis_to_out.channels());
vis_to_out_queue.set_capacity(capacity);
// Initialize the video writer if it has not been opened yet
if (params.isSequence())
{
std::string output_codec = params.outputCodec();
try
{
video_writer.open(media_filename, CV_FOURCC(output_codec[0], output_codec[1], output_codec[2], output_codec[3]), params.outputFps(), vis_to_out.size(), true);
if (!video_writer.isOpened())
{
WARN_STREAM("Could not open VideoWriter, OUTPUT FILE WILL NOT BE WRITTEN.");
}
}
catch (cv::Exception e)
{
WARN_STREAM("Could not open VideoWriter, OUTPUT FILE WILL NOT BE WRITTEN. Currently using codec " << output_codec << ", try using an other one (-oc option)");
}
}
// Start the video and tracked image writing thread
video_writing_thread = std::thread(&VideoWritingTask, &vis_to_out_queue, params.isSequence(), &video_writer);
}
if (vis_to_out.empty())
@@ -452,19 +445,13 @@ void RecorderOpenFace::WriteObservationTracked()
if (params.isSequence())
{
while (!vis_to_out_queue.try_push(std::pair<std::string, cv::Mat>("", vis_to_out)))
{
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
vis_to_out_queue.push(std::pair<std::string, cv::Mat>("", vis_to_out));
}
else
{
while (!vis_to_out_queue.try_push(std::pair<std::string, cv::Mat>(media_filename, vis_to_out)))
{
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
vis_to_out_queue.push(std::pair<std::string, cv::Mat>(media_filename, vis_to_out));
}
// Clear the output
vis_to_out = cv::Mat();
}
@@ -535,16 +522,26 @@ RecorderOpenFace::~RecorderOpenFace()
void RecorderOpenFace::Close()
{
recording = false;
// Make sure the recording threads complete
writing_threads.wait();
if(!closed)
{
hog_recorder.Close();
csv_recorder.Close();
video_writer.release();
metadata_file.close();
// Insert terminating frames to the queues
vis_to_out_queue.push(std::pair<string, cv::Mat>("", cv::Mat()));
aligned_face_queue.push(std::pair<string, cv::Mat>("", cv::Mat()));
// Make sure the recording threads complete
if (video_writing_thread.joinable())
video_writing_thread.join();
if (aligned_writing_thread.joinable())
aligned_writing_thread.join();
hog_recorder.Close();
csv_recorder.Close();
video_writer.release();
metadata_file.close();
closed = true;
}
}

View File

@@ -269,7 +269,6 @@ bool SequenceCapture::OpenWebcam(int device, int image_width, int image_height,
start_time = cv::getTickCount();
capturing = true;
//capture_threads.run([&] {CaptureThread(); });
return true;