my pipeline code:
//
// Created by vapd on 19-3-19.
//
#include "vapd_pipeline_1.h"
#include <gst/gst.h>
#include <glib.h>
#include <math.h>
#include <string.h>
#include <sys/time.h>
#include <string>
#include "gstnvdsmeta.h"
#include "gstnvstreammeta.h"
#include "gst-nvmessage.h"
#include "nvbuffer.h"
#include <opencv2/opencv.hpp>
#include "logger.h"
/* Muxer batch formation timeout in microseconds (10000 us = 10 ms). Should
* ideally be set based on the fastest source's framerate. */
#define MUXER_BATCH_TIMEOUT_USEC 10000
/* NVIDIA Decoder source pad memory feature. This feature signifies that source
* pads having this capability will push GstBuffers containing cuda buffers. */
#define GST_CAPS_FEATURES_NVMM "memory:NVMM"
/* Links elem1 -> elem2. On failure, logs both element names together with the
 * caps reported by the first src pad of elem1 and the first sink pad of elem2.
 * The queried caps and their string forms are released after logging.
 * Expects `logger` (as used by LOG_ERROR) to be visible at the expansion site. */
#define NVGSTDS_LINK_ELEMENT(elem1, elem2) \
do { \
  if (!gst_element_link (elem1,elem2)) { \
    GstCaps *src_caps, *sink_caps; \
    gchar *src_caps_str, *sink_caps_str; \
    src_caps = gst_pad_query_caps ((GstPad *) (elem1)->srcpads->data, NULL); \
    sink_caps = gst_pad_query_caps ((GstPad *) (elem2)->sinkpads->data, NULL); \
    src_caps_str = gst_caps_to_string (src_caps); \
    sink_caps_str = gst_caps_to_string (sink_caps); \
    LOG_ERROR(logger,fmtString("Failed to link '%s' (%s) and '%s' (%s)\n", \
        GST_ELEMENT_NAME (elem1), \
        src_caps_str, \
        GST_ELEMENT_NAME (elem2), \
        sink_caps_str)); \
    g_free (src_caps_str); \
    g_free (sink_caps_str); \
    gst_caps_unref (src_caps); \
    gst_caps_unref (sink_caps); \
  } \
} while (0)
#define FPS_PRINT_INTERVAL 300
/* Bus watch handler; `data` is the owning VapdPipeline.
 * - EOS:     logs and quits the pipeline's main loop.
 * - WARNING: logs the warning text (twice, in two formats).
 * - ERROR:   logs the error and optional debug details, then quits the loop.
 * - ELEMENT: logs per-stream EOS notifications.
 * Every other message type is ignored. Always returns TRUE. */
static gboolean bus_call (GstBus * bus, GstMessage * msg, gpointer data)
{
  VapdPipeline *const owner = (VapdPipeline *) data;
  const GstMessageType kind = GST_MESSAGE_TYPE (msg);

  if (kind == GST_MESSAGE_EOS) {
    LOG_INFO(logger,"End of stream\n");
    g_main_loop_quit (owner->loop);
  } else if (kind == GST_MESSAGE_WARNING) {
    gchar *detail;
    GError *warning;
    gst_message_parse_warning (msg, &warning, &detail);
    LOG_ERROR(logger,fmtString("WARNING from element %s: %s\n",
        GST_OBJECT_NAME (msg->src), warning->message));
    g_free (detail);
    LOG_ERROR(logger,fmtString("Warning: %s\n", warning->message));
    g_error_free (warning);
  } else if (kind == GST_MESSAGE_ERROR) {
    gchar *detail;
    GError *failure;
    gst_message_parse_error (msg, &failure, &detail);
    LOG_ERROR(logger,fmtString("ERROR from element {}: {}\n",
        GST_OBJECT_NAME (msg->src), failure->message));
    if (detail)
      LOG_ERROR(logger,"VapdPipeline["<< owner->pipeline_index << "]Error details: "<< detail << "\n");
    g_free (detail);
    g_error_free (failure);
    g_main_loop_quit (owner->loop);
  } else if (kind == GST_MESSAGE_ELEMENT) {
    guint stream_id;
    /* Only stream-EOS element messages that parse successfully are reported. */
    if (gst_nvmessage_is_stream_eos (msg)
        && gst_nvmessage_parse_stream_eos (msg, &stream_id)) {
      LOG_ERROR(logger,fmtString("Got EOS from stream {}\n", stream_id));
    }
  }

  return TRUE;
}
/* "pad-added" handler for the decodebin inside a source bin.
 * `data` is the source bin; its "src" ghost pad is pointed at the new decoder
 * pad when that pad carries video caps with the memory:NVMM feature.
 * Non-video pads are ignored; video pads without NVMM are logged as an error.
 * The caps queried here are released before returning. */
static void cb_newpad (GstElement * decodebin, GstPad * decoder_src_pad, gpointer data)
{
  GstElement *source_bin = (GstElement *) data;
  GstCaps *caps = gst_pad_query_caps (decoder_src_pad, NULL);

  /* ANY or empty caps hold no structure, so index 0 must not be accessed. */
  if (!caps || gst_caps_get_size (caps) == 0) {
    LOG_ERROR(logger,"Error: decoder src pad reported no caps structure.\n");
    if (caps) {
      gst_caps_unref (caps);
    }
    return;
  }

  /* `str`, `name` and `features` are owned by `caps`; use them before unref. */
  const GstStructure *str = gst_caps_get_structure (caps, 0);
  const gchar *name = gst_structure_get_name (str);
  GstCapsFeatures *features = gst_caps_get_features (caps, 0);

  /* Need to check if the pad created by the decodebin is for video and not
   * audio. */
  if (name && !strncmp (name, "video", 5)) {
    /* Link the decodebin pad only if decodebin has picked nvidia
     * decoder plugin nvdec_*. We do this by checking if the pad caps contain
     * NVMM memory features. */
    if (features && gst_caps_features_contains (features, GST_CAPS_FEATURES_NVMM)) {
      /* Get the source bin ghost pad */
      GstPad *bin_ghost_pad = gst_element_get_static_pad (source_bin, "src");
      if (!bin_ghost_pad
          || !gst_ghost_pad_set_target (GST_GHOST_PAD (bin_ghost_pad),
              decoder_src_pad)) {
        LOG_ERROR(logger,"Failed to link decoder src pad to source bin ghost pad\n");
      }
      if (bin_ghost_pad) {
        gst_object_unref (bin_ghost_pad);
      }
    } else {
      LOG_ERROR(logger,"Error: Decodebin did not pick nvidia decoder plugin.\n");
    }
  }

  gst_caps_unref (caps);
}
/* Buffer probe: maps the first stream id found in the buffer's NvStream meta
 * to a channel key and prints it.
 * `u_data` is the owning VapdPipeline. The stream id is treated as a position
 * in the iteration order of pipelineInfo.pipeline_channel; the key at that
 * position is printed as the channel (-1 when no entry is at that position).
 * Buffers without stream meta are passed through silently.
 * Always returns GST_PAD_PROBE_OK. */
static GstPadProbeReturn decode_pad_buffer_probe (GstPad * pad, GstPadProbeInfo * probe_info,
    gpointer u_data) {
  VapdPipeline *dpsPipeline = (VapdPipeline *) u_data;
  GstBuffer *gstbuf = (GstBuffer *) probe_info->data;
  GstNvStreamMeta *streammeta = gstbuf ? gst_buffer_get_nvstream_meta (gstbuf) : NULL;

  /* No meta attached: nothing to report, and dereferencing it would crash. */
  if (!streammeta) {
    return GST_PAD_PROBE_OK;
  }

  const int stream_id = (int) *streammeta->stream_id;
  int channelIndex = -1;
  int position = 0;
  for (const auto &pipeline_channel : dpsPipeline->pipelineInfo.pipeline_channel) {
    if (position == stream_id) {
      channelIndex = pipeline_channel.first;
      break;
    }
    ++position;
  }

  printf("ret:[%d][%d],channel:%d\n", dpsPipeline->pipeline_index, stream_id, channelIndex);
  return GST_PAD_PROBE_OK;
}
/* Stores the pipeline description, this pipeline's index and the GPU id.
 * No GStreamer objects are created here. */
