[PATCH v3 6/7] ipa: rpi: sync: Add an implementation of the camera sync algorithm

Naushir Patuck naush at raspberrypi.com
Fri Jan 17 10:26:17 CET 2025


Hi David,

On Thu, 9 Jan 2025 at 14:32, David Plowman
<david.plowman at raspberrypi.com> wrote:
>
> In this implementation, the server sends data packets out onto the
> network every 30 frames or so.
>
> Clients listening for this packet will send frame length deltas back
> to the pipeline handler to match the synchronisation of the server.
>
> We use wallclock timestamps, passed to us from the pipeline handler,
> that have been de-jittered appropriately, meaning that the
> synchronisation will actually work across networked devices.
>
> When the server's advertised "ready time" is reached, both client and
> server will signal this through metadata back to their respective
> controlling applications.
>
> Signed-off-by: David Plowman <david.plowman at raspberrypi.com>
> Signed-off-by: Arsen Mikovic <arsen.mikovic at raspberrypi.com>
> Signed-off-by: Naushir Patuck <naush at raspberrypi.com>
> ---
>  src/ipa/rpi/controller/meson.build  |   1 +
>  src/ipa/rpi/controller/rpi/sync.cpp | 330 ++++++++++++++++++++++++++++
>  src/ipa/rpi/controller/rpi/sync.h   |  68 ++++++
>  3 files changed, 399 insertions(+)
>  create mode 100644 src/ipa/rpi/controller/rpi/sync.cpp
>  create mode 100644 src/ipa/rpi/controller/rpi/sync.h
>
> diff --git a/src/ipa/rpi/controller/meson.build b/src/ipa/rpi/controller/meson.build
> index 74b74888..dde4ac12 100644
> --- a/src/ipa/rpi/controller/meson.build
> +++ b/src/ipa/rpi/controller/meson.build
> @@ -23,6 +23,7 @@ rpi_ipa_controller_sources = files([
>      'rpi/saturation.cpp',
>      'rpi/sdn.cpp',
>      'rpi/sharpen.cpp',
> +    'rpi/sync.cpp',
>      'rpi/tonemap.cpp',
>  ])
>
> diff --git a/src/ipa/rpi/controller/rpi/sync.cpp b/src/ipa/rpi/controller/rpi/sync.cpp
> new file mode 100644
> index 00000000..43a8cbe6
> --- /dev/null
> +++ b/src/ipa/rpi/controller/rpi/sync.cpp
> @@ -0,0 +1,330 @@
> +/* SPDX-License-Identifier: BSD-2-Clause */
> +/*
> + * Copyright (C) 2024, Raspberry Pi Ltd
> + *
> + * sync.cpp - sync algorithm
> + */
> +#include "sync.h"
> +
> +#include <chrono>
> +#include <ctype.h>
> +#include <fcntl.h>
> +#include <strings.h>
> +#include <unistd.h>
> +
> +#include <libcamera/base/log.h>
> +
> +#include <arpa/inet.h>
> +
> +#include "sync_status.h"
> +
> +using namespace std;
> +using namespace std::chrono_literals;
> +using namespace RPiController;
> +using namespace libcamera;
> +
> +LOG_DEFINE_CATEGORY(RPiSync)
> +
> +#define NAME "rpi.sync"
> +
> +const char *kDefaultGroup = "239.255.255.250";
> +constexpr unsigned int kDefaultPort = 10000;
> +constexpr unsigned int kDefaultSyncPeriod = 30;
> +constexpr unsigned int kDefaultReadyFrame = 100;
> +constexpr unsigned int kDefaultMinAdjustment = 50;

I wonder if we can embed these constants into the read() code below,
but I'm not fussed either way.

