diff --git a/.gitignore b/.gitignore index 08b31459..35b7a77b 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ src/build src/dist src/*.egg-info *.swp +src/tests/pipeline/mtr-build +gg-iss-16 diff --git a/doc/source/complex.rst b/doc/source/complex.rst new file mode 100644 index 00000000..b6ac054b --- /dev/null +++ b/doc/source/complex.rst @@ -0,0 +1,23 @@ +.. _Complex: + +Multi-threaded complex processing pipelines +=========================================== + +The example below shows how GIFT-Grab can be used for running complex pipelines with multiple +intermediate processing nodes and threads. +The intermediate processing nodes in this example are built on the same principles as in the +:ref:`SciPy` example. +Running the example requires an `HEVC-encoded MP4 file`_, an `NVENC-capable GPU`_, and `NumPy support`_. + +.. _`HEVC-encoded MP4 file`: https://github.com/gift-surg/GIFT-Grab/blob/master/doc/build.md#reading-video-files +.. _`NVENC-capable GPU`: https://github.com/gift-surg/GIFT-Grab/blob/master/doc/build.md#hevc +.. _`NumPy support`: https://github.com/gift-surg/GIFT-Grab/blob/master/doc/build.md#python-api + +The full source code of the example is below. +Please follow the comments and the flow of code. +This example is also available on GitHub_: + +.. literalinclude:: ../../src/tests/pipeline/complex_pipeline.py + :language: python + +.. 
_GitHub: https://github.com/gift-surg/GIFT-Grab/blob/master/src/tests/pipeline/complex_pipeline.py diff --git a/doc/source/index.rst b/doc/source/index.rst index 2c8fddc2..8e9af514 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -51,6 +51,7 @@ Examples network scipy encoding + complex Citing GIFT-Grab ^^^^^^^^^^^^^^^^ diff --git a/src/api/broadcastdaemon.cpp b/src/api/broadcastdaemon.cpp index df7f7486..666655eb 100644 --- a/src/api/broadcastdaemon.cpp +++ b/src/api/broadcastdaemon.cpp @@ -10,6 +10,7 @@ namespace gg BroadcastDaemon::BroadcastDaemon(IVideoSource * source) : _source(source) , _running(false) + , _thread() { if (_source == nullptr) throw VideoSourceError("Null pointer passed" diff --git a/src/opencv/opencv_video_source.cpp b/src/opencv/opencv_video_source.cpp index d41ee42a..e4fe19d4 100644 --- a/src/opencv/opencv_video_source.cpp +++ b/src/opencv/opencv_video_source.cpp @@ -121,8 +121,24 @@ bool VideoSourceOpenCV::get_frame(gg::VideoFrame & frame) _buffer_bgra.cols != _buffer.cols) { _buffer_bgra = cv::Mat::zeros(_buffer.rows, _buffer.cols, CV_8UC4); } - // Convert to BGRA - cv::cvtColor(_buffer, _buffer_bgra, CV_BGR2BGRA); + /* This too broad try-catch block is a workaround to + * https://github.com/gift-surg/GIFT-Grab/issues/35 + * as OpenCV seems to intermittently throw an assertion + * exception in the form: + * "Can't fetch data from terminated TLS container". + * To make things worse, this exception is unhandled in + * the accompanying BroadcastDaemon thread, and so leads + * to an unspecific abortion of the whole application. + */ + try + { + // Convert to BGRA + cv::cvtColor(_buffer, _buffer_bgra, CV_BGR2BGRA); + } + catch (...) 
+ { + return false; + } unsigned char * data = nullptr; size_t data_length = 0; size_t cols = 0, rows = 0; diff --git a/src/python/wrapper.cpp b/src/python/wrapper.cpp index ae1125de..11f7e72f 100644 --- a/src/python/wrapper.cpp +++ b/src/python/wrapper.cpp @@ -268,7 +268,8 @@ class IObserverWrapper : public gg::IObserver, public wrapper { gg::ScopedPythonGILLock gil_lock; VideoFrameNumPyWrapper wrapped_frame(&frame); - this->get_override("update")(boost::ref(wrapped_frame)); + if (override f = this->get_override("update")) + f(boost::ref(wrapped_frame)); } }; @@ -332,11 +333,13 @@ class IObservableObserverWrapper : public gg::IObservable void update(gg::VideoFrame & frame) { - if (override f = this->get_override("update")) { - VideoFrameNumPyWrapper wrapped_frame(&frame); gg::ScopedPythonGILLock gil_lock; - f(boost::ref(wrapped_frame)); + if (override f = this->get_override("update")) + { + VideoFrameNumPyWrapper wrapped_frame(&frame); + f(boost::ref(wrapped_frame)); + } } notify(frame); } diff --git a/src/tests/pipeline/complex_pipeline.py b/src/tests/pipeline/complex_pipeline.py new file mode 100755 index 00000000..864a2e9d --- /dev/null +++ b/src/tests/pipeline/complex_pipeline.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Example showing a complex GIFT-Grab pipeline with +multiple intermediate processing nodes. 
+""" + +from time import (sleep, time) +import argparse +import os.path +import threading +import numpy as np +import scipy.misc +from pygiftgrab import (VideoSourceFactory, VideoFrame, Device, + ColourSpace, IObservableObserver, + VideoTargetFactory, Codec, IObserver) + + +# global NumPy buffers +np_buffer_red, np_buffer_orig = None, None + +# mutex protecting frame data passed from node to node +# in the GIFT-Grab processing pipeline +lock = threading.Lock() + + +class SnapshotSaver(IObserver): + """A snapshot saver for saving incoming frames to PNG files.""" + + def __init__(self, root_dir, save_freq=5): + """ + Initialise a snapshot saver with a saving frequency. + + :param root_dir: the folder where to save the snapshots + :param save_freq: saving frequency. The default value tells + the saver to save every 5 seconds. + """ + super(SnapshotSaver, self).__init__() + assert 5 <= save_freq # to avoid flooding disk with images + self.save_freq = save_freq + self.root_dir = root_dir + self.last_saved = time() + self.num_saved = 0 + + def update(self, frame): + """Implement ``IObserver.update``.""" + if time() - self.last_saved >= self.save_freq: + self.num_saved += 1 + out_file = os.path.join(self.root_dir, + 'frame-{:010d}.png'.format(self.num_saved)) + scipy.misc.imsave(out_file, frame.data(True)) + self.last_saved = time() + + +class Bufferer(IObservableObserver): + """GIFT-Grab processing node that updates a frame buffer.""" + + def __init__(self, np_buffer): + """Initialise a bufferer that will update given buffer.""" + super(Bufferer, self).__init__() + self.buffer = np_buffer + + def update(self, frame): + """Implement ``IObservableObserver.update``.""" + with lock: + data = frame.data(True) + self.buffer[:, :, :] = data[:, :, :] + + +class Histogrammer(threading.Thread): + """ + GIFT-Grab processing node that computes the histogram of + an image channel and prints how "colored" that channel is. 
+    """
+
+    channels = ('Blue', 'Green', 'Red', 'Alpha')
+
+    def __init__(self, np_buffer, channel, tag, frame_rate, display_freq):
+        """
+        :param np_buffer: image buffer to use
+        :param channel: image channel to compute coloredness for
+        :param tag: a tag describing what this image is, e.g.
+        how it's been processed within a GIFT-Grab pipeline
+        :param frame_rate: the rate at which to compute the
+        coloredness (unit: frames-per-second)
+        :param display_freq: how many times to skip the display
+        of computed coloredness, e.g. if 5 is provided, the
+        coloredness of every 5th frame will be printed to the
+        console
+        """
+        super(Histogrammer, self).__init__()
+        assert channel in range(3)
+        assert 0 < frame_rate <= 60
+        assert 0 <= display_freq
+        self.channel = channel
+        self.buffer = np_buffer
+        self.tag = tag
+        self.display_freq = display_freq
+        self.num_skipped = 0
+        self.sleep_interval = 1.0 / frame_rate
+        self.running = False
+
+    def run(self):
+        """Override ``Thread.run``."""
+        if self.running:
+            return
+
+        histogram, num_bins = None, 10
+        scale = np.array([i for i in range(1, num_bins + 1)], float)
+        self.running = True
+        while self.running:
+            with lock:
+                histogram, _ = np.histogram(
+                    self.buffer[:, :, self.channel], bins=num_bins, range=(0, 256)
+                )
+            if histogram is not None:
+                coloredness = np.sum(histogram * scale)
+                coloredness /= np.sum(histogram)
+                coloredness /= num_bins
+                if self.num_skipped >= self.display_freq:
+                    print('{}ness of {} image: {:.0%}'.format(
+                        Histogrammer.channels[self.channel],
+                        self.tag, coloredness
+                    ))
+                    self.num_skipped = 0
+                else:
+                    self.num_skipped += 1
+            sleep(self.sleep_interval)
+
+    def stop(self):
+        """Stop the thread."""
+        self.running = False
+
+
+class Dyer(IObservableObserver):
+    """Dyes specified channel of an incoming video frame."""
+
+    def __init__(self, channel, increment):
+        """
+        :param channel: image channel to dye
+        :param increment: how much to dye the image channel
+        """
+        super(Dyer, self).__init__()
+        assert 0 <= 
channel < 3
+        assert 0 <= increment < 256
+        self.channel = channel
+        self.increment = increment
+
+    def update(self, frame):
+        """Implement ``IObservableObserver.update``."""
+        with lock:
+            data = frame.data(True)
+            channel_data = data[:, :, self.channel]
+            channel_data[channel_data < 255 - self.increment] += self.increment
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-i', '--input', type=str, required=True,
+                        metavar='VIDEO_INPUT',
+                        help='decklink (for grabbing frames from a Blackmagic DeckLink 4K Extreme 12G),\n'
+                             'dvi2pcie (for grabbing frames from an Epiphan DVI2PCIe Duo SDI port\n'
+                             ' or a video file (HEVC-encoded MP4)')
+    args = parser.parse_args()
+    video_input = args.input
+
+    sfac = VideoSourceFactory.get_instance()
+
+    if video_input == 'decklink':
+        filename, ext = 'decklink', '.mp4'
+
+        # start acquiring frames from a DeckLink 4K Extreme 12G
+        source = sfac.get_device(Device.DeckLink4KExtreme12G, ColourSpace.BGRA)
+    elif video_input == 'dvi2pcie':
+        filename, ext = 'dvi2pcie', '.mp4'
+
+        # start acquiring frames from an Epiphan DVI2PCIe Duo SDI port
+        source = sfac.get_device(Device.DVI2PCIeDuo_SDI, ColourSpace.BGRA)
+    else:
+        filename = os.path.basename(video_input)
+        filename, ext = os.path.splitext(filename)
+        assert filename
+        assert ext == '.mp4'
+
+        # initialise reading of passed file
+        source = sfac.create_file_reader(video_input, ColourSpace.BGRA)
+
+    frame = VideoFrame(ColourSpace.BGRA, False)
+    source.get_frame(frame)
+    frame_shape = (frame.rows(), frame.cols(), 4)
+
+    # prepare for creating encoders (writers)
+    tfac = VideoTargetFactory.get_instance()
+    frame_rate = source.get_frame_rate()
+
+    # create a red and green Dyer
+    red_dyer = Dyer(2, 128)
+    green_dyer = Dyer(1, 64)
+
+    # create the bufferer for the red and green Dyers
+    np_buffer_red = np.zeros(frame_shape, np.uint8)
+    bufferer_red = Bufferer(np_buffer_red)
+    np_buffer_orig = np.zeros_like(np_buffer_red)
+    bufferer_orig = 
Bufferer(np_buffer_orig) + + # create the histogrammers for the red-dyed and + # the original video frames, and start them + hist_red = Histogrammer(np_buffer_red, 2, 'red-dyed', 60.0, 10) + hist_red.start() + hist_orig = Histogrammer(np_buffer_orig, 2, 'original', 50.0, 10) + hist_orig.start() + + # create encoders for the red-dyed and yellow-dyed (as green + # is applied on top of red) video streams + red_file = os.path.join('.', ''.join([filename, '-red', ext])) + red_writer = tfac.create_file_writer(Codec.HEVC, red_file, frame_rate) + yellow_file = os.path.join('.', ''.join([filename, '-yellow', ext])) + yellow_writer = tfac.create_file_writer(Codec.HEVC, yellow_file, frame_rate) + + # create a SnapshotSaver for saving a number of yellow-dyed + # video frames + yellow_snapshots = SnapshotSaver('.', 9) + + # assemble the GIFT-Grab pipeline + source.attach(bufferer_orig) + bufferer_orig.attach(red_dyer) + red_dyer.attach(red_writer) + red_dyer.attach(bufferer_red) + red_dyer.attach(green_dyer) + green_dyer.attach(yellow_writer) + green_dyer.attach(yellow_snapshots) + + sleep(20) # operate pipeline for 20 sec + + # stop the histogrammers + hist_red.stop() + hist_orig.stop() + + # disassemble the GIFT-Grab pipeline + source.detach(bufferer_orig) + bufferer_orig.detach(red_dyer) + red_dyer.detach(red_writer) + red_dyer.detach(bufferer_red) + red_dyer.detach(green_dyer) + green_dyer.detach(yellow_writer) + green_dyer.detach(yellow_snapshots) diff --git a/src/tests/pipeline/run-complex-pipeline.sh b/src/tests/pipeline/run-complex-pipeline.sh new file mode 100755 index 00000000..d9bd9406 --- /dev/null +++ b/src/tests/pipeline/run-complex-pipeline.sh @@ -0,0 +1,112 @@ +#!/usr/bin/env bash +# -*- coding: utf-8 -*- + +# This script provides a multi-threading reliability check. +# The background is issue #16. 
It looks like in applications +# where multiple Python threads are involved, occasionally +# the acquisition of the Global Interpreter Lock leads to a +# deadlock, which crashes the whole application with a +# non-specific segmentation fault. +# +# It builds a basic GIFT-Grab capable of reading a video file, +# and encoding to a video file in real time, with Python and +# NumPy support. It subsequently runs a multi-threaded +# GIFT-Grab pipeline a number of times, recording the exit +# status each time. This is essentially a stress-test that +# should serve as a validation that issue #16 is fixed. + +if [ $# -lt 1 ] || [ $# -gt 3 ]; +then + THIS_SCRIPT="$(basename "$(test -L "${BASH_SOURCE[0]}" && readlink "$0" || echo "$0")")" + printf "Usage: $THIS_SCRIPT video_file|decklink|dvi2pcie [ num_reps [ output_dir ] ]\n" + printf "\tvideo_file: path to an HEVC-encoded MP4 file\n" + printf "\tdecklink: Blackmagic DeckLink 4K Extreme 12G frame grabber\n" + printf "\tdvi2pcie: Epiphan DVI2PCIe Duo frame grabber\n" + printf "\tnum_reps: how many times to run the Python script " + printf "(default: once)\n" + printf "\toutput_dir: where to save all the generated output " + printf "(default: current directory)\n" + exit 1 +fi + +CALL_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" +SOURCE_DIR="$( cd "$CALL_DIR/../.." 
>/dev/null && pwd )" +MTR_SCRIPT=$SOURCE_DIR/tests/pipeline/complex_pipeline.py +if [ $# -ge 3 ]; +then + ROOT_DIR="$( cd "$3" >/dev/null && pwd )" +else + ROOT_DIR=$CALL_DIR +fi + +BUILD_DIR=$CALL_DIR/mtr-build +CMAKE_OPTS="-D USE_FILES=ON" +CMAKE_OPTS="$CMAKE_OPTS -D USE_HEVC=ON" +CMAKE_OPTS="$CMAKE_OPTS -D ENABLE_NONFREE=ON -D USE_NVENC=ON" +CMAKE_OPTS="$CMAKE_OPTS -D BUILD_PYTHON=ON -D USE_NUMPY=ON" +if [ "$1" = "decklink" ]; then + if [[ -z "${BlackmagicSDK_DIR}" ]]; then + echo "Please set BlackmagicSDK_DIR to the path of the Blackmagic SDK" + exit 1 + fi + CMAKE_OPTS="$CMAKE_OPTS -D USE_BLACKMAGIC_DECKLINK_4K_EXTREME_12G=ON -D ENABLE_NONFREE=ON" +elif [ "$1" = "dvi2pcie" ]; then + CMAKE_OPTS="$CMAKE_OPTS -D USE_EPIPHAN_DVI2PCIE_DUO=ON -D USE_I420=OFF" +fi +CMAKE_OPTS="$CMAKE_OPTS -D CMAKE_BUILD_TYPE=Debug" +SESSION_DIR=$ROOT_DIR/$(date +"%Y-%m-%d-%H-%M-%S") +mkdir $SESSION_DIR +echo "Session directory: $SESSION_DIR" +ulimit -c unlimited + +BUILD_LOG=$SESSION_DIR/build.log +{ + git describe --dirty + mkdir -p $BUILD_DIR + rm -rf $BUILD_DIR/* + cd $BUILD_DIR + cmake $CMAKE_OPTS $SOURCE_DIR + make -j4 +} > $BUILD_LOG 2>&1 + +num_reps=1 +if [ $# -ge 2 ]; +then + num_reps=$2 +fi + +declare -a exit_code_freqs +for run_no in `seq 1 $num_reps`; +do + WORKING_DIR=$SESSION_DIR/$(printf "%03d" $run_no) + mkdir $WORKING_DIR + cd $WORKING_DIR + RUN_LOG=$WORKING_DIR/run.log + { + PYTHONPATH=$BUILD_DIR python $MTR_SCRIPT --input $1 + + exit_code=$? 
+ echo "Exit code was: $exit_code" + freq=${exit_code_freqs[$exit_code]} + let freq=$freq+1 + exit_code_freqs[$exit_code]=$freq + } > $RUN_LOG 2>&1 + + if [ $(($run_no % 10)) -eq 0 ] || [ $run_no -eq $num_reps ]; + then + EXIT_CODES_LOG=$SESSION_DIR/exit-codes-$run_no.csv + printf "%10s" "exit-code" >> $EXIT_CODES_LOG + printf "%10s" "frequency" >> $EXIT_CODES_LOG + printf "\n" >> $EXIT_CODES_LOG + for exit_code in "${!exit_code_freqs[@]}"; + do + printf "%10d" $exit_code >> $EXIT_CODES_LOG + printf "%10d" ${exit_code_freqs[$exit_code]} >> $EXIT_CODES_LOG + printf "\n" >> $EXIT_CODES_LOG + done + fi + + sleep 5 +done + +ulimit -c 0