VapdPipeline::VapdPipeline(PipelineInfo pipelineInfo1,int pipeIndex,int gpu_id)
{
  this->pipeline_gpu_id = gpu_id;
  this->pipeline_index = pipeIndex;
  this->pipelineInfo = pipelineInfo1;
}
/* Creates an element from factory `element_name`, names it
 * "vapd-<element_name>-<index>" (index zero-padded to two digits) and adds it
 * to `pipeline`.
 * Returns the element, or nullptr if it could not be created or added. */
GstElement *VapdPipeline::create_element_into_pipeline(std::string element_name,int index)
{
  /* Heap-allocated name: a fixed 20-byte buffer truncated long factory names
   * (e.g. "vapd-nvstreamdemux-00"), silently dropping the index. */
  gchar *name = g_strdup_printf ("vapd-%s-%02d", element_name.c_str(), index);
  GstElement *element = gst_element_factory_make (element_name.c_str(), name);
  g_free (name);

  if (!element) {
    LOG_ERROR (logger,"One element could not be created. Exiting.\n");
    return nullptr;
  }

  if (!gst_bin_add (GST_BIN (pipeline), element)) {
    /* NOTE(review): the element is not released here; confirm against the
     * GStreamer version in use whether gst_bin_add() drops the floating
     * reference on failure. */
    LOG_ERROR (logger,"One element could not be added to the pipeline. Exiting.\n");
    return nullptr;
  }
  return element;
}
int VapdPipeline::init()
{
gchar name[20] = { };
/* Standard GStreamer initialization */
loop = g_main_loop_new (nullptr, FALSE);
/* Create gstreamer elements */
/* Create Pipeline element that will form a connection of other elements */
g_snprintf (name, 20, "vapd-pipeline-%02d", pipeline_index);
pipeline = gst_pipeline_new (name);
/* Create nvstreammux instance to form batches from one or more sources. */
streammux = create_element_into_pipeline ("nvstreammux", pipeline_index);
streamdemux = create_element_into_pipeline ("nvstreamdemux", pipeline_index);
nvvidconv = create_element_into_pipeline ("nvvidconv", pipeline_index);
//g_object_set (G_OBJECT (nvvidconv), "gpu-id",pipeline_gpu_id,nullptr);
//g_object_set (G_OBJECT (nvvidconv), "qos",true,nullptr);
link_streammux();
/* we add a message handler */
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
bus_watch_id = gst_bus_add_watch (bus, bus_call, this);
// todo deletePipeline gst_object_unref (bus);
g_object_set (G_OBJECT (streammux), "width", pipelineInfo.video_width, "height",
pipelineInfo.video_height, "batch-size", pipelineInfo.pipeline_channel.size()*2,
"batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC,nullptr);//"gpu-id",pipeline_gpu_id
filter1 = gst_element_factory_make ("capsfilter", "filter1");
filter2 = gst_element_factory_make ("capsfilter", "filter2");
caps1 = gst_caps_from_string ("video/x-raw(memory:NVMM), format=NV12");
g_object_set (G_OBJECT (filter1), "caps", caps1, nullptr);
gst_caps_unref (caps1);
caps2 = gst_caps_from_string ("video/x-raw(memory:NVMM), format=RGBA");
g_object_set (G_OBJECT (filter2), "caps", caps2, nullptr);
gst_caps_unref (caps2);
/* Set up the pipeline */
/* we add all elements into the pipeline */
gst_bin_add_many (GST_BIN (pipeline),filter1, filter2, nullptr);
link_streamdemux();
/* we link the elements together
* nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd -> video-renderer */
if (!gst_element_link_many (streammux,filter1, nvvidconv, filter2,streamdemux,
NULL)) {
LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Elements could not be linked. Exiting.\n");
return -1;
}
return 1;
}
gboolean
link_element_to_demux_src_pad (GstElement *demux, GstElement *elem, guint index)
{
gboolean ret = FALSE;
GstPad *demux_src_pad = NULL;
GstPad *sink_pad = NULL;
gchar pad_name[16];
g_snprintf (pad_name, 16, "src_%u", index);
pad_name[15] = '
//
// Created by vapd on 19-3-19.
//
#include “vapd_pipeline_1.h”
#include <gst/gst.h>
#include <glib.h>
#include <math.h>
#include <string.h>
#include <sys/time.h>
#include
#include “gstnvdsmeta.h”
#include “gstnvstreammeta.h”
#include “gst-nvmessage.h”
#include “nvbuffer.h”
#include <opencv2/opencv.hpp>
#include “logger.h”
/* Muxer batch formation timeout, for e.g. 40 millisec. Should ideally be set
- based on the fastest source’s framerate. */
#define MUXER_BATCH_TIMEOUT_USEC 10000
/* NVIDIA Decoder source pad memory feature. This feature signifies that source
- pads having this capability will push GstBuffers containing cuda buffers. */
#define GST_CAPS_FEATURES_NVMM “memory:NVMM”
#define NVGSTDS_LINK_ELEMENT(elem1, elem2)
do {
if (!gst_element_link (elem1,elem2)) {
GstCaps *src_caps, *sink_caps;
src_caps = gst_pad_query_caps ((GstPad *) (elem1)->srcpads->data, NULL);
sink_caps = gst_pad_query_caps ((GstPad *) (elem2)->sinkpads->data, NULL);
LOG_ERROR(logger,fmtString(“Failed to link ‘%s’ (%s) and ‘%s’ (%s)\n”,
GST_ELEMENT_NAME (elem1),
gst_caps_to_string (src_caps),
GST_ELEMENT_NAME (elem2),
gst_caps_to_string (sink_caps)));
}
} while (0)
#define FPS_PRINT_INTERVAL 300
static gboolean bus_call (GstBus * bus, GstMessage * msg, gpointer data)
{
VapdPipeline *vapdPipeline = (VapdPipeline *) data;
switch (GST_MESSAGE_TYPE (msg)) {
case GST_MESSAGE_EOS:
LOG_INFO(logger,“End of stream\n”);
g_main_loop_quit (vapdPipeline->loop);
break;
case GST_MESSAGE_WARNING:
{
gchar *debug;
GError *error;
gst_message_parse_warning (msg, &error, &debug);
LOG_ERROR(logger,fmtString(“WARNING from element %s: %s\n”,
GST_OBJECT_NAME (msg->src), error->message));
g_free (debug);
LOG_ERROR(logger,fmtString(“Warning: %s\n”, error->message));
g_error_free (error);
break;
}
case GST_MESSAGE_ERROR:
{
gchar *debug;
GError *error;
gst_message_parse_error (msg, &error, &debug);
LOG_ERROR(logger,fmtString(“ERROR from element {}: {}\n”,
GST_OBJECT_NAME (msg->src), error->message));
if (debug)
LOG_ERROR(logger,“VapdPipeline[”<< vapdPipeline->pipeline_index << "]Error details: "<< debug << “\n”);
g_free (debug);
g_error_free (error);
g_main_loop_quit (vapdPipeline->loop);
break;
}
case GST_MESSAGE_ELEMENT:
{
if (gst_nvmessage_is_stream_eos (msg)) {
guint stream_id;
if (gst_nvmessage_parse_stream_eos (msg, &stream_id)) {
LOG_ERROR(logger,fmtString(“Got EOS from stream {}\n”, stream_id));
}
}
break;
}
default:
break;
}
return TRUE;
}
static void cb_newpad (GstElement * decodebin, GstPad * decoder_src_pad, gpointer data)
{
GstCaps *caps = gst_pad_query_caps (decoder_src_pad, NULL);
const GstStructure *str = gst_caps_get_structure (caps, 0);
const gchar *name = gst_structure_get_name (str);
GstElement *source_bin = (GstElement *) data;
GstCapsFeatures *features = gst_caps_get_features (caps, 0);
/* Need to check if the pad created by the decodebin is for video and not
- audio. /
if (!strncmp (name, “video”, 5)) {
/ Link the decodebin pad only if decodebin has picked nvidia
- decoder plugin nvdec_*. We do this by checking if the pad caps contain
- NVMM memory features. /
if (gst_caps_features_contains (features, GST_CAPS_FEATURES_NVMM)) {
/ Get the source bin ghost pad */
GstPad *bin_ghost_pad = gst_element_get_static_pad (source_bin, “src”);
if (!gst_ghost_pad_set_target (GST_GHOST_PAD (bin_ghost_pad),
decoder_src_pad)) {
LOG_ERROR(logger,“Failed to link decoder src pad to source bin ghost pad\n”);
}
gst_object_unref (bin_ghost_pad);
} else {
LOG_ERROR(logger,“Error: Decodebin did not pick nvidia decoder plugin.\n”);
}
}
}
static GstPadProbeReturn decode_pad_buffer_probe (GstPad * pad, GstPadProbeInfo * probe_info,
gpointer u_data) {
GstBuffer *gstbuf = (GstBuffer *)probe_info->data;
GstNvStreamMeta *streammeta = NULL;
streammeta = gst_buffer_get_nvstream_meta (gstbuf);
VapdPipeline *dpsPipeline = (VapdPipeline *)u_data;
int i = 0;
int channelIndex;
for(auto pipeline_channel : dpsPipeline->pipelineInfo.pipeline_channel)
{
if(i == *streammeta->stream_id)
{
channelIndex = pipeline_channel.first;
}
i++;
}
printf("ret:[%d][%d],channel:%d\n",dpsPipeline->pipeline_index,*streammeta->stream_id,channelIndex);
return GST_PAD_PROBE_OK;
}
VapdPipeline::VapdPipeline(PipelineInfo pipelineInfo1,int pipeIndex,int gpu_id)
{
pipelineInfo = pipelineInfo1;
pipeline_index = pipeIndex;
pipeline_gpu_id = gpu_id;
}
GstElement *VapdPipeline::create_element_into_pipeline(std::string element_name,int index)
{
gchar name[20] = { };
g_snprintf (name, 20, “vapd-%s-%02d”, element_name.c_str(),index);
GstElement *element = gst_element_factory_make(element_name.c_str(),name);
if (!element) {
LOG_ERROR (logger,“One element could not be created. Exiting.\n”);
return nullptr;
}
gst_bin_add(GST_BIN (pipeline), element);
return element;
}
int VapdPipeline::init()
{
gchar name[20] = { };
/* Standard GStreamer initialization */
loop = g_main_loop_new (nullptr, FALSE);
/* Create gstreamer elements /
/ Create Pipeline element that will form a connection of other elements */
g_snprintf (name, 20, “vapd-pipeline-%02d”, pipeline_index);
pipeline = gst_pipeline_new (name);
/* Create nvstreammux instance to form batches from one or more sources. */
streammux = create_element_into_pipeline (“nvstreammux”, pipeline_index);
streamdemux = create_element_into_pipeline (“nvstreamdemux”, pipeline_index);
nvvidconv = create_element_into_pipeline (“nvvidconv”, pipeline_index);
//g_object_set (G_OBJECT (nvvidconv), “gpu-id”,pipeline_gpu_id,nullptr);
//g_object_set (G_OBJECT (nvvidconv), “qos”,true,nullptr);
link_streammux();
/* we add a message handler */
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
bus_watch_id = gst_bus_add_watch (bus, bus_call, this);
// todo deletePipeline gst_object_unref (bus);
g_object_set (G_OBJECT (streammux), “width”, pipelineInfo.video_width, “height”,
pipelineInfo.video_height, “batch-size”, pipelineInfo.pipeline_channel.size()*2,
“batched-push-timeout”, MUXER_BATCH_TIMEOUT_USEC,nullptr);//“gpu-id”,pipeline_gpu_id
filter1 = gst_element_factory_make (“capsfilter”, “filter1”);
filter2 = gst_element_factory_make (“capsfilter”, “filter2”);
caps1 = gst_caps_from_string (“video/x-raw(memory:NVMM), format=NV12”);
g_object_set (G_OBJECT (filter1), “caps”, caps1, nullptr);
gst_caps_unref (caps1);
caps2 = gst_caps_from_string (“video/x-raw(memory:NVMM), format=RGBA”);
g_object_set (G_OBJECT (filter2), “caps”, caps2, nullptr);
gst_caps_unref (caps2);
/* Set up the pipeline /
/ we add all elements into the pipeline */
gst_bin_add_many (GST_BIN (pipeline),filter1, filter2, nullptr);
link_streamdemux();
/* we link the elements together
- nvstreammux → nvinfer → nvtiler → nvvidconv → nvosd → video-renderer */
if (!gst_element_link_many (streammux,filter1, nvvidconv, filter2,streamdemux,
NULL)) {
LOG_ERROR(logger,“VapdPipeline[”<< pipeline_index << “]Elements could not be linked. Exiting.\n”);
return -1;
}
return 1;
}
gboolean
link_element_to_demux_src_pad (GstElement *demux, GstElement *elem, guint index)
{
gboolean ret = FALSE;
GstPad *demux_src_pad = NULL;
GstPad *sink_pad = NULL;
gchar pad_name[16];
g_snprintf (pad_name, 16, “src_%u”, index);
pad_name[15] = ‘\0’;
demux_src_pad = gst_element_get_request_pad (demux, pad_name);
if (!demux_src_pad) {
LOG_INFO(logger,“Failed to get sink pad from demux\n”);
goto done;
}
sink_pad = gst_element_get_static_pad (elem, “sink”);
if (!sink_pad) {
LOG_INFO(logger,“Failed to get src pad from '”<< GST_ELEMENT_NAME (elem)<<“\n”);
goto done;
}
if (gst_pad_link (demux_src_pad, sink_pad) != GST_PAD_LINK_OK) {
LOG_INFO(logger,fmtString(“Failed to link ‘{}’ and ‘{}’\n”, GST_ELEMENT_NAME (demux),
GST_ELEMENT_NAME (elem)));
goto done;
}
ret = TRUE;
done:
if (demux_src_pad) {
gst_object_unref (demux_src_pad);
}
if (sink_pad) {
gst_object_unref (sink_pad);
}
return ret;
}
gboolean link_element_to_streammux_sink_pad (GstElement *streammux, GstElement *elem, gint index)
{
gboolean ret = FALSE;
GstPad *mux_sink_pad = NULL;
GstPad *src_pad = NULL;
gchar pad_name[16];
if (index >= 0) {
g_snprintf (pad_name, 16, “sink_%u”, index);
pad_name[15] = ‘\0’;
} else {
strcpy (pad_name, “sink_%u”);
}
mux_sink_pad = gst_element_get_request_pad (streammux, pad_name);
if (!mux_sink_pad) {
LOG_INFO(logger,“Failed to get sink pad from streammux\n”);
goto done;
}
src_pad = gst_element_get_static_pad (elem, “src”);
if (!src_pad) {
LOG_INFO(logger,“Failed to get src pad from '”<< GST_ELEMENT_NAME (elem)<<“\n”);
goto done;
}
if (gst_pad_link (src_pad, mux_sink_pad) != GST_PAD_LINK_OK) {
LOG_INFO(logger,fmtString(“Failed to link ‘{}’ and ‘{}’\n”, GST_ELEMENT_NAME (streammux),
GST_ELEMENT_NAME (elem)));
goto done;
}
ret = TRUE;
done:
if (mux_sink_pad) {
gst_object_unref (mux_sink_pad);
}
if (src_pad) {
gst_object_unref (src_pad);
}
return ret;
}
GstElement *VapdPipeline::create_source_bin_1 (guint index, gchar * uri)
{
GstElement *bin = nullptr,*decodebin= nullptr,*source = nullptr;
gchar bin_name[16] = { };
gchar source_name[16] = { };
g_snprintf (bin_name, 15, “source-bin-%02d”, index);
g_snprintf (source_name, 15, “h264-source-%02d”, index);
/* Create a source GstBin to abstract this bin’s content from the rest of the
- pipeline */
bin = gst_bin_new (bin_name);
source = gst_element_factory_make (“appsrc”, source_name);
//g_object_set (G_OBJECT (source), “max-bytes”,0,“format”,4,“is-live”,true,nullptr);
decodebin = gst_element_factory_make(“decodebin”,“decode_bin”);
g_object_set (G_OBJECT (decodebin), “async-handling”,true,nullptr);
source_vector.push_back(source);
int i = 0;
for(auto pipeline_channel : pipelineInfo.pipeline_channel)
{
if(i == index)
{
VapdChannel *channel = pipeline_channel.second;
channel->setAppSourceElement(source_vector[index]);
break;
}
i++;
}
if (!bin || !decodebin || !source_vector[index]) {
LOG_ERROR(logger,“VapdPipeline[”<< pipeline_index << “]One element in source bin could not be created.\n”);
return NULL;
}
g_signal_connect (G_OBJECT (decodebin), “pad-added”,
G_CALLBACK (cb_newpad), bin);
gst_bin_add_many (GST_BIN (bin), source_vector[index],decodebin,NULL);
NVGSTDS_LINK_ELEMENT (source_vector[index], decodebin);
/* We need to create a ghost pad for the source bin which will act as a proxy
- for the video decoder src pad. The ghost pad will not have a target right
- now. Once the decode bin creates the video decoder and generates the
- cb_newpad callback, we will set the ghost pad target to the video decoder
- src pad. */
if (!gst_element_add_pad (bin, gst_ghost_pad_new_no_target (“src”,
GST_PAD_SRC))) {
g_printerr (“Failed to add ghost pad in source bin\n”);
return NULL;
}
return bin;
}
GstElement *VapdPipeline::create_sink_bin (int index)
{
GstElement *bin = nullptr;
gchar bin_name[16] = { };
gchar source_name[16] = { };
g_snprintf (bin_name, 15, “source-bin-%02d”, index);
g_snprintf (source_name, 15, “h264-source-%02d”, index);
/* Create a source GstBin to abstract this bin’s content from the rest of the
- pipeline */
bin = gst_bin_new (bin_name);
//GstElement *queue = gst_element_factory_make (“queue”, “queue”);
GstElement *fakesink = gst_element_factory_make (“fakesink”, “fakesink”);
if (!bin || !fakesink) {
LOG_ERROR(logger,“VapdPipeline[”<< pipeline_index << “]One element in source bin could not be created.\n”);
return NULL;
}
gst_bin_add_many (GST_BIN (bin),fakesink,nullptr);
gst_element_link_many(fakesink,nullptr);
GstPad *pad = gst_element_get_static_pad (fakesink, “sink”);
//GstPad *sinkpad = gst_element_get_static_pad (fakesink, “sink”);
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER,
decode_pad_buffer_probe, this, NULL);
GstPad *bin_ghost_pad = gst_ghost_pad_new(“sink”, pad);
gst_pad_set_active (bin_ghost_pad, TRUE);
gst_element_add_pad (bin, bin_ghost_pad);
gst_object_unref (pad);
return bin;
}
GstElement *VapdPipeline::create_sink_queue (int index)
{
gchar fakesink_name[16] = { };
g_snprintf (fakesink_name, 15, “src_%u”, index);
gchar queue_name[16] = { };
g_snprintf (queue_name, 15, “queue_%u”, index);
GstElement *queue = gst_element_factory_make("queue",queue_name);
GstElement *fakesink = gst_element_factory_make("appsink",fakesink_name);
if (!fakesink || !queue) {
LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to link streamdemux. Exiting.\n");
return nullptr;
}
//g_object_set (G_OBJECT (queue), "max-size-buffers",20000000000,nullptr);
//g_object_set (G_OBJECT (fakesink), "async",false,"sync",false, "num-buffers",4096000000,nullptr);
g_object_set (G_OBJECT (fakesink), "async",false,nullptr);
gst_bin_add_many (GST_BIN (pipeline), queue,fakesink, nullptr);
NVGSTDS_LINK_ELEMENT(queue,fakesink);
GstPad *fakesinkpad = gst_element_get_static_pad (fakesink, "sink");
if (!fakesinkpad) {
LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]fakesinkpad get sink pad failed. Exiting.\n");
return nullptr;
}
else
{
gst_pad_add_probe (fakesinkpad, GST_PAD_PROBE_TYPE_BUFFER,
decode_pad_buffer_probe, this, NULL);
}
gst_object_unref (fakesinkpad);
return queue;
}
int VapdPipeline::link_streammux()
{
unsigned long size = pipelineInfo.pipeline_channel.size();
source_vector.reserve(size);
for (int i = 0; i < size; i++) {
//----------- for streammux ------------------
GstElement *source_bin = create_source_bin_1 (i, nullptr);
if (!source_bin) {
LOG_ERROR(logger,“VapdPipeline[”<< pipeline_index << “]Failed to create source bin. Exiting.\n”);
return -1;
}
gst_bin_add (GST_BIN (pipeline), source_bin);
if(!link_element_to_streammux_sink_pad(streammux,source_bin,i))
{
LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to link streammux. Exiting.\n");
return -1;
}
//---------------------------------------------
}
}
int VapdPipeline::link_streamdemux()
{
unsigned long size = pipelineInfo.pipeline_channel.size();
for (int i = 0; i < size; i++) {
//----------- for streamdemux ------------------
GstElement *sink_queue = create_sink_queue (i);
//GstElement *sink_queue = create_sink_bin(i);
if(!sink_queue)
{
LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to create sink queue. Exiting.\n");
return -1;
}
if(!link_element_to_demux_src_pad(streamdemux,sink_queue,i))
{
LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to link streamdemux. Exiting.\n");
return -1;
}
}
}
int VapdPipeline::startPipeline()
{
/* Set the pipeline to “playing” state */
GstStateChangeReturn ret2 = gst_element_set_state (pipeline, GST_STATE_PLAYING);
/* Wait till pipeline encounters an error or EOS */
LOG_INFO(logger,“VapdPipeline[”<< pipeline_index << “]is running…\n”);
g_main_loop_run (loop);
LOG_INFO(logger,“pipeline[”<< pipeline_index << “]Returned, stopping playback,Deleting pipeline\n”);
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (pipeline));
g_source_remove (bus_watch_id);
g_main_loop_unref (loop);
return 0;
}
VapdPipeline::~VapdPipeline()
{
destoryPipeline();
delete thread_pipeline;
}
int VapdPipeline::destoryPipeline()
{
/* Out of the main loop, clean up nicely */
LOG_INFO(logger,“pipeline[”<< pipeline_index << “]Returned, stopping playback,Deleting pipeline\n”);
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (pipeline));
g_source_remove (bus_watch_id);
g_main_loop_unref (loop);
return 0;
}
';
demux_src_pad = gst_element_get_request_pad (demux, pad_name);
if (!demux_src_pad) {
LOG_INFO(logger,"Failed to get sink pad from demux\n");
goto done;
}
sink_pad = gst_element_get_static_pad (elem, "sink");
if (!sink_pad) {
LOG_INFO(logger,"Failed to get src pad from '"<< GST_ELEMENT_NAME (elem)<<"\n");
goto done;
}
if (gst_pad_link (demux_src_pad, sink_pad) != GST_PAD_LINK_OK) {
LOG_INFO(logger,fmtString("Failed to link '{}' and '{}'\n", GST_ELEMENT_NAME (demux),
GST_ELEMENT_NAME (elem)));
goto done;
}
ret = TRUE;
done:
if (demux_src_pad) {
gst_object_unref (demux_src_pad);
}
if (sink_pad) {
gst_object_unref (sink_pad);
}
return ret;
}
gboolean link_element_to_streammux_sink_pad (GstElement *streammux, GstElement *elem, gint index)
{
gboolean ret = FALSE;
GstPad *mux_sink_pad = NULL;
GstPad *src_pad = NULL;
gchar pad_name[16];
if (index >= 0) {
g_snprintf (pad_name, 16, "sink_%u", index);
pad_name[15] = '
//
// Created by vapd on 19-3-19.
//
#include “vapd_pipeline_1.h”
#include <gst/gst.h>
#include <glib.h>
#include <math.h>
#include <string.h>
#include <sys/time.h>
#include
#include “gstnvdsmeta.h”
#include “gstnvstreammeta.h”
#include “gst-nvmessage.h”
#include “nvbuffer.h”
#include <opencv2/opencv.hpp>
#include “logger.h”
/* Muxer batch formation timeout, for e.g. 40 millisec. Should ideally be set
- based on the fastest source’s framerate. */
#define MUXER_BATCH_TIMEOUT_USEC 10000
/* NVIDIA Decoder source pad memory feature. This feature signifies that source
- pads having this capability will push GstBuffers containing cuda buffers. */
#define GST_CAPS_FEATURES_NVMM “memory:NVMM”
#define NVGSTDS_LINK_ELEMENT(elem1, elem2)
do {
if (!gst_element_link (elem1,elem2)) {
GstCaps *src_caps, *sink_caps;
src_caps = gst_pad_query_caps ((GstPad *) (elem1)->srcpads->data, NULL);
sink_caps = gst_pad_query_caps ((GstPad *) (elem2)->sinkpads->data, NULL);
LOG_ERROR(logger,fmtString(“Failed to link ‘%s’ (%s) and ‘%s’ (%s)\n”,
GST_ELEMENT_NAME (elem1),
gst_caps_to_string (src_caps),
GST_ELEMENT_NAME (elem2),
gst_caps_to_string (sink_caps)));
}
} while (0)
#define FPS_PRINT_INTERVAL 300
static gboolean bus_call (GstBus * bus, GstMessage * msg, gpointer data)
{
VapdPipeline *vapdPipeline = (VapdPipeline *) data;
switch (GST_MESSAGE_TYPE (msg)) {
case GST_MESSAGE_EOS:
LOG_INFO(logger,“End of stream\n”);
g_main_loop_quit (vapdPipeline->loop);
break;
case GST_MESSAGE_WARNING:
{
gchar *debug;
GError *error;
gst_message_parse_warning (msg, &error, &debug);
LOG_ERROR(logger,fmtString(“WARNING from element %s: %s\n”,
GST_OBJECT_NAME (msg->src), error->message));
g_free (debug);
LOG_ERROR(logger,fmtString(“Warning: %s\n”, error->message));
g_error_free (error);
break;
}
case GST_MESSAGE_ERROR:
{
gchar *debug;
GError *error;
gst_message_parse_error (msg, &error, &debug);
LOG_ERROR(logger,fmtString(“ERROR from element {}: {}\n”,
GST_OBJECT_NAME (msg->src), error->message));
if (debug)
LOG_ERROR(logger,“VapdPipeline[”<< vapdPipeline->pipeline_index << "]Error details: "<< debug << “\n”);
g_free (debug);
g_error_free (error);
g_main_loop_quit (vapdPipeline->loop);
break;
}
case GST_MESSAGE_ELEMENT:
{
if (gst_nvmessage_is_stream_eos (msg)) {
guint stream_id;
if (gst_nvmessage_parse_stream_eos (msg, &stream_id)) {
LOG_ERROR(logger,fmtString(“Got EOS from stream {}\n”, stream_id));
}
}
break;
}
default:
break;
}
return TRUE;
}
static void cb_newpad (GstElement * decodebin, GstPad * decoder_src_pad, gpointer data)
{
GstCaps *caps = gst_pad_query_caps (decoder_src_pad, NULL);
const GstStructure *str = gst_caps_get_structure (caps, 0);
const gchar *name = gst_structure_get_name (str);
GstElement *source_bin = (GstElement *) data;
GstCapsFeatures *features = gst_caps_get_features (caps, 0);
/* Need to check if the pad created by the decodebin is for video and not
- audio. /
if (!strncmp (name, “video”, 5)) {
/ Link the decodebin pad only if decodebin has picked nvidia
- decoder plugin nvdec_*. We do this by checking if the pad caps contain
- NVMM memory features. /
if (gst_caps_features_contains (features, GST_CAPS_FEATURES_NVMM)) {
/ Get the source bin ghost pad */
GstPad *bin_ghost_pad = gst_element_get_static_pad (source_bin, “src”);
if (!gst_ghost_pad_set_target (GST_GHOST_PAD (bin_ghost_pad),
decoder_src_pad)) {
LOG_ERROR(logger,“Failed to link decoder src pad to source bin ghost pad\n”);
}
gst_object_unref (bin_ghost_pad);
} else {
LOG_ERROR(logger,“Error: Decodebin did not pick nvidia decoder plugin.\n”);
}
}
}
static GstPadProbeReturn decode_pad_buffer_probe (GstPad * pad, GstPadProbeInfo * probe_info,
gpointer u_data) {
GstBuffer *gstbuf = (GstBuffer *)probe_info->data;
GstNvStreamMeta *streammeta = NULL;
streammeta = gst_buffer_get_nvstream_meta (gstbuf);
VapdPipeline *dpsPipeline = (VapdPipeline *)u_data;
int i = 0;
int channelIndex;
for(auto pipeline_channel : dpsPipeline->pipelineInfo.pipeline_channel)
{
if(i == *streammeta->stream_id)
{
channelIndex = pipeline_channel.first;
}
i++;
}
printf("ret:[%d][%d],channel:%d\n",dpsPipeline->pipeline_index,*streammeta->stream_id,channelIndex);
return GST_PAD_PROBE_OK;
}
VapdPipeline::VapdPipeline(PipelineInfo pipelineInfo1,int pipeIndex,int gpu_id)
{
pipelineInfo = pipelineInfo1;
pipeline_index = pipeIndex;
pipeline_gpu_id = gpu_id;
}
GstElement *VapdPipeline::create_element_into_pipeline(std::string element_name,int index)
{
gchar name[20] = { };
g_snprintf (name, 20, “vapd-%s-%02d”, element_name.c_str(),index);
GstElement *element = gst_element_factory_make(element_name.c_str(),name);
if (!element) {
LOG_ERROR (logger,“One element could not be created. Exiting.\n”);
return nullptr;
}
gst_bin_add(GST_BIN (pipeline), element);
return element;
}
int VapdPipeline::init()
{
gchar name[20] = { };
/* Standard GStreamer initialization */
loop = g_main_loop_new (nullptr, FALSE);
/* Create gstreamer elements /
/ Create Pipeline element that will form a connection of other elements */
g_snprintf (name, 20, “vapd-pipeline-%02d”, pipeline_index);
pipeline = gst_pipeline_new (name);
/* Create nvstreammux instance to form batches from one or more sources. */
streammux = create_element_into_pipeline (“nvstreammux”, pipeline_index);
streamdemux = create_element_into_pipeline (“nvstreamdemux”, pipeline_index);
nvvidconv = create_element_into_pipeline (“nvvidconv”, pipeline_index);
//g_object_set (G_OBJECT (nvvidconv), “gpu-id”,pipeline_gpu_id,nullptr);
//g_object_set (G_OBJECT (nvvidconv), “qos”,true,nullptr);
link_streammux();
/* we add a message handler */
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
bus_watch_id = gst_bus_add_watch (bus, bus_call, this);
// todo deletePipeline gst_object_unref (bus);
g_object_set (G_OBJECT (streammux), “width”, pipelineInfo.video_width, “height”,
pipelineInfo.video_height, “batch-size”, pipelineInfo.pipeline_channel.size()*2,
“batched-push-timeout”, MUXER_BATCH_TIMEOUT_USEC,nullptr);//“gpu-id”,pipeline_gpu_id
filter1 = gst_element_factory_make (“capsfilter”, “filter1”);
filter2 = gst_element_factory_make (“capsfilter”, “filter2”);
caps1 = gst_caps_from_string (“video/x-raw(memory:NVMM), format=NV12”);
g_object_set (G_OBJECT (filter1), “caps”, caps1, nullptr);
gst_caps_unref (caps1);
caps2 = gst_caps_from_string (“video/x-raw(memory:NVMM), format=RGBA”);
g_object_set (G_OBJECT (filter2), “caps”, caps2, nullptr);
gst_caps_unref (caps2);
/* Set up the pipeline /
/ we add all elements into the pipeline */
gst_bin_add_many (GST_BIN (pipeline),filter1, filter2, nullptr);
link_streamdemux();
/* we link the elements together
- nvstreammux → nvinfer → nvtiler → nvvidconv → nvosd → video-renderer */
if (!gst_element_link_many (streammux,filter1, nvvidconv, filter2,streamdemux,
NULL)) {
LOG_ERROR(logger,“VapdPipeline[”<< pipeline_index << “]Elements could not be linked. Exiting.\n”);
return -1;
}
return 1;
}
gboolean
link_element_to_demux_src_pad (GstElement *demux, GstElement *elem, guint index)
{
gboolean ret = FALSE;
GstPad *demux_src_pad = NULL;
GstPad *sink_pad = NULL;
gchar pad_name[16];
g_snprintf (pad_name, 16, “src_%u”, index);
pad_name[15] = ‘\0’;
demux_src_pad = gst_element_get_request_pad (demux, pad_name);
if (!demux_src_pad) {
LOG_INFO(logger,“Failed to get sink pad from demux\n”);
goto done;
}
sink_pad = gst_element_get_static_pad (elem, “sink”);
if (!sink_pad) {
LOG_INFO(logger,“Failed to get src pad from '”<< GST_ELEMENT_NAME (elem)<<“\n”);
goto done;
}
if (gst_pad_link (demux_src_pad, sink_pad) != GST_PAD_LINK_OK) {
LOG_INFO(logger,fmtString(“Failed to link ‘{}’ and ‘{}’\n”, GST_ELEMENT_NAME (demux),
GST_ELEMENT_NAME (elem)));
goto done;
}
ret = TRUE;
done:
if (demux_src_pad) {
gst_object_unref (demux_src_pad);
}
if (sink_pad) {
gst_object_unref (sink_pad);
}
return ret;
}
gboolean link_element_to_streammux_sink_pad (GstElement *streammux, GstElement *elem, gint index)
{
gboolean ret = FALSE;
GstPad *mux_sink_pad = NULL;
GstPad *src_pad = NULL;
gchar pad_name[16];
if (index >= 0) {
g_snprintf (pad_name, 16, “sink_%u”, index);
pad_name[15] = ‘\0’;
} else {
strcpy (pad_name, “sink_%u”);
}
mux_sink_pad = gst_element_get_request_pad (streammux, pad_name);
if (!mux_sink_pad) {
LOG_INFO(logger,“Failed to get sink pad from streammux\n”);
goto done;
}
src_pad = gst_element_get_static_pad (elem, “src”);
if (!src_pad) {
LOG_INFO(logger,“Failed to get src pad from '”<< GST_ELEMENT_NAME (elem)<<“\n”);
goto done;
}
if (gst_pad_link (src_pad, mux_sink_pad) != GST_PAD_LINK_OK) {
LOG_INFO(logger,fmtString(“Failed to link ‘{}’ and ‘{}’\n”, GST_ELEMENT_NAME (streammux),
GST_ELEMENT_NAME (elem)));
goto done;
}
ret = TRUE;
done:
if (mux_sink_pad) {
gst_object_unref (mux_sink_pad);
}
if (src_pad) {
gst_object_unref (src_pad);
}
return ret;
}
/**
 * Build the source bin for one channel: an appsrc linked to a decodebin,
 * exposed through a target-less "src" ghost pad.
 *
 * On success the appsrc is appended to source_vector and handed to the
 * channel found at position @index of pipelineInfo.pipeline_channel.
 *
 * @param index  Channel position; also used to build the element names.
 * @param uri    Not used by this implementation.
 * @return The new bin, or NULL when an element could not be created or the
 *         ghost pad could not be added.
 */
GstElement *VapdPipeline::create_source_bin_1 (guint index, gchar * uri)
{
    (void) uri;

    gchar bin_name[16] = { };
    gchar source_name[16] = { };
    g_snprintf (bin_name, 15, "source-bin-%02u", index);
    g_snprintf (source_name, 15, "h264-source-%02u", index);

    /* Create a source GstBin to abstract this bin's content from the rest of
     * the pipeline. */
    GstElement *bin = gst_bin_new (bin_name);
    GstElement *source = gst_element_factory_make ("appsrc", source_name);
    GstElement *decodebin = gst_element_factory_make ("decodebin", "decode_bin");

    /* Validate before touching any element, and drop whatever was created so
     * a partial failure does not leak. */
    if (!bin || !decodebin || !source) {
        LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]One element in source bin could not be created.\n");
        if (bin) {
            gst_object_unref (bin);
        }
        if (decodebin) {
            gst_object_unref (decodebin);
        }
        if (source) {
            gst_object_unref (source);
        }
        return NULL;
    }

    g_object_set (G_OBJECT (decodebin), "async-handling", true, nullptr);
    g_signal_connect (G_OBJECT (decodebin), "pad-added",
        G_CALLBACK (cb_newpad), bin);

    gst_bin_add_many (GST_BIN (bin), source, decodebin, NULL);
    NVGSTDS_LINK_ELEMENT (source, decodebin);

    /* We need to create a ghost pad for the source bin which will act as a
     * proxy for the video decoder src pad. The ghost pad will not have a
     * target right now. Once the decode bin creates the video decoder and
     * generates the cb_newpad callback, we will set the ghost pad target to
     * the video decoder src pad. */
    if (!gst_element_add_pad (bin, gst_ghost_pad_new_no_target ("src",
                GST_PAD_SRC))) {
        g_printerr ("Failed to add ghost pad in source bin\n");
        /* The bin owns source and decodebin now; releasing it frees them too. */
        gst_object_unref (bin);
        return NULL;
    }

    /* Publish the appsrc only once the bin is fully assembled, so neither
     * source_vector nor the channel ever holds a destroyed element. */
    source_vector.push_back (source);
    guint position = 0;
    for (const auto &entry : pipelineInfo.pipeline_channel) {
        if (position == index) {
            VapdChannel *channel = entry.second;
            if (channel) {
                channel->setAppSourceElement (source);
            }
            break;
        }
        ++position;
    }
    return bin;
}
/**
 * Build a sink bin holding a single fakesink whose sink pad is exposed as the
 * bin's "sink" ghost pad. decode_pad_buffer_probe is installed on that pad
 * with this object as user data.
 *
 * @param index  Used to build the bin name.
 * @return The new bin, or NULL when an element or its sink pad is unavailable.
 */
GstElement *VapdPipeline::create_sink_bin (int index)
{
    gchar bin_name[16] = { };
    /* Named "sink-bin-NN": reusing the "source-bin-NN" name would clash with
     * the source bin of the same index when both sit in one parent bin. */
    g_snprintf (bin_name, 15, "sink-bin-%02d", index);

    GstElement *bin = gst_bin_new (bin_name);
    GstElement *fakesink = gst_element_factory_make ("fakesink", "fakesink");
    if (!bin || !fakesink) {
        LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]One element in sink bin could not be created.\n");
        /* Release whichever of the two was created. */
        if (bin) {
            gst_object_unref (bin);
        }
        if (fakesink) {
            gst_object_unref (fakesink);
        }
        return NULL;
    }

    gst_bin_add (GST_BIN (bin), fakesink);

    GstPad *pad = gst_element_get_static_pad (fakesink, "sink");
    if (!pad) {
        LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to get sink pad from fakesink.\n");
        /* The bin owns fakesink, so this frees both. */
        gst_object_unref (bin);
        return NULL;
    }

    gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER,
        decode_pad_buffer_probe, this, NULL);

    GstPad *bin_ghost_pad = gst_ghost_pad_new ("sink", pad);
    gst_pad_set_active (bin_ghost_pad, TRUE);
    gst_element_add_pad (bin, bin_ghost_pad);
    gst_object_unref (pad);
    return bin;
}
/**
 * Create a queue -> appsink pair for one output stream, add both to the
 * pipeline, link them, and install decode_pad_buffer_probe on the appsink's
 * sink pad with this object as user data.
 *
 * @param index  Stream index used to name the elements ("queue_N", "src_N").
 * @return The queue element (the upstream end of the pair), or nullptr when an
 *         element could not be created or the appsink has no sink pad.
 */