> +
> +Sync::Sync(Controller *controller)
> +       : SyncAlgorithm(controller), mode_(Mode::Off), socket_(-1), frameDuration_(0s), frameCount_(0)
> +{
> +}
> +
> +Sync::~Sync()
> +{
> +       if (socket_ >= 0)
> +               close(socket_);
> +}
> +
> +char const *Sync::name() const
> +{
> +       return NAME;
> +}
> +
> +/* This reads from json file and intitiaises server and client */
> +int Sync::read(const libcamera::YamlObject &params)
> +{
> +       /* Socket on which to communicate. */
> +       group_ = params["group"].get<std::string>(kDefaultGroup);
> +       port_ = params["port"].get<uint16_t>(kDefaultPort);
> +       /* Send a sync message every this many frames. */
> +       syncPeriod_ = params["sync_period"].get<uint32_t>(kDefaultSyncPeriod);
> +       /* Application will be told we're ready after this many frames. */
> +       readyFrame_ = params["ready_frame"].get<uint32_t>(kDefaultReadyFrame);
> +       /* Don't change client frame length unless the change exceeds this amount (microseconds). */
> +       minAdjustment_ = params["min_adjustment"].get<uint32_t>(kDefaultMinAdjustment);
> +
> +       return 0;
> +}
> +
> +void Sync::initialiseSocket()
> +{
> +       socket_ = socket(AF_INET, SOCK_DGRAM, 0);
> +       if (socket_ < 0) {
> +               LOG(RPiSync, Error) << "Unable to create socket";
> +               return;
> +       }
> +
> +       memset(&addr_, 0, sizeof(addr_));
> +       addr_.sin_family = AF_INET;
> +       addr_.sin_addr.s_addr = mode_ == Mode::Client ? htonl(INADDR_ANY) : inet_addr(group_.c_str());
> +       addr_.sin_port = htons(port_);
> +
> +       if (mode_ == Mode::Client) {
> +               /* Set to non-blocking. */
> +               int flags = fcntl(socket_, F_GETFL, 0);
> +               fcntl(socket_, F_SETFL, flags | O_NONBLOCK);
> +
> +               unsigned int en = 1;
> +               if (setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &en, sizeof(en)) < 0) {
> +                       LOG(RPiSync, Error) << "Unable to set socket options";
> +                       goto err;
> +               }
> +
> +               struct ip_mreq mreq {
> +               };

Extra newline added accidentaly?

> +               mreq.imr_multiaddr.s_addr = inet_addr(group_.c_str());
> +               mreq.imr_interface.s_addr = htonl(INADDR_ANY);
> +               if (setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
> +                       LOG(RPiSync, Error) << "Unable to set socket options";
> +                       goto err;
> +               }
> +
> +               if (bind(socket_, (struct sockaddr *)&addr_, sizeof(addr_)) < 0) {
> +                       LOG(RPiSync, Error) << "Unable to bind client socket";
> +                       goto err;
> +               }
> +       }
> +
> +       return;
> +
> +err:
> +       close(socket_);
> +       socket_ = -1;
> +}
> +
> +void Sync::switchMode([[maybe_unused]] CameraMode const &cameraMode, [[maybe_unused]] Metadata *metadata)
> +{
> +       /*
> +        * A mode switch means the camera has stopped, so synchronisation will be lost.
> +        * Reset all the internal state so that we start over.
> +        */
> +       reset();
> +}
> +
> +/*
> + * Camera sync algorithm.
> + *     Server - there is a single server that sends framerate timing information over the network to any
> + *         clients that are listening. It also signals when it will send a "everything is synchronised, now go"
> + *         message back to the algorithm.
> + *     Client - there may be many clients, either on the same Pi or different ones. They match their
> + *         framerates to the server, and indicate when to "go" at the same instant as the server.
> + */
> +void Sync::process([[maybe_unused]] StatisticsPtr &stats, Metadata *imageMetadata)
> +{
> +       SyncPayload payload;
> +       SyncParams local{};
> +       SyncStatus status{};
> +       bool timerKnown = true;
> +
> +       if (mode_ == Mode::Off)
> +               return;
> +
> +       if (!frameDuration_) {
> +               LOG(RPiSync, Error) << "Sync frame duration not set!";
> +               return;
> +       }
> +
> +       if (socket_ < 0) {
> +               initialiseSocket();
> +
> +               if (socket_ < 0)
> +                       return;

Maybe we should add a warning log message here?

> +
> +               /*
> +                * For the client, flush anything in the socket. It might be stale from a previous sync run,
> +                * or we might get another packet in a frame to two before the adjustment caused by this (old)
> +                * packet, although correct, had taken effect. So this keeps things simpler.
> +                */
> +               if (mode_ == Mode::Client) {
> +                       socklen_t addrlen = sizeof(addr_);
> +                       int ret = 0;
> +                       while (ret >= 0)
> +                               ret = recvfrom(socket_, &payload, sizeof(payload), 0, (struct sockaddr *)&addr_, &addrlen);
> +               }
> +       }
> +
> +       imageMetadata->get("sync.params", local);
> +
> +       /* The wallclock has already been de-jittered for us. */
> +       uint64_t wallClockFrameTimestamp = local.wallClock;
> +
> +       /*
> +        * This is the headline frame duration in microseconds as programmed into the sensor. Strictly,
> +        * the sensor might not quite match the system clock, but this shouldn't matter for the calculations
> +        * we'll do with it, unless it's a very very long way out!
> +        */
> +       uint32_t frameDuration = frameDuration_.get<std::micro>();
> +
> +       /* Timestamps tell us if we've dropped any frames, but we still want to count them. */
> +       int droppedFrames = 0;
> +       if (frameCount_) {
> +               /*
> +                * Round down here, because frameCount_ gets incremented at the end of the function. Also
> +                * ensure droppedFrames can't go negative. It shouldn't, but things would go badly wrong
> +                * if it did.
> +                */
> +               wallClockFrameTimestamp = std::max<uint64_t>(wallClockFrameTimestamp, lastWallClockFrameTimestamp_ + frameDuration / 2);
> +               droppedFrames = (wallClockFrameTimestamp - lastWallClockFrameTimestamp_ - frameDuration / 2) / frameDuration;
> +               frameCount_ += droppedFrames;
> +       }
> +
> +       if (mode_ == Mode::Server) {
> +               /*
> +                * Server sends a packet every syncPeriod_ frames, or as soon after as possible (if any
> +                * frames were dropped).
> +                */
> +               serverFrameCountPeriod_ += droppedFrames;
> +
> +               /*
> +                * The client may want a better idea of the true frame duration. Any error would feed straight
> +                * into the correction term because of how it uses it to get the "nearest" frame.
> +                */
> +               if (frameCount_ == 0)
> +                       frameDurationEstimated_ = frameDuration;
> +               else {
> +                       double diff = (wallClockFrameTimestamp - lastWallClockFrameTimestamp_) / (1 + droppedFrames);
> +                       int N = std::min(frameCount_, 99U);
> +                       frameDurationEstimated_ = frameCount_ == 1 ? diff : (N * frameDurationEstimated_ + diff) / (N + 1);
> +               }
> +
> +               /* Calculate frames remaining, and therefore "time left until ready". */
> +               int framesRemaining = readyFrame_ - frameCount_;
> +               uint64_t wallClockReadyTime = wallClockFrameTimestamp + (int64_t)framesRemaining * frameDurationEstimated_;
> +
> +               if (serverFrameCountPeriod_ >= syncPeriod_) {
> +                       serverFrameCountPeriod_ = 0;
> +
> +                       payload.frameDuration = frameDurationEstimated_ + .5; /* round to nearest */
> +                       payload.wallClockFrameTimestamp = wallClockFrameTimestamp;
> +                       payload.wallClockReadyTime = wallClockReadyTime;
> +
> +                       LOG(RPiSync, Debug) << "Send packet (frameNumber " << frameCount_ << "):";
> +                       LOG(RPiSync, Debug) << "            frameDuration " << payload.frameDuration;
> +                       LOG(RPiSync, Debug) << "            wallClockFrameTimestamp " << wallClockFrameTimestamp
> +                                           << " (" << wallClockFrameTimestamp - lastWallClockFrameTimestamp_ << ")";
> +                       LOG(RPiSync, Debug) << "            wallClockReadyTime " << wallClockReadyTime;
> +
> +                       if (sendto(socket_, &payload, sizeof(payload), 0, (const sockaddr *)&addr_, sizeof(addr_)) < 0)
> +                               LOG(RPiSync, Error) << "Send error! " << strerror(errno);
> +               }
> +
> +               timerValue_ = static_cast<int64_t>(wallClockReadyTime - wallClockFrameTimestamp);
> +               if (!syncReady_ && wallClockFrameTimestamp + frameDurationEstimated_ / 2 > wallClockReadyTime) {
> +                       syncReady_ = true;
> +                       LOG(RPiSync, Info) << "*** Sync achieved! Difference " << timerValue_ << "us";

Could we remove the *** from the message?

> +               }
> +
> +               serverFrameCountPeriod_ += 1;
> +
> +       } else if (mode_ == Mode::Client) {
> +               uint64_t serverFrameTimestamp = 0;
> +
> +               bool packetReceived = false;
> +               while (true) {
> +                       socklen_t addrlen = sizeof(addr_);
> +                       int ret = recvfrom(socket_, &payload, sizeof(payload), 0, (struct sockaddr *)&addr_, &addrlen);
> +
> +                       if (ret < 0)
> +                               break;
> +                       packetReceived = (ret > 0);
> +                       clientSeenPacket_ = true;
> +
> +                       frameDurationEstimated_ = payload.frameDuration;
> +                       serverFrameTimestamp = payload.wallClockFrameTimestamp;
> +                       serverReadyTime_ = payload.wallClockReadyTime;
> +               }
> +
> +               if (packetReceived) {
> +                       uint64_t clientFrameTimestamp = wallClockFrameTimestamp;
> +                       int64_t clientServerDelta = clientFrameTimestamp - serverFrameTimestamp;
> +                       /* "A few frames ago" may have better matched the server's frame. Calculate when it was. */
> +                       int framePeriodErrors = (clientServerDelta + frameDurationEstimated_ / 2) / frameDurationEstimated_;
> +                       int64_t clientFrameTimestampNearest = clientFrameTimestamp - framePeriodErrors * frameDurationEstimated_;
> +                       /* We must shorten a single client frame by this amount if it exceeds the minimum: */
> +                       int32_t correction = clientFrameTimestampNearest - serverFrameTimestamp;
> +                       if (std::abs(correction) < minAdjustment_)
> +                               correction = 0;
> +
> +                       LOG(RPiSync, Debug) << "Received packet (frameNumber " << frameCount_ << "):";
> +                       LOG(RPiSync, Debug) << "                serverFrameTimestamp " << serverFrameTimestamp;
> +                       LOG(RPiSync, Debug) << "                serverReadyTime " << serverReadyTime_;
> +                       LOG(RPiSync, Debug) << "                clientFrameTimestamp " << clientFrameTimestamp;
> +                       LOG(RPiSync, Debug) << "                clientFrameTimestampNearest " << clientFrameTimestampNearest
> +                                           << " (" << framePeriodErrors << ")";
> +                       LOG(RPiSync, Debug) << "                correction " << correction;
> +
> +                       status.frameDurationOffset = correction * 1us;
> +               }
> +
> +               timerValue_ = static_cast<int64_t>(serverReadyTime_ - wallClockFrameTimestamp);
> +               timerKnown = clientSeenPacket_; /* client must receive a packet before the timer value is correct */
> +               if (clientSeenPacket_ && !syncReady_ && wallClockFrameTimestamp + frameDurationEstimated_ / 2 > serverReadyTime_) {
> +                       syncReady_ = true;
> +                       LOG(RPiSync, Info) << "*** Sync achieved! Difference " << timerValue_ << "us";
> +               }
> +       }
> +
> +       lastWallClockFrameTimestamp_ = wallClockFrameTimestamp;
> +
> +       status.ready = syncReady_;
> +       status.timerValue = timerValue_;
> +       status.timerKnown = timerKnown;
> +       imageMetadata->set("sync.status", status);
> +       frameCount_++;
> +}
> +
> +void Sync::reset()
> +{
> +       /* This resets the state so that the synchronisation procedure will start over. */
> +       syncReady_ = false;
> +       frameCount_ = 0;
> +       timerValue_ = 0;
> +       serverFrameCountPeriod_ = 0;
> +       serverReadyTime_ = 0;
> +       clientSeenPacket_ = false;
> +}
> +
> +void Sync::setMode(Mode mode)
> +{
> +       mode_ = mode;
> +
> +       /* Another "sync session" can be started by turning it off and on again. */
> +       if (mode == Mode::Off)
> +               reset();

Should we have a top-level Sync::Reset() API call for this?

Minors aside:

Reviewed-by: Naushir Patuck <naush at raspberrypi.com>

> +}
> +
> +void Sync::setFrameDuration(libcamera::utils::Duration frameDuration)
> +{
> +       frameDuration_ = frameDuration;
> +};
> +
> +void Sync::setReadyFrame(unsigned int frame)
> +{
> +       readyFrame_ = frame;
> +};
> +
> +/* Register algorithm with the system. */
> +static Algorithm *create(Controller *controller)
> +{
> +       return (Algorithm *)new Sync(controller);
> +}
> +static RegisterAlgorithm reg(NAME, &create);
> diff --git a/src/ipa/rpi/controller/rpi/sync.h b/src/ipa/rpi/controller/rpi/sync.h
> new file mode 100644
> index 00000000..d3c79b7a
> --- /dev/null
> +++ b/src/ipa/rpi/controller/rpi/sync.h
> @@ -0,0 +1,68 @@
> +/* SPDX-License-Identifier: BSD-2-Clause */
> +/*
> + * Copyright (C) 2024, Raspberry Pi Ltd
> + *
> + * sync.h - sync algorithm
> + */
> +#pragma once
> +
> +#include <netinet/ip.h>
> +
> +#include "../sync_algorithm.h"
> +
> +namespace RPiController {
> +
> +struct SyncPayload {
> +       /* Frame duration in microseconds. */
> +       uint32_t frameDuration;
> +       /* Server system (kernel) frame timestamp. */
> +       uint64_t systemFrameTimestamp;
> +       /* Server wall clock version of the frame timestamp. */
> +       uint64_t wallClockFrameTimestamp;
> +       /* Server system (kernel) sync time (the time at which frames are marked ready). */
> +       uint64_t systemReadyTime;
> +       /* Server wall clock version of the sync time. */
> +       uint64_t wallClockReadyTime;
> +};
> +
> +class Sync : public SyncAlgorithm
> +{
> +public:
> +       Sync(Controller *controller);
> +       ~Sync();
> +       char const *name() const override;
> +       int read(const libcamera::YamlObject &params) override;
> +       void setMode(Mode mode) override;
> +       void initialiseSocket();
> +       void switchMode(CameraMode const &cameraMode, Metadata *metadata) override;
> +       void process(StatisticsPtr &stats, Metadata *imageMetadata) override;
> +       void setFrameDuration(libcamera::utils::Duration frameDuration) override;
> +       void setReadyFrame(unsigned int frame) override;
> +
> +private:
> +       void reset(); /* reset internal state and start over */
> +
> +       Mode mode_; /* server or client */
> +       std::string group_; /* IP group address for sync messages */
> +       uint16_t port_; /* port number for messages */
> +       uint32_t syncPeriod_; /* send a sync message every this many frames */
> +       uint32_t readyFrame_; /* tell the application we're ready after this many frames */
> +       uint32_t minAdjustment_; /* don't adjust the client frame length by less than this */
> +
> +       struct sockaddr_in addr_;
> +       int socket_ = -1;
> +       libcamera::utils::Duration frameDuration_;
> +       unsigned int frameCount_;
> +       bool syncReady_;
> +       int64_t timerValue_ = 0; /* time until "ready time" */
> +
> +       double frameDurationEstimated_ = 0; /* estimate the true frame duration of the sensor */
> +       uint64_t lastWallClockFrameTimestamp_; /* wall clock timestamp of previous frame */
> +
> +       uint32_t serverFrameCountPeriod_ = 0; /* send the next packet when this reaches syncPeriod_ */
> +
> +       bool clientSeenPacket_ = false; /* whether the client has received a packet yet */
> +       uint64_t serverReadyTime_ = 0; /* the client's latest value for when the server will be "ready" */
> +};
> +
> +} /* namespace RPiController */
> --
> 2.39.5
>


More information about the libcamera-devel mailing list