[PATCH v3 6/7] ipa: rpi: sync: Add an implementation of the camera sync algorithm
Naushir Patuck
naush at raspberrypi.com
Fri Jan 17 10:26:17 CET 2025
Hi David,
On Thu, 9 Jan 2025 at 14:32, David Plowman
<david.plowman at raspberrypi.com> wrote:
>
> In this implementation, the server sends data packets out onto the
> network every 30 frames or so.
>
> Clients listening for this packet will send frame length deltas back
> to the pipeline handler to match the synchronisation of the server.
>
> We use wallclock timestamps, passed to us from the pipeline handler,
> that have been de-jittered appropriately, meaning that the
> synchronisation will actually work across networked devices.
>
> When the server's advertised "ready time" is reached, both client and
> server will signal this through metadata back to their respective
> controlling applications.
>
> Signed-off-by: David Plowman <david.plowman at raspberrypi.com>
> Signed-off-by: Arsen Mikovic <arsen.mikovic at raspberrypi.com>
> Signed-off-by: Naushir Patuck <naush at raspberrypi.com>
> ---
> src/ipa/rpi/controller/meson.build | 1 +
> src/ipa/rpi/controller/rpi/sync.cpp | 330 ++++++++++++++++++++++++++++
> src/ipa/rpi/controller/rpi/sync.h | 68 ++++++
> 3 files changed, 399 insertions(+)
> create mode 100644 src/ipa/rpi/controller/rpi/sync.cpp
> create mode 100644 src/ipa/rpi/controller/rpi/sync.h
>
> diff --git a/src/ipa/rpi/controller/meson.build b/src/ipa/rpi/controller/meson.build
> index 74b74888..dde4ac12 100644
> --- a/src/ipa/rpi/controller/meson.build
> +++ b/src/ipa/rpi/controller/meson.build
> @@ -23,6 +23,7 @@ rpi_ipa_controller_sources = files([
> 'rpi/saturation.cpp',
> 'rpi/sdn.cpp',
> 'rpi/sharpen.cpp',
> + 'rpi/sync.cpp',
> 'rpi/tonemap.cpp',
> ])
>
> diff --git a/src/ipa/rpi/controller/rpi/sync.cpp b/src/ipa/rpi/controller/rpi/sync.cpp
> new file mode 100644
> index 00000000..43a8cbe6
> --- /dev/null
> +++ b/src/ipa/rpi/controller/rpi/sync.cpp
> @@ -0,0 +1,330 @@
> +/* SPDX-License-Identifier: BSD-2-Clause */
> +/*
> + * Copyright (C) 2024, Raspberry Pi Ltd
> + *
> + * sync.cpp - sync algorithm
> + */
> +#include "sync.h"
> +
> +#include <chrono>
> +#include <ctype.h>
> +#include <fcntl.h>
> +#include <strings.h>
> +#include <unistd.h>
> +
> +#include <libcamera/base/log.h>
> +
> +#include <arpa/inet.h>
> +
> +#include "sync_status.h"
> +
> +using namespace std;
> +using namespace std::chrono_literals;
> +using namespace RPiController;
> +using namespace libcamera;
> +
> +LOG_DEFINE_CATEGORY(RPiSync)
> +
> +#define NAME "rpi.sync"
> +
> +const char *kDefaultGroup = "239.255.255.250";
> +constexpr unsigned int kDefaultPort = 10000;
> +constexpr unsigned int kDefaultSyncPeriod = 30;
> +constexpr unsigned int kDefaultReadyFrame = 100;
> +constexpr unsigned int kDefaultMinAdjustment = 50;
I wonder if we can embed these constants into the read() code below,
but I'm not fussed either way.
> +
> +Sync::Sync(Controller *controller)
> + : SyncAlgorithm(controller), mode_(Mode::Off), socket_(-1), frameDuration_(0s), frameCount_(0)
> +{
> +}
> +
> +Sync::~Sync()
> +{
> + if (socket_ >= 0)
> + close(socket_);
> +}
> +
> +char const *Sync::name() const
> +{
> + return NAME;
> +}
> +
> +/* This reads from json file and intitiaises server and client */
> +int Sync::read(const libcamera::YamlObject &params)
> +{
> + /* Socket on which to communicate. */
> + group_ = params["group"].get<std::string>(kDefaultGroup);
> + port_ = params["port"].get<uint16_t>(kDefaultPort);
> + /* Send a sync message every this many frames. */
> + syncPeriod_ = params["sync_period"].get<uint32_t>(kDefaultSyncPeriod);
> + /* Application will be told we're ready after this many frames. */
> + readyFrame_ = params["ready_frame"].get<uint32_t>(kDefaultReadyFrame);
> + /* Don't change client frame length unless the change exceeds this amount (microseconds). */
> + minAdjustment_ = params["min_adjustment"].get<uint32_t>(kDefaultMinAdjustment);
> +
> + return 0;
> +}
> +
> +void Sync::initialiseSocket()
> +{
> + socket_ = socket(AF_INET, SOCK_DGRAM, 0);
> + if (socket_ < 0) {
> + LOG(RPiSync, Error) << "Unable to create socket";
> + return;
> + }
> +
> + memset(&addr_, 0, sizeof(addr_));
> + addr_.sin_family = AF_INET;
> + addr_.sin_addr.s_addr = mode_ == Mode::Client ? htonl(INADDR_ANY) : inet_addr(group_.c_str());
> + addr_.sin_port = htons(port_);
> +
> + if (mode_ == Mode::Client) {
> + /* Set to non-blocking. */
> + int flags = fcntl(socket_, F_GETFL, 0);
> + fcntl(socket_, F_SETFL, flags | O_NONBLOCK);
> +
> + unsigned int en = 1;
> + if (setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &en, sizeof(en)) < 0) {
> + LOG(RPiSync, Error) << "Unable to set socket options";
> + goto err;
> + }
> +
> + struct ip_mreq mreq {
> + };
Extra newline added accidentally?
> + mreq.imr_multiaddr.s_addr = inet_addr(group_.c_str());
> + mreq.imr_interface.s_addr = htonl(INADDR_ANY);
> + if (setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
> + LOG(RPiSync, Error) << "Unable to set socket options";
> + goto err;
> + }
> +
> + if (bind(socket_, (struct sockaddr *)&addr_, sizeof(addr_)) < 0) {
> + LOG(RPiSync, Error) << "Unable to bind client socket";
> + goto err;
> + }
> + }
> +
> + return;
> +
> +err:
> + close(socket_);
> + socket_ = -1;
> +}
> +
> +void Sync::switchMode([[maybe_unused]] CameraMode const &cameraMode, [[maybe_unused]] Metadata *metadata)
> +{
> + /*
> + * A mode switch means the camera has stopped, so synchronisation will be lost.
> + * Reset all the internal state so that we start over.
> + */
> + reset();
> +}
> +
> +/*
> + * Camera sync algorithm.
> + * Server - there is a single server that sends framerate timing information over the network to any
> + * clients that are listening. It also signals when it will send a "everything is synchronised, now go"
> + * message back to the algorithm.
> + * Client - there may be many clients, either on the same Pi or different ones. They match their
> + * framerates to the server, and indicate when to "go" at the same instant as the server.
> + */
> +void Sync::process([[maybe_unused]] StatisticsPtr &stats, Metadata *imageMetadata)
> +{
> + SyncPayload payload;
> + SyncParams local{};
> + SyncStatus status{};
> + bool timerKnown = true;
> +
> + if (mode_ == Mode::Off)
> + return;
> +
> + if (!frameDuration_) {
> + LOG(RPiSync, Error) << "Sync frame duration not set!";
> + return;
> + }
> +
> + if (socket_ < 0) {
> + initialiseSocket();
> +
> + if (socket_ < 0)
> + return;
Maybe we should add a warning log message here?
> +
> + /*
> + * For the client, flush anything in the socket. It might be stale from a previous sync run,
> + * or we might get another packet in a frame to two before the adjustment caused by this (old)
> + * packet, although correct, had taken effect. So this keeps things simpler.
> + */
> + if (mode_ == Mode::Client) {
> + socklen_t addrlen = sizeof(addr_);
> + int ret = 0;
> + while (ret >= 0)
> + ret = recvfrom(socket_, &payload, sizeof(payload), 0, (struct sockaddr *)&addr_, &addrlen);
> + }
> + }
> +
> + imageMetadata->get("sync.params", local);
> +
> + /* The wallclock has already been de-jittered for us. */
> + uint64_t wallClockFrameTimestamp = local.wallClock;
> +
> + /*
> + * This is the headline frame duration in microseconds as programmed into the sensor. Strictly,
> + * the sensor might not quite match the system clock, but this shouldn't matter for the calculations
> + * we'll do with it, unless it's a very very long way out!
> + */
> + uint32_t frameDuration = frameDuration_.get<std::micro>();
> +
> + /* Timestamps tell us if we've dropped any frames, but we still want to count them. */
> + int droppedFrames = 0;
> + if (frameCount_) {
> + /*
> + * Round down here, because frameCount_ gets incremented at the end of the function. Also
> + * ensure droppedFrames can't go negative. It shouldn't, but things would go badly wrong
> + * if it did.
> + */
> + wallClockFrameTimestamp = std::max<uint64_t>(wallClockFrameTimestamp, lastWallClockFrameTimestamp_ + frameDuration / 2);
> + droppedFrames = (wallClockFrameTimestamp - lastWallClockFrameTimestamp_ - frameDuration / 2) / frameDuration;
> + frameCount_ += droppedFrames;
> + }
> +
> + if (mode_ == Mode::Server) {
> + /*
> + * Server sends a packet every syncPeriod_ frames, or as soon after as possible (if any
> + * frames were dropped).
> + */
> + serverFrameCountPeriod_ += droppedFrames;
> +
> + /*
> + * The client may want a better idea of the true frame duration. Any error would feed straight
> + * into the correction term because of how it uses it to get the "nearest" frame.
> + */
> + if (frameCount_ == 0)
> + frameDurationEstimated_ = frameDuration;
> + else {
> + double diff = (wallClockFrameTimestamp - lastWallClockFrameTimestamp_) / (1 + droppedFrames);
> + int N = std::min(frameCount_, 99U);
> + frameDurationEstimated_ = frameCount_ == 1 ? diff : (N * frameDurationEstimated_ + diff) / (N + 1);
> + }
> +
> + /* Calculate frames remaining, and therefore "time left until ready". */
> + int framesRemaining = readyFrame_ - frameCount_;
> + uint64_t wallClockReadyTime = wallClockFrameTimestamp + (int64_t)framesRemaining * frameDurationEstimated_;
> +
> + if (serverFrameCountPeriod_ >= syncPeriod_) {
> + serverFrameCountPeriod_ = 0;
> +
> + payload.frameDuration = frameDurationEstimated_ + .5; /* round to nearest */
> + payload.wallClockFrameTimestamp = wallClockFrameTimestamp;
> + payload.wallClockReadyTime = wallClockReadyTime;
> +
> + LOG(RPiSync, Debug) << "Send packet (frameNumber " << frameCount_ << "):";
> + LOG(RPiSync, Debug) << " frameDuration " << payload.frameDuration;
> + LOG(RPiSync, Debug) << " wallClockFrameTimestamp " << wallClockFrameTimestamp
> + << " (" << wallClockFrameTimestamp - lastWallClockFrameTimestamp_ << ")";
> + LOG(RPiSync, Debug) << " wallClockReadyTime " << wallClockReadyTime;
> +
> + if (sendto(socket_, &payload, sizeof(payload), 0, (const sockaddr *)&addr_, sizeof(addr_)) < 0)
> + LOG(RPiSync, Error) << "Send error! " << strerror(errno);
> + }
> +
> + timerValue_ = static_cast<int64_t>(wallClockReadyTime - wallClockFrameTimestamp);
> + if (!syncReady_ && wallClockFrameTimestamp + frameDurationEstimated_ / 2 > wallClockReadyTime) {
> + syncReady_ = true;
> + LOG(RPiSync, Info) << "*** Sync achieved! Difference " << timerValue_ << "us";
Could we remove the *** from the message?
> + }
> +
> + serverFrameCountPeriod_ += 1;
> +
> + } else if (mode_ == Mode::Client) {
> + uint64_t serverFrameTimestamp = 0;
> +
> + bool packetReceived = false;
> + while (true) {
> + socklen_t addrlen = sizeof(addr_);
> + int ret = recvfrom(socket_, &payload, sizeof(payload), 0, (struct sockaddr *)&addr_, &addrlen);
> +
> + if (ret < 0)
> + break;
> + packetReceived = (ret > 0);
> + clientSeenPacket_ = true;
> +
> + frameDurationEstimated_ = payload.frameDuration;
> + serverFrameTimestamp = payload.wallClockFrameTimestamp;
> + serverReadyTime_ = payload.wallClockReadyTime;
> + }
> +
> + if (packetReceived) {
> + uint64_t clientFrameTimestamp = wallClockFrameTimestamp;
> + int64_t clientServerDelta = clientFrameTimestamp - serverFrameTimestamp;
> + /* "A few frames ago" may have better matched the server's frame. Calculate when it was. */
> + int framePeriodErrors = (clientServerDelta + frameDurationEstimated_ / 2) / frameDurationEstimated_;
> + int64_t clientFrameTimestampNearest = clientFrameTimestamp - framePeriodErrors * frameDurationEstimated_;
> + /* We must shorten a single client frame by this amount if it exceeds the minimum: */
> + int32_t correction = clientFrameTimestampNearest - serverFrameTimestamp;
> + if (std::abs(correction) < minAdjustment_)
> + correction = 0;
> +
> + LOG(RPiSync, Debug) << "Received packet (frameNumber " << frameCount_ << "):";
> + LOG(RPiSync, Debug) << " serverFrameTimestamp " << serverFrameTimestamp;
> + LOG(RPiSync, Debug) << " serverReadyTime " << serverReadyTime_;
> + LOG(RPiSync, Debug) << " clientFrameTimestamp " << clientFrameTimestamp;
> + LOG(RPiSync, Debug) << " clientFrameTimestampNearest " << clientFrameTimestampNearest
> + << " (" << framePeriodErrors << ")";
> + LOG(RPiSync, Debug) << " correction " << correction;
> +
> + status.frameDurationOffset = correction * 1us;
> + }
> +
> + timerValue_ = static_cast<int64_t>(serverReadyTime_ - wallClockFrameTimestamp);
> + timerKnown = clientSeenPacket_; /* client must receive a packet before the timer value is correct */
> + if (clientSeenPacket_ && !syncReady_ && wallClockFrameTimestamp + frameDurationEstimated_ / 2 > serverReadyTime_) {
> + syncReady_ = true;
> + LOG(RPiSync, Info) << "*** Sync achieved! Difference " << timerValue_ << "us";
> + }
> + }
> +
> + lastWallClockFrameTimestamp_ = wallClockFrameTimestamp;
> +
> + status.ready = syncReady_;
> + status.timerValue = timerValue_;
> + status.timerKnown = timerKnown;
> + imageMetadata->set("sync.status", status);
> + frameCount_++;
> +}
> +
> +void Sync::reset()
> +{
> + /* This resets the state so that the synchronisation procedure will start over. */
> + syncReady_ = false;
> + frameCount_ = 0;
> + timerValue_ = 0;
> + serverFrameCountPeriod_ = 0;
> + serverReadyTime_ = 0;
> + clientSeenPacket_ = false;
> +}
> +
> +void Sync::setMode(Mode mode)
> +{
> + mode_ = mode;
> +
> + /* Another "sync session" can be started by turning it off and on again. */
> + if (mode == Mode::Off)
> + reset();
Should we have a top-level Sync::Reset() API call for this?
Minors aside:
Reviewed-by: Naushir Patuck <naush at raspberrypi.com>
> +}
> +
> +void Sync::setFrameDuration(libcamera::utils::Duration frameDuration)
> +{
> + frameDuration_ = frameDuration;
> +};
> +
> +void Sync::setReadyFrame(unsigned int frame)
> +{
> + readyFrame_ = frame;
> +};
> +
> +/* Register algorithm with the system. */
> +static Algorithm *create(Controller *controller)
> +{
> + return (Algorithm *)new Sync(controller);
> +}
> +static RegisterAlgorithm reg(NAME, &create);
> diff --git a/src/ipa/rpi/controller/rpi/sync.h b/src/ipa/rpi/controller/rpi/sync.h
> new file mode 100644
> index 00000000..d3c79b7a
> --- /dev/null
> +++ b/src/ipa/rpi/controller/rpi/sync.h
> @@ -0,0 +1,68 @@
> +/* SPDX-License-Identifier: BSD-2-Clause */
> +/*
> + * Copyright (C) 2024, Raspberry Pi Ltd
> + *
> + * sync.h - sync algorithm
> + */
> +#pragma once
> +
> +#include <netinet/ip.h>
> +
> +#include "../sync_algorithm.h"
> +
> +namespace RPiController {
> +
> +struct SyncPayload {
> + /* Frame duration in microseconds. */
> + uint32_t frameDuration;
> + /* Server system (kernel) frame timestamp. */
> + uint64_t systemFrameTimestamp;
> + /* Server wall clock version of the frame timestamp. */
> + uint64_t wallClockFrameTimestamp;
> + /* Server system (kernel) sync time (the time at which frames are marked ready). */
> + uint64_t systemReadyTime;
> + /* Server wall clock version of the sync time. */
> + uint64_t wallClockReadyTime;
> +};
> +
> +class Sync : public SyncAlgorithm
> +{
> +public:
> + Sync(Controller *controller);
> + ~Sync();
> + char const *name() const override;
> + int read(const libcamera::YamlObject &params) override;
> + void setMode(Mode mode) override;
> + void initialiseSocket();
> + void switchMode(CameraMode const &cameraMode, Metadata *metadata) override;
> + void process(StatisticsPtr &stats, Metadata *imageMetadata) override;
> + void setFrameDuration(libcamera::utils::Duration frameDuration) override;
> + void setReadyFrame(unsigned int frame) override;
> +
> +private:
> + void reset(); /* reset internal state and start over */
> +
> + Mode mode_; /* server or client */
> + std::string group_; /* IP group address for sync messages */
> + uint16_t port_; /* port number for messages */
> + uint32_t syncPeriod_; /* send a sync message every this many frames */
> + uint32_t readyFrame_; /* tell the application we're ready after this many frames */
> + uint32_t minAdjustment_; /* don't adjust the client frame length by less than this */
> +
> + struct sockaddr_in addr_;
> + int socket_ = -1;
> + libcamera::utils::Duration frameDuration_;
> + unsigned int frameCount_;
> + bool syncReady_;
> + int64_t timerValue_ = 0; /* time until "ready time" */
> +
> + double frameDurationEstimated_ = 0; /* estimate the true frame duration of the sensor */
> + uint64_t lastWallClockFrameTimestamp_; /* wall clock timestamp of previous frame */
> +
> + uint32_t serverFrameCountPeriod_ = 0; /* send the next packet when this reaches syncPeriod_ */
> +
> + bool clientSeenPacket_ = false; /* whether the client has received a packet yet */
> + uint64_t serverReadyTime_ = 0; /* the client's latest value for when the server will be "ready" */
> +};
> +
> +} /* namespace RPiController */
> --
> 2.39.5
>
More information about the libcamera-devel
mailing list