GstElement *VapdPipeline::create_sink_queue (int index)
{
    gchar appsink_name[16] = { };
    gchar queue_name[16] = { };
    /* The cast matches the %u conversion used in the element names. */
    g_snprintf (appsink_name, 15, "src_%u", (guint) index);
    g_snprintf (queue_name, 15, "queue_%u", (guint) index);

    GstElement *queue = gst_element_factory_make ("queue", queue_name);
    GstElement *appsink = gst_element_factory_make ("appsink", appsink_name);
    if (!appsink || !queue) {
        LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to create queue or appsink. Exiting.\n");
        /* Release whichever of the two was created so it does not leak. */
        if (queue) {
            gst_object_unref (queue);
        }
        if (appsink) {
            gst_object_unref (appsink);
        }
        return nullptr;
    }

    g_object_set (G_OBJECT (appsink), "async", false, nullptr);
    gst_bin_add_many (GST_BIN (pipeline), queue, appsink, nullptr);
    NVGSTDS_LINK_ELEMENT(queue, appsink);

    GstPad *appsink_pad = gst_element_get_static_pad (appsink, "sink");
    if (!appsink_pad) {
        LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]fakesinkpad get sink pad failed. Exiting.\n");
        return nullptr;
    }

    gst_pad_add_probe (appsink_pad, GST_PAD_PROBE_TYPE_BUFFER,
        decode_pad_buffer_probe, this, NULL);
    gst_object_unref (appsink_pad);
    return queue;
}
int VapdPipeline::link_streammux()
{
unsigned long size = pipelineInfo.pipeline_channel.size();
source_vector.reserve(size);
for (int i = 0; i < size; i++) {
//----------- for streammux ------------------
GstElement *source_bin = create_source_bin_1 (i, nullptr);
if (!source_bin) {
LOG_ERROR(logger,“VapdPipeline[”<< pipeline_index << “]Failed to create source bin. Exiting.\n”);
return -1;
}
gst_bin_add (GST_BIN (pipeline), source_bin);
if(!link_element_to_streammux_sink_pad(streammux,source_bin,i))
{
LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to link streammux. Exiting.\n");
return -1;
}
//---------------------------------------------
}
}
/**
 * Create one sink queue per entry of pipelineInfo.pipeline_channel and link it
 * to the matching streamdemux src pad.
 *
 * @return 0 when every channel was linked, -1 on the first failure.
 */
