[libcamera-devel] [PATCH v3 22/27] gst: libcamerasrc: Implement initial streaming
Laurent Pinchart
laurent.pinchart at ideasonboard.com
Fri Mar 6 21:59:12 CET 2020
Hi Nicolas,
Thank you for the patch.
On Fri, Mar 06, 2020 at 03:26:32PM -0500, Nicolas Dufresne wrote:
> From: Nicolas Dufresne <nicolas.dufresne at collabora.com>
>
> With this patch, the element is now able to push buffers to the next
> element in the graph. The buffers are currently missing any metadata
> like timestamp, sequence number. This will be added in the next commit.
>
> Signed-off-by: Nicolas Dufresne <nicolas.dufresne at collabora.com>
Reviewed-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
> ---
> src/gstreamer/gstlibcamerapad.cpp | 9 ++
> src/gstreamer/gstlibcamerapad.h | 2 +
> src/gstreamer/gstlibcamerasrc.cpp | 192 +++++++++++++++++++++++++++++-
> 3 files changed, 202 insertions(+), 1 deletion(-)
>
> diff --git a/src/gstreamer/gstlibcamerapad.cpp b/src/gstreamer/gstlibcamerapad.cpp
> index 49dd35b..840f391 100644
> --- a/src/gstreamer/gstlibcamerapad.cpp
> +++ b/src/gstreamer/gstlibcamerapad.cpp
> @@ -19,6 +19,7 @@ struct _GstLibcameraPad {
> StreamRole role;
> GstLibcameraPool *pool;
> GQueue pending_buffers;
> + GstClockTime latency;
> };
>
> enum {
> @@ -164,3 +165,11 @@ gst_libcamera_pad_push_pending(GstPad *pad)
>
> return gst_pad_push(pad, buffer);
> }
> +
> +bool
> +gst_libcamera_pad_has_pending(GstPad *pad)
> +{
> + auto *self = GST_LIBCAMERA_PAD(pad);
> + GLibLocker lock(GST_OBJECT(self));
> + return self->pending_buffers.length > 0;
> +}
> diff --git a/src/gstreamer/gstlibcamerapad.h b/src/gstreamer/gstlibcamerapad.h
> index 2e9ec20..9d43129 100644
> --- a/src/gstreamer/gstlibcamerapad.h
> +++ b/src/gstreamer/gstlibcamerapad.h
> @@ -30,4 +30,6 @@ void gst_libcamera_pad_queue_buffer(GstPad *pad, GstBuffer *buffer);
>
> GstFlowReturn gst_libcamera_pad_push_pending(GstPad *pad);
>
> +bool gst_libcamera_pad_has_pending(GstPad *pad);
> +
> #endif /* __GST_LIBCAMERA_PAD_H__ */
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index 5ffc004..e3718db 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -14,8 +14,11 @@
>
> #include "gstlibcamerasrc.h"
>
> +#include <queue>
> #include <vector>
>
> +#include <gst/base/base.h>
> +
> #include <libcamera/camera.h>
> #include <libcamera/camera_manager.h>
>
> @@ -29,12 +32,71 @@ using namespace libcamera;
> GST_DEBUG_CATEGORY_STATIC(source_debug);
> #define GST_CAT_DEFAULT source_debug
>
> +struct RequestWrap {
> + RequestWrap(Request *request);
> + ~RequestWrap();
> +
> + void attachBuffer(GstBuffer *buffer);
> + GstBuffer *detachBuffer(Stream *stream);
> +
> + /* For ptr comparison only. */
> + Request *request_;
> + std::map<Stream *, GstBuffer *> buffers_;
> +};
> +
> +RequestWrap::RequestWrap(Request *request)
> + : request_(request)
> +{
> +}
> +
> +RequestWrap::~RequestWrap()
> +{
> + for (std::pair<Stream *const, GstBuffer *> &item : buffers_) {
> + if (item.second)
> + gst_buffer_unref(item.second);
> + }
> +}
> +
> +void RequestWrap::attachBuffer(GstBuffer *buffer)
> +{
> + FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
> + Stream *stream = gst_libcamera_buffer_get_stream(buffer);
> +
> + request_->addBuffer(stream, fb);
> +
> + auto item = buffers_.find(stream);
> + if (item != buffers_.end()) {
> + gst_buffer_unref(item->second);
> + item->second = buffer;
> + } else {
> + buffers_[stream] = buffer;
> + }
> +}
> +
> +GstBuffer *RequestWrap::detachBuffer(Stream *stream)
> +{
> + GstBuffer *buffer = nullptr;
> +
> + auto item = buffers_.find(stream);
> + if (item != buffers_.end()) {
> + buffer = item->second;
> + item->second = nullptr;
> + }
> +
> + return buffer;
> +}
> +
> /* Used for C++ object with destructors. */
> struct GstLibcameraSrcState {
> + GstLibcameraSrc *src_;
> +
> std::unique_ptr<CameraManager> cm_;
> std::shared_ptr<Camera> cam_;
> std::unique_ptr<CameraConfiguration> config_;
> std::vector<GstPad *> srcpads_;
> + std::queue<std::unique_ptr<RequestWrap>> requests_;
> +
> + void requestCompleted(Request *request);
> };
>
> struct _GstLibcameraSrc {
> @@ -47,6 +109,7 @@ struct _GstLibcameraSrc {
>
> GstLibcameraSrcState *state;
> GstLibcameraAllocator *allocator;
> + GstFlowCombiner *flow_combiner;
> };
>
> enum {
> @@ -70,6 +133,41 @@ GstStaticPadTemplate request_src_template = {
> "src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS
> };
>
> +void
> +GstLibcameraSrcState::requestCompleted(Request *request)
> +{
> + GLibLocker lock(GST_OBJECT(src_));
> +
> + GST_DEBUG_OBJECT(src_, "buffers are ready");
> +
> + std::unique_ptr<RequestWrap> wrap = std::move(requests_.front());
> + requests_.pop();
> +
> + g_return_if_fail(wrap->request_ == request);
> +
> + if ((request->status() == Request::RequestCancelled)) {
> + GST_DEBUG_OBJECT(src_, "Request was cancelled");
> + return;
> + }
> +
> + GstBuffer *buffer;
> + for (GstPad *srcpad : srcpads_) {
> + Stream *stream = gst_libcamera_pad_get_stream(srcpad);
> + buffer = wrap->detachBuffer(stream);
> + gst_libcamera_pad_queue_buffer(srcpad, buffer);
> + }
> +
> + {
> + /* We only want to resume the task if it's paused. */
> + GstTask *task = src_->task;
> + GLibLocker lock(GST_OBJECT(task));
> + if (GST_TASK_STATE(task) == GST_TASK_PAUSED) {
> + GST_TASK_STATE(task) = GST_TASK_STARTED;
> + GST_TASK_SIGNAL(task);
> + }
> + }
> +}
> +
> static bool
> gst_libcamera_src_open(GstLibcameraSrc *self)
> {
> @@ -122,6 +220,8 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
> return false;
> }
>
> + cam->requestCompleted.connect(self->state, &GstLibcameraSrcState::requestCompleted);
> +
> /* No need to lock here, we didn't start our threads yet. */
> self->state->cm_ = std::move(cm);
> self->state->cam_ = cam;
> @@ -133,8 +233,77 @@ static void
> gst_libcamera_src_task_run(gpointer user_data)
> {
> GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> + GstLibcameraSrcState *state = self->state;
> +
> + Request *request = state->cam_->createRequest();
> + auto wrap = std::make_unique<RequestWrap>(request);
> + for (GstPad *srcpad : state->srcpads_) {
> + GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad);
> + GstBuffer *buffer;
> + GstFlowReturn ret;
> +
> + ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool),
> + &buffer, nullptr);
> + if (ret != GST_FLOW_OK) {
> + /*
> + * RequestWrap does not take ownership, and we won't be
> + * queueing this one due to lack of buffers.
> + */
> + delete request;
> + request = nullptr;
> + break;
> + }
> +
> + wrap->attachBuffer(buffer);
> + }
> +
> + if (request) {
> + GLibLocker lock(GST_OBJECT(self));
> + GST_TRACE_OBJECT(self, "Requesting buffers");
> + state->cam_->queueRequest(request);
> + state->requests_.push(std::move(wrap));
> + }
> +
> + GstFlowReturn ret = GST_FLOW_OK;
> + gst_flow_combiner_reset(self->flow_combiner);
> + for (GstPad *srcpad : state->srcpads_) {
> + ret = gst_libcamera_pad_push_pending(srcpad);
> + ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
> + srcpad, ret);
> + }
>
> - GST_DEBUG_OBJECT(self, "Streaming thread is now capturing");
> + {
> + /*
> + * Here we need to decide if we want to pause or stop the task. This
> + * needs to happen in lock step with the callback thread which may want
> + * to resume the task.
> + */
> + GLibLocker lock(GST_OBJECT(self));
> + if (ret != GST_FLOW_OK) {
> + if (ret == GST_FLOW_EOS) {
> + g_autoptr(GstEvent) eos = gst_event_new_eos();
> + guint32 seqnum = gst_util_seqnum_next();
> + gst_event_set_seqnum(eos, seqnum);
> + for (GstPad *srcpad : state->srcpads_)
> + gst_pad_push_event(srcpad, gst_event_ref(eos));
> + } else if (ret != GST_FLOW_FLUSHING) {
> + GST_ELEMENT_FLOW_ERROR(self, ret);
> + }
> + gst_task_stop(self->task);
> + return;
> + }
> +
> + bool do_pause = true;
> + for (GstPad *srcpad : state->srcpads_) {
> + if (gst_libcamera_pad_has_pending(srcpad)) {
> + do_pause = false;
> + break;
> + }
> + }
> +
> + if (do_pause)
> + gst_task_pause(self->task);
> + }
> }
>
> static void
> @@ -233,12 +402,23 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
> return;
> }
>
> + self->flow_combiner = gst_flow_combiner_new();
> for (gsize i = 0; i < state->srcpads_.size(); i++) {
> GstPad *srcpad = state->srcpads_[i];
> const StreamConfiguration &stream_cfg = state->config_->at(i);
> GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
> stream_cfg.stream());
> gst_libcamera_pad_set_pool(srcpad, pool);
> + gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
> + }
> +
> + ret = state->cam_->start();
> + if (ret) {
> + GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
> + ("Failed to start the camera: %s", g_strerror(-ret)),
> + ("Camera.start() failed with error code %i", ret));
> + gst_task_stop(task);
> + return;
> }
>
> done:
> @@ -260,10 +440,14 @@ gst_libcamera_src_task_leave(GstTask *task, GThread *thread, gpointer user_data)
>
> GST_DEBUG_OBJECT(self, "Streaming thread is about to stop");
>
> + state->cam_->stop();
> +
> for (GstPad *srcpad : state->srcpads_)
> gst_libcamera_pad_set_pool(srcpad, NULL);
>
> g_clear_object(&self->allocator);
> + g_clear_pointer(&self->flow_combiner,
> + (GDestroyNotify)gst_flow_combiner_free);
> }
>
> static void
> @@ -343,6 +527,9 @@ gst_libcamera_src_change_state(GstElement *element, GstStateChange transition)
> return GST_STATE_CHANGE_FAILURE;
> ret = GST_STATE_CHANGE_NO_PREROLL;
> break;
> + case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
> + gst_task_start(self->task);
> + break;
> case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
> ret = GST_STATE_CHANGE_NO_PREROLL;
> break;
> @@ -394,6 +581,9 @@ gst_libcamera_src_init(GstLibcameraSrc *self)
>
> state->srcpads_.push_back(gst_pad_new_from_template(templ, "src"));
> gst_element_add_pad(GST_ELEMENT(self), state->srcpads_[0]);
> +
> + /* C-style friend. */
> + state->src_ = self;
> self->state = state;
> }
>
--
Regards,
Laurent Pinchart
More information about the libcamera-devel
mailing list