[libcamera-devel] [PATCH v4 10/37] libcamera: Add IPAIPC implementation based on unix socket
Laurent Pinchart
laurent.pinchart at ideasonboard.com
Fri Nov 20 15:50:15 CET 2020
Hi Paul,
Thank you for the patch.
On Thu, Nov 19, 2020 at 03:05:27PM +0900, paul.elder at ideasonboard.com wrote:
> On Wed, Nov 18, 2020 at 10:37:54AM +0100, Jacopo Mondi wrote:
> > On Wed, Nov 18, 2020 at 06:25:50PM +0900, paul.elder at ideasonboard.com wrote:
> > > On Tue, Nov 17, 2020 at 04:42:28PM +0100, Jacopo Mondi wrote:
> > > > On Fri, Nov 06, 2020 at 07:36:40PM +0900, Paul Elder wrote:
> > > > > Add an implementation of IPAIPC using unix socket.
> > > > >
> > > > > Signed-off-by: Paul Elder <paul.elder at ideasonboard.com>
> > > > >
> > > > > ---
> > > > > Changes in v4:
> > > > > - change snake_case to camelCase
> > > > > - change proc_ and socket_ to unique pointers
> > > > > - move inclusion of corresponding header to first in the include list
> > > > > - reserve message data and fds size (for sending)
> > > > >
> > > > > Changes in v3:
> > > > > - remove unused writeUInt32() and readUInt32()
> > > > > - remove redundant definition of IPAIPCUnixSocket::isValid()
> > > > > - remove & 0xff in writeHeader()
> > > > > - make readHeader, writeHeader, and eraseHeader static class functions
> > > > > of IPAIPCUnixSocket instead of globals
> > > > >
> > > > > Changes in v2:
> > > > > - specify in doxygen to skip generating documentation for
> > > > > IPAIPCUnixSocket
> > > > > ---
> > > > > Documentation/Doxyfile.in | 2 +
> > > > > .../libcamera/internal/ipa_ipc_unixsocket.h | 62 +++++
> > > > > src/libcamera/ipa_ipc_unixsocket.cpp | 213 ++++++++++++++++++
> > > > > src/libcamera/meson.build | 1 +
> > > > > 4 files changed, 278 insertions(+)
> > > > > create mode 100644 include/libcamera/internal/ipa_ipc_unixsocket.h
> > > > > create mode 100644 src/libcamera/ipa_ipc_unixsocket.cpp
> > > > >
> > > > > diff --git a/Documentation/Doxyfile.in b/Documentation/Doxyfile.in
> > > > > index a6754a47..20fa1349 100644
> > > > > --- a/Documentation/Doxyfile.in
> > > > > +++ b/Documentation/Doxyfile.in
> > > > > @@ -837,8 +837,10 @@ RECURSIVE = YES
> > > > > EXCLUDE = @TOP_SRCDIR@/include/libcamera/span.h \
> > > > > @TOP_SRCDIR@/include/libcamera/internal/device_enumerator_sysfs.h \
> > > > > @TOP_SRCDIR@/include/libcamera/internal/device_enumerator_udev.h \
> > > > > + @TOP_SRCDIR@/include/libcamera/internal/ipa_ipc_unixsocket.h \
> > > > > @TOP_SRCDIR@/src/libcamera/device_enumerator_sysfs.cpp \
> > > > > @TOP_SRCDIR@/src/libcamera/device_enumerator_udev.cpp \
> > > > > + @TOP_SRCDIR@/src/libcamera/ipa_ipc_unixsocket.cpp \
> > > > > @TOP_SRCDIR@/src/libcamera/pipeline/ \
> > > > > @TOP_SRCDIR@/src/libcamera/proxy/ \
> > > > > @TOP_SRCDIR@/src/libcamera/tracepoints.cpp \
> > > > > diff --git a/include/libcamera/internal/ipa_ipc_unixsocket.h b/include/libcamera/internal/ipa_ipc_unixsocket.h
> > > > > new file mode 100644
> > > > > index 00000000..f7248ca0
> > > > > --- /dev/null
> > > > > +++ b/include/libcamera/internal/ipa_ipc_unixsocket.h
> > > > > @@ -0,0 +1,62 @@
> > > > > +/* SPDX-License-Identifier: LGPL-2.1-or-later */
> > > > > +/*
> > > > > + * Copyright (C) 2020, Google Inc.
> > > > > + *
> > > > > + * ipa_ipc_unixsocket.h - Image Processing Algorithm IPC module using unix socket
> > > > > + */
> > > > > +#ifndef __LIBCAMERA_INTERNAL_IPA_IPC_UNIXSOCKET_H__
> > > > > +#define __LIBCAMERA_INTERNAL_IPA_IPC_UNIXSOCKET_H__
> > > > > +
> > > > > +#include <vector>
You need <map>, <memory> and <stdint.h> too.
> > > > > +
> > > > > +#include <libcamera/span.h>
This is not needed.
> > > > > +
> > > > > +#include "libcamera/internal/ipa_ipc.h"
> > > > > +#include "libcamera/internal/ipa_module.h"
Neither is ipa_module.h (you will need it in the .cpp file though).
> > > > > +#include "libcamera/internal/ipc_unixsocket.h"
> > > > > +
> > > > > +namespace libcamera {
> > > > > +
> > > > > +class Process;
> > > > > +
> > > > > +class IPAIPCUnixSocket : public IPAIPC
> > > > > +{
> > > > > +public:
> > > > > + IPAIPCUnixSocket(const char *ipaModulePath, const char *ipaProxyWorkerPath);
> > > > > + ~IPAIPCUnixSocket();
> > > > > +
> > > > > + int sendSync(uint32_t cmd,
> > > > > + const std::vector<uint8_t> &dataIn,
> > > > > + const std::vector<int32_t> &fdsIn,
> > > > > + std::vector<uint8_t> *dataOut = nullptr,
> > > > > + std::vector<int32_t> *fdsOut = nullptr) override;
> > > > > +
> > > > > + int sendAsync(uint32_t cmd,
> > > > > + const std::vector<uint8_t> &dataIn,
> > > > > + const std::vector<int32_t> &fdsIn) override;
> > > > > +
> > > > > + static void writeHeader(IPCUnixSocket::Payload &payload, uint32_t cmd, uint32_t seq);
> > > > > + static std::tuple<uint32_t, uint32_t> readHeader(IPCUnixSocket::Payload &payload);
> > > > > + static void eraseHeader(IPCUnixSocket::Payload &payload);
> > > >
> > > > There's one thing I don't fully get yet.
> > > >
> > > > sendSycn/Async are methods of the interface, whatever IPC mechanism is
> > > > used, the generated classes that call those methods are guaranteed to
> > > > be independent from it.
> > > >
> > > > writeHeader and readHeader are specific to the IPCUnixSocket
> > > > implementation, and I see the generated worker using them and using
> > > > explicit calls to, for example, socket_.send() which is not IPC
> > > > mechanism agnostic.
> > > >
> > > > Is this by-design ? The workers are meant to know which IPC mechamism
> > > > is in use ?
> > >
> > > Yeah... the worker is not /supposed/ to know which IPC mechanism is in
> > > use. I only realized after I finished everything, and Laurent agreed
> > > that we could wait until we get a new IPC mechanism to fix it. Ideally
> > > there should be an IPAIPC worker of some sort, another layer in between
> > > the main IPAIPC and the proxy worker.
> >
> > Oh I see.. as long as you and Laurent have validated this I'm fine.
> >
> > I now wonder if writeHeader/readHeader are not best added to the
> > IPCUnixSocket component and left out from the IPAIPCUnixSocket.
> >
> > The reason is that workers do not use the IPAIPC interface if not for
> > these functions, but they use IPCUnixSocket to communicate back with
> > the Proxy. Removing any dependency from IPAIPC from worker will make
> > it easier to isolate in a new interface the interactions with the IPC
> > mechanism in future. The IPAIPCUnixSocket will then use them from the
> > IPCUnixSocket library. If another IPC mechanism is in use, maybe a
> > different header format will be required, and IPAIPC and Workers will
> > use it from the new IPCSomething library.
> >
> > After all, writeHeader/readHeader have nothing IPA specific, they just
> > (de)serialize two integers in a buffer.
>
> When it comes time to supporting swapping out IPC mechanisms, we'll need
> an IPAIPC worker to receive the IPC calls and unwrap the serialized
> data, and call into the Proxy worker.
>
> So like... the IPAIPC (for UnixSocket) worker would have readyRead, and
> then unpack the header (and other IPC-specific things), and then call
> into a catch-all in the proxy worker while passing the serialized data,
> which would switch-case and deserialize and call the IPA.
>
> So the current proxy worker still has to be torn in half anyway.
>
> But I think you're right, neither the proxy worker nor the future IPAIPC
> worker actually need the IPAIPCUnixSocket. The IPAIPC worker would use
> IPCUnixSocket, and the proxy worker... nothing.
Following the review comments on "[PATCH v4 09/37] libcamera: Add
IPAIPC", and the proposed introduction of an IPCMessage class, how about
moving those three methods to IPCMessage ? They should actually then
become internal, as IPCMessage would be constructed from a payload. The
IPCUnixSocket::Payload is specified to IPCUnixSocket so that's a bit of
an issue, but we could move it out to ipa_ipc.h and name it IPCPayload.
IPCMessage would be constructed from an IPCPayload on the receive path,
and would create a payload on the send path (with an
IPCMessage::payload()) method.
> > > > > +
> > > > > +private:
> > > > > + struct CallData {
> > > > > + IPCUnixSocket::Payload *response;
> > > > > + bool done;
> > > > > + };
> > > > > +
> > > > > + void readyRead(IPCUnixSocket *socket);
> > > > > + int call(const IPCUnixSocket::Payload &message, IPCUnixSocket::Payload *response, uint32_t seq);
> > > > > +
> > > > > + uint32_t seq_;
> > > > > +
> > > > > + std::unique_ptr<Process> proc_;
> > > > > +
> > > > > + std::unique_ptr<IPCUnixSocket> socket_;
> > > > > +
> > > > > + std::map<uint32_t, struct CallData> callData_;
s/struct //
You could drop blank lines.
> > > > > +};
> > > > > +
> > > > > +} /* namespace libcamera */
> > > > > +
> > > > > +#endif /* __LIBCAMERA_INTERNAL_IPA_IPC_UNIXSOCKET_H__ */
> > > > > diff --git a/src/libcamera/ipa_ipc_unixsocket.cpp b/src/libcamera/ipa_ipc_unixsocket.cpp
> > > > > new file mode 100644
> > > > > index 00000000..eebb39fd
> > > > > --- /dev/null
> > > > > +++ b/src/libcamera/ipa_ipc_unixsocket.cpp
> > > > > @@ -0,0 +1,213 @@
> > > > > +/* SPDX-License-Identifier: LGPL-2.1-or-later */
> > > > > +/*
> > > > > + * Copyright (C) 2020, Google Inc.
> > > > > + *
> > > > > + * ipa_ipc_unixsocket.cpp - Image Processing Algorithm IPC module using unix socket
> > > > > + */
> > > > > +
> > > > > +#include "libcamera/internal/ipa_ipc_unixsocket.h"
> > > > > +
> > > > > +#include <vector>
> > > > > +
> > > > > +#include "libcamera/internal/ipa_ipc.h"
> > > > > +#include "libcamera/internal/ipc_unixsocket.h"
> > > > > +#include "libcamera/internal/log.h"
> > > > > +#include "libcamera/internal/process.h"
> > > > > +#include "libcamera/internal/thread.h"
> > > > > +
> > > > > +#include <libcamera/event_dispatcher.h>
> > > > > +#include <libcamera/timer.h>
> > > > > +
> > > > > +namespace libcamera {
> > > > > +
> > > > > +LOG_DECLARE_CATEGORY(IPAIPC)
> > > > > +
> > > > > +IPAIPCUnixSocket::IPAIPCUnixSocket(const char *ipaModulePath,
> > > > > + const char *ipaProxyWorkerPath)
> > > > > + : IPAIPC(), seq_(0),
> > > > > + proc_(nullptr), socket_(nullptr)
> > > > > +{
> > > > > + std::vector<int> fds;
> > > > > + std::vector<std::string> args;
> > > > > + args.push_back(ipaModulePath);
> > > > > +
> > > > > + socket_ = std::make_unique<IPCUnixSocket>();
> > > > > + int fd = socket_->create();
> > > > > + if (fd < 0) {
> > > > > + LOG(IPAIPC, Error) << "Failed to create socket";
> > > > > + return;
> > > > > + }
> > > > > + socket_->readyRead.connect(this, &IPAIPCUnixSocket::readyRead);
> > > > > + args.push_back(std::to_string(fd));
> > > > > + fds.push_back(fd);
> > > > > +
> > > > > + proc_ = std::make_unique<Process>();
> > > > > + int ret = proc_->start(ipaProxyWorkerPath, args, fds);
> > > > > + if (ret) {
> > > > > + LOG(IPAIPC, Error)
> > > > > + << "Failed to start proxy worker process";
> > > > > + return;
> > > > > + }
> > > > > +
> > > > > + valid_ = true;
> > > > > +}
> > > > > +
> > > > > +IPAIPCUnixSocket::~IPAIPCUnixSocket()
> > > > > +{
> > > > > +}
You can drop the destructor if it's empty.
> > > > > +
> > > > > +int IPAIPCUnixSocket::sendSync(uint32_t cmd,
> > > > > + const std::vector<uint8_t> &dataIn,
> > > > > + const std::vector<int32_t> &fdsIn,
> > > > > + std::vector<uint8_t> *dataOut,
> > > > > + std::vector<int32_t> *fdsOut)
> > > > > +{
> > > > > + IPCUnixSocket::Payload message, response;
> > > > > + int ret;
> > > > > +
> > > > > + message.data.reserve(8 + dataIn.size());
> > > > > + message.fds.reserve(fdsIn.size());
> > > > > +
> > > > > + /* It's fine if seq_ overflows; that'll just be the new epoch. */
> > > > > + seq_++;
> > > > > + writeHeader(message, cmd, seq_);
> > > > > + message.data.insert(message.data.end(), dataIn.begin(), dataIn.end());
> > > > > +
> > > > > + message.fds = const_cast<std::vector<int32_t> &>(fdsIn);
> > > > > +
> > > > > + ret = call(message, &response, seq_);
> > > > > + if (ret) {
> > > > > + LOG(IPAIPC, Error) << "Failed to call sync";
> > > > > + callData_.erase(seq_);
> > > > > + return ret;
> > > > > + }
> > > > > +
> > > > > + if (dataOut)
> > > > > + dataOut->insert(dataOut->end(), response.data.begin(), response.data.end());
> > > > > +
> > > > > + if (fdsOut)
> > > > > + fdsOut->insert(fdsOut->end(), response.fds.begin(), response.fds.end());
> > > > > +
> > > > > + return 0;
> > > > > +}
> > > > > +
> > > > > +int IPAIPCUnixSocket::sendAsync(uint32_t cmd,
> > > > > + const std::vector<uint8_t> &dataIn,
> > > > > + const std::vector<int32_t> &fdsIn)
> > > > > +{
> > > > > + IPCUnixSocket::Payload message;
> > > > > + int ret;
> > > > > +
> > > > > + message.data.reserve(8 + dataIn.size());
> > > > > + message.fds.reserve(fdsIn.size());
> > > > > +
> > > > > + writeHeader(message, cmd, 0);
> > > > > + message.data.insert(message.data.end(), dataIn.begin(), dataIn.end());
> > > > > +
> > > > > + message.fds = const_cast<std::vector<int32_t> &>(fdsIn);
> > > > > +
> > > > > + ret = socket_->send(message);
> > > > > + if (ret) {
> > > > > + LOG(IPAIPC, Error) << "Failed to call async";
> > > > > + return ret;
> > > > > + }
> > > > > +
> > > > > + return 0;
> > > > > +}
There's similar code in the two functions, merging sendSync() and
sendAsync() with flags to select sync or async behaviour would I think
lower code duplication.
> > > > > +
> > > > > +void IPAIPCUnixSocket::writeHeader(IPCUnixSocket::Payload &payload,
> > > > > + uint32_t cmd, uint32_t seq)
> > > > > +{
> > > > > + uint8_t cmd_arr[] = {static_cast<uint8_t>(cmd),
> > > > > + static_cast<uint8_t>((cmd >> 8)),
> > > > > + static_cast<uint8_t>((cmd >> 16)),
> > > > > + static_cast<uint8_t>((cmd >> 24))};
> > > > > + uint8_t seq_arr[] = {static_cast<uint8_t>(seq),
> > > > > + static_cast<uint8_t>((seq >> 8)),
> > > > > + static_cast<uint8_t>((seq >> 16)),
> > > > > + static_cast<uint8_t>((seq >> 24))};
> > > > > + payload.data.insert(payload.data.begin(), cmd_arr, cmd_arr+4);
> > > > > + payload.data.insert(payload.data.begin() + 4, seq_arr, seq_arr+4);
How about creating a structure to define the header ?
struct IPCMessageHeader {
uint32_t cmd;
uint32_t seq;
};
IPCMessageHeader *header;
payload.data.insert(payload.data.begin(), sizeof(*header), 0);
new (payload.data.data()) IPCMessageHeader{ cmd, seq };
The last line is referred as "placement new"
(https://en.cppreference.com/w/cpp/language/new#Placement_new) and
constructs an object in allocated storage.
I'm tempted to rename cmd to type, name or id, as it identifies the type
of message. I would also rename seq to cookie (or requestId or ...), as
it's meant to match requests with replies, and the protocol thus doesn't
need to depend on the value being a sequence number (the implementation
can of course use a sequence number).
> > > > > +}
> > > > > +
> > > > > +std::tuple<uint32_t, uint32_t>
> > > > > +IPAIPCUnixSocket::readHeader(IPCUnixSocket::Payload &payload)
> > > > > +{
> > > > > + uint32_t cmd = payload.data[0] |
> > > > > + (payload.data[1] << 8) |
> > > > > + (payload.data[2] << 16) |
> > > > > + (payload.data[3] << 24);
> > > > > + uint32_t seq = payload.data[4] |
> > > > > + (payload.data[5] << 8) |
> > > > > + (payload.data[6] << 16) |
> > > > > + (payload.data[7] << 24);
> > > > > +
> > > > > + return {cmd, seq};
> > > > > +}
We could do something similar here.
> > > > > +
> > > > > +void IPAIPCUnixSocket::eraseHeader(IPCUnixSocket::Payload &payload)
> > > > > +{
> > > > > + payload.data.erase(payload.data.begin(), payload.data.begin() + 8);
> > > > > +}
Could we avoid the need to call eraseHeader() by using Span and/or
iterators appropriately ? Dropping 8 bytes at the beginning of the
message is a fairly expensive operation.
> > > > > +
> > > > > +void IPAIPCUnixSocket::readyRead(IPCUnixSocket *socket)
> > > > > +{
> > > > > + IPCUnixSocket::Payload message;
> > > > > + int ret = socket->receive(&message);
> > > > > + if (ret) {
> > > > > + LOG(IPAIPC, Error) << "Receive message failed" << ret;
> > > > > + return;
> > > > > + }
> > > > > +
> > > > > + uint32_t cmd, seq;
> > > > > + std::tie(cmd, seq) = readHeader(message);
> > > > > +
> > > > > + auto callData = callData_.find(seq);
Events sent by the IPA have a sequence number hardcoded to 0 in the
proxy worker. This means that when the sequence number wraps around in
IPAIPCUnixSocket::sendSync(), there's a risk an event would be
incorrectly interpreted as a response.
You could consider 0 as a reserved value for sequence numbers (and thus
skip it in IPAIPCUnixSocket::sendSync()), or add flags to the message
header to indicate that the message is a response. I think flags would
make sense, we may need more flags later.
> > > > > + if (callData != callData_.end()) {
> > > > > + eraseHeader(message);
> > > > > + /* Is there any way to avoid this copy? */
> > > > > + *callData->second.response = message;
= std::move(message);
(making sure that Payload has appropriate move constructor and
assignment operator, either explicit, or implicit).
> > > > > + callData->second.done = true;
> > > > > + return;
> > > > > + }
> > > > > +
> > > > > + /*
> > > > > + * Received unexpected data, this means it's a call from the IPA.
> > > > > + * We can't return anything to the IPA (gotta keep them under *our*
> > > > > + * control, plus returning would require blocking the caller, and we
> > > > > + * can't afford to do that). Let the proxy do switch-case on cmd.
I'd drop the last sentence, and possibly even the last two sentences, as
there's nothing requiring this class to be used only by proxies.
> > > > > + */
> > > > > + recvIPC.emit(message.data, message.fds);
> > > > > +
> > > > > + return;
> > > > > +}
> > > > > +
> > > > > +int IPAIPCUnixSocket::call(const IPCUnixSocket::Payload &message, IPCUnixSocket::Payload *response, uint32_t seq)
Line wrap ?
> > > > > +{
> > > > > + Timer timeout;
> > > > > + int ret;
> > > > > +
> > > > > + callData_[seq].response = response;
> > > > > + callData_[seq].done = false;
> > > > > +
> > > > > + ret = socket_->send(message);
> > > > > + if (ret)
> > > > > + return ret;
> > > > > +
> > > > > + timeout.start(200);
> > > > > + while (!callData_[seq].done) {
> > > > > + if (!timeout.isRunning()) {
> > > > > + LOG(IPAIPC, Error) << "Call timeout!";
> > > > > + callData_.erase(seq);
> > > > > + return -ETIMEDOUT;
> > > > > + }
> > > > > +
> > > > > + Thread::current()->eventDispatcher()->processEvents();
The event loop wasn't developed to be reentrant. I think this is a very
valid use case, which means we'll likely need to fix an issue or two in
the event dispatcher to support this properly.
It also means that the pipeline handler could receive another event
while waiting for the call to complete. This part worries me a bit as it
will be hard to implement it properly in pipeline handlers. See
https://docs.google.com/document/d/1dixzFzZQW8e3ldjdM8Adbo8klXDDE4pVekwo5aLgUsE/edit#
for some background information.
The IPA protocols we are envisioning should not mix sync and async
messages : everything is synchronous until we start the IPA, after which
messages are asynchronous until we stop it. This should give us some
safety. I'm not sure if we'll easily be able to enforce that though.
There are other options, such as split the camera manager thread to one
thread per pipeline handler, and only processing IPC events here (we
would need to extend the event dispatcher API to make this possible).
The drawback is that pipeline handlers that handle multiple cameras
would all of a sudden block event processing for all the cameras when a
sync call is made, which isn't nice at all. Moving to one thread per
camera would help, but then pipeline handlers would suddenly become
multi-threaded when they support multiple cameras, which is a completely
different can of worms.
Ideas are welcome :-)
> > > > > + }
> > > > > +
> > > > > + callData_.erase(seq);
> > > > > +
> > > > > + return 0;
> > > > > +}
> > > > > +
> > > > > +} /* namespace libcamera */
> > > > > diff --git a/src/libcamera/meson.build b/src/libcamera/meson.build
> > > > > index 85f3a202..d6bd9a05 100644
> > > > > --- a/src/libcamera/meson.build
> > > > > +++ b/src/libcamera/meson.build
> > > > > @@ -26,6 +26,7 @@ libcamera_sources = files([
> > > > > 'ipa_controls.cpp',
> > > > > 'ipa_data_serializer.cpp',
> > > > > 'ipa_ipc.cpp',
> > > > > + 'ipa_ipc_unixsocket.cpp',
> > > > > 'ipa_interface.cpp',
> > > > > 'ipa_manager.cpp',
> > > > > 'ipa_module.cpp',
--
Regards,
Laurent Pinchart
More information about the libcamera-devel
mailing list