int VapdPipeline::link_streamdemux()
{
    const int channel_count = static_cast<int> (pipelineInfo.pipeline_channel.size ());

    for (int i = 0; i < channel_count; ++i) {
        GstElement *sink_queue = create_sink_queue (i);
        if (!sink_queue) {
            LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to create sink queue. Exiting.\n");
            return -1;
        }

        if (!link_element_to_demux_src_pad (streamdemux, sink_queue, i)) {
            LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to link streamdemux. Exiting.\n");
            return -1;
        }
    }

    /* Explicit success value: flowing off the end of a non-void function is
     * undefined behaviour. */
    return 0;
}
int VapdPipeline::startPipeline()
{
/* Set the pipeline to “playing” state */
GstStateChangeReturn ret2 = gst_element_set_state (pipeline, GST_STATE_PLAYING);
/* Wait till pipeline encounters an error or EOS */
LOG_INFO(logger,“VapdPipeline[”<< pipeline_index << “]is running…\n”);
g_main_loop_run (loop);
LOG_INFO(logger,“pipeline[”<< pipeline_index << “]Returned, stopping playback,Deleting pipeline\n”);
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (pipeline));
g_source_remove (bus_watch_id);
g_main_loop_unref (loop);
return 0;
}
/* Destructor: runs destoryPipeline() first, then deletes the object that
 * thread_pipeline points to. */
VapdPipeline::~VapdPipeline()
{
    /* The status code cannot be acted upon inside a destructor. */
    static_cast<void> (this->destoryPipeline ());
    delete this->thread_pipeline;
}
/**
 * Stop the pipeline and release the pipeline, the bus watch and the main loop.
 * Each resource is released only if still held and its member is cleared
 * afterwards, so calling this more than once is harmless.
 *
 * @return Always 0.
 */
int VapdPipeline::destoryPipeline()
{
    /* Out of the main loop, clean up nicely */
    LOG_INFO(logger,"pipeline["<< pipeline_index << "]Returned, stopping playback,Deleting pipeline\n");

    if (pipeline) {
        gst_element_set_state (pipeline, GST_STATE_NULL);
        gst_object_unref (GST_OBJECT (pipeline));
        pipeline = nullptr;
    }
    if (bus_watch_id > 0) {
        g_source_remove (bus_watch_id);
        bus_watch_id = 0;
    }
    if (loop) {
        g_main_loop_unref (loop);
        loop = nullptr;
    }
    return 0;
}
';
} else {
strcpy (pad_name, "sink_%u");
}
mux_sink_pad = gst_element_get_request_pad (streammux, pad_name);
if (!mux_sink_pad) {
LOG_INFO(logger,"Failed to get sink pad from streammux\n");
goto done;
}
src_pad = gst_element_get_static_pad (elem, "src");
if (!src_pad) {
LOG_INFO(logger,"Failed to get src pad from '"<< GST_ELEMENT_NAME (elem)<<"\n");
goto done;
}
if (gst_pad_link (src_pad, mux_sink_pad) != GST_PAD_LINK_OK) {
LOG_INFO(logger,fmtString("Failed to link '{}' and '{}'\n", GST_ELEMENT_NAME (streammux),
GST_ELEMENT_NAME (elem)));
goto done;
}
ret = TRUE;
done:
if (mux_sink_pad) {
gst_object_unref (mux_sink_pad);
}
if (src_pad) {
gst_object_unref (src_pad);
}
return ret;
}
/**
 * Build the source bin for one channel: an appsrc linked to a decodebin,
 * exposed through a target-less "src" ghost pad.
 *
 * On success the appsrc is appended to source_vector and handed to the
 * channel found at position @index of pipelineInfo.pipeline_channel.
 *
 * @param index  Channel position; also used to build the element names.
 * @param uri    Not used by this implementation.
 * @return The new bin, or NULL when an element could not be created or the
 *         ghost pad could not be added.
 */
GstElement *VapdPipeline::create_source_bin_1 (guint index, gchar * uri)
{
    (void) uri;

    gchar bin_name[16] = { };
    gchar source_name[16] = { };
    g_snprintf (bin_name, 15, "source-bin-%02u", index);
    g_snprintf (source_name, 15, "h264-source-%02u", index);

    /* Create a source GstBin to abstract this bin's content from the rest of
     * the pipeline. */
    GstElement *bin = gst_bin_new (bin_name);
    GstElement *source = gst_element_factory_make ("appsrc", source_name);
    GstElement *decodebin = gst_element_factory_make ("decodebin", "decode_bin");

    /* Validate before touching any element, and drop whatever was created so
     * a partial failure does not leak. */
    if (!bin || !decodebin || !source) {
        LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]One element in source bin could not be created.\n");
        if (bin) {
            gst_object_unref (bin);
        }
        if (decodebin) {
            gst_object_unref (decodebin);
        }
        if (source) {
            gst_object_unref (source);
        }
        return NULL;
    }

    g_object_set (G_OBJECT (decodebin), "async-handling", true, nullptr);
    g_signal_connect (G_OBJECT (decodebin), "pad-added",
        G_CALLBACK (cb_newpad), bin);

    gst_bin_add_many (GST_BIN (bin), source, decodebin, NULL);
    NVGSTDS_LINK_ELEMENT (source, decodebin);

    /* We need to create a ghost pad for the source bin which will act as a
     * proxy for the video decoder src pad. The ghost pad will not have a
     * target right now. Once the decode bin creates the video decoder and
     * generates the cb_newpad callback, we will set the ghost pad target to
     * the video decoder src pad. */
    if (!gst_element_add_pad (bin, gst_ghost_pad_new_no_target ("src",
                GST_PAD_SRC))) {
        g_printerr ("Failed to add ghost pad in source bin\n");
        /* The bin owns source and decodebin now; releasing it frees them too. */
        gst_object_unref (bin);
        return NULL;
    }

    /* Publish the appsrc only once the bin is fully assembled, so neither
     * source_vector nor the channel ever holds a destroyed element. */
    source_vector.push_back (source);
    guint position = 0;
    for (const auto &entry : pipelineInfo.pipeline_channel) {
        if (position == index) {
            VapdChannel *channel = entry.second;
            if (channel) {
                channel->setAppSourceElement (source);
            }
            break;
        }
        ++position;
    }
    return bin;
}
/**
 * Build a sink bin holding a single fakesink whose sink pad is exposed as the
 * bin's "sink" ghost pad. decode_pad_buffer_probe is installed on that pad
 * with this object as user data.
 *
 * @param index  Used to build the bin name.
 * @return The new bin, or NULL when an element or its sink pad is unavailable.
 */
GstElement *VapdPipeline::create_sink_bin (int index)
{
    gchar bin_name[16] = { };
    /* Named "sink-bin-NN": reusing the "source-bin-NN" name would clash with
     * the source bin of the same index when both sit in one parent bin. */
    g_snprintf (bin_name, 15, "sink-bin-%02d", index);

    GstElement *bin = gst_bin_new (bin_name);
    GstElement *fakesink = gst_element_factory_make ("fakesink", "fakesink");
    if (!bin || !fakesink) {
        LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]One element in sink bin could not be created.\n");
        /* Release whichever of the two was created. */
        if (bin) {
            gst_object_unref (bin);
        }
        if (fakesink) {
            gst_object_unref (fakesink);
        }
        return NULL;
    }

    gst_bin_add (GST_BIN (bin), fakesink);

    GstPad *pad = gst_element_get_static_pad (fakesink, "sink");
    if (!pad) {
        LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to get sink pad from fakesink.\n");
        /* The bin owns fakesink, so this frees both. */
        gst_object_unref (bin);
        return NULL;
    }

    gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER,
        decode_pad_buffer_probe, this, NULL);

    GstPad *bin_ghost_pad = gst_ghost_pad_new ("sink", pad);
    gst_pad_set_active (bin_ghost_pad, TRUE);
    gst_element_add_pad (bin, bin_ghost_pad);
    gst_object_unref (pad);
    return bin;
}
/**
 * Create a queue -> appsink pair for one output stream, add both to the
 * pipeline, link them, and install decode_pad_buffer_probe on the appsink's
 * sink pad with this object as user data.
 *
 * @param index  Stream index used to name the elements ("queue_N", "src_N").
 * @return The queue element (the upstream end of the pair), or nullptr when an
 *         element could not be created or the appsink has no sink pad.
 */
GstElement *VapdPipeline::create_sink_queue (int index)
{
    gchar appsink_name[16] = { };
    gchar queue_name[16] = { };
    /* The cast matches the %u conversion used in the element names. */
    g_snprintf (appsink_name, 15, "src_%u", (guint) index);
    g_snprintf (queue_name, 15, "queue_%u", (guint) index);

    GstElement *queue = gst_element_factory_make ("queue", queue_name);
    GstElement *appsink = gst_element_factory_make ("appsink", appsink_name);
    if (!appsink || !queue) {
        LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to create queue or appsink. Exiting.\n");
        /* Release whichever of the two was created so it does not leak. */
        if (queue) {
            gst_object_unref (queue);
        }
        if (appsink) {
            gst_object_unref (appsink);
        }
        return nullptr;
    }

    g_object_set (G_OBJECT (appsink), "async", false, nullptr);
    gst_bin_add_many (GST_BIN (pipeline), queue, appsink, nullptr);
    NVGSTDS_LINK_ELEMENT(queue, appsink);

    GstPad *appsink_pad = gst_element_get_static_pad (appsink, "sink");
    if (!appsink_pad) {
        LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]fakesinkpad get sink pad failed. Exiting.\n");
        return nullptr;
    }

    gst_pad_add_probe (appsink_pad, GST_PAD_PROBE_TYPE_BUFFER,
        decode_pad_buffer_probe, this, NULL);
    gst_object_unref (appsink_pad);
    return queue;
}
int VapdPipeline::link_streammux()
{
unsigned long size = pipelineInfo.pipeline_channel.size();
source_vector.reserve(size);
for (int i = 0; i < size; i++) {
//----------- for streammux ------------------
GstElement *source_bin = create_source_bin_1 (i, nullptr);
if (!source_bin) {
LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to create source bin. Exiting.\n");
return -1;
}
gst_bin_add (GST_BIN (pipeline), source_bin);
if(!link_element_to_streammux_sink_pad(streammux,source_bin,i))
{
LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to link streammux. Exiting.\n");
return -1;
}
//---------------------------------------------
}
}
/**
 * Create one sink queue per entry of pipelineInfo.pipeline_channel and link it
 * to the matching streamdemux src pad.
 *
 * @return 0 when every channel was linked, -1 on the first failure.
 */
int VapdPipeline::link_streamdemux()
{
    const int channel_count = static_cast<int> (pipelineInfo.pipeline_channel.size ());

    for (int i = 0; i < channel_count; ++i) {
        GstElement *sink_queue = create_sink_queue (i);
        if (!sink_queue) {
            LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to create sink queue. Exiting.\n");
            return -1;
        }

        if (!link_element_to_demux_src_pad (streamdemux, sink_queue, i)) {
            LOG_ERROR(logger,"VapdPipeline["<< pipeline_index << "]Failed to link streamdemux. Exiting.\n");
            return -1;
        }
    }

    /* Explicit success value: flowing off the end of a non-void function is
     * undefined behaviour. */
    return 0;
}
int VapdPipeline::startPipeline()
{
/* Set the pipeline to "playing" state */
GstStateChangeReturn ret2 = gst_element_set_state (pipeline, GST_STATE_PLAYING);
/* Wait till pipeline encounters an error or EOS */
LOG_INFO(logger,"VapdPipeline["<< pipeline_index << "]is running...\n");
g_main_loop_run (loop);
LOG_INFO(logger,"pipeline["<< pipeline_index << "]Returned, stopping playback,Deleting pipeline\n");
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (pipeline));
g_source_remove (bus_watch_id);
g_main_loop_unref (loop);
return 0;
}
/* Destructor: runs destoryPipeline() first, then deletes the object that
 * thread_pipeline points to. */
VapdPipeline::~VapdPipeline()
{
    /* The status code cannot be acted upon inside a destructor. */
    static_cast<void> (this->destoryPipeline ());
    delete this->thread_pipeline;
}
/**
 * Stop the pipeline and release the pipeline, the bus watch and the main loop.
 * Each resource is released only if still held and its member is cleared
 * afterwards, so calling this more than once is harmless.
 *
 * @return Always 0.
 */
int VapdPipeline::destoryPipeline()
{
    /* Out of the main loop, clean up nicely */
    LOG_INFO(logger,"pipeline["<< pipeline_index << "]Returned, stopping playback,Deleting pipeline\n");

    if (pipeline) {
        gst_element_set_state (pipeline, GST_STATE_NULL);
        gst_object_unref (GST_OBJECT (pipeline));
        pipeline = nullptr;
    }
    if (bus_watch_id > 0) {
        g_source_remove (bus_watch_id);
        bus_watch_id = 0;
    }
    if (loop) {
        g_main_loop_unref (loop);
        loop = nullptr;
    }
    return 0;
}