[libcamera-devel] [PATCH v4 3/3] libcamera: ipc: unix: Make socket operation asynchronous
Niklas Söderlund
niklas.soderlund at ragnatech.se
Tue Jul 2 01:36:18 CEST 2019
Hi Laurent,
Thanks for your work.
On 2019-07-02 02:23:39 +0300, Laurent Pinchart wrote:
> Blocking socket operation when receiving messages may lead to long
> delays, and possibly a complete deadlock, if the remote side delays
> sending of the payload after the header, or doesn't send the payload at
> all. To avoid this, make the socket non-blocking and implement a simple
> state machine to receive the header synchronously with the socket read
> notification. The payload read is still synchronous with the receive()
> method to avoid data copies.
>
> Signed-off-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
Reviewed-by: Niklas Söderlund <niklas.soderlund at ragnatech.se>
> ---
> src/libcamera/include/ipc_unixsocket.h | 2 +
> src/libcamera/ipc_unixsocket.cpp | 88 ++++++++++++++++++--------
> 2 files changed, 63 insertions(+), 27 deletions(-)
>
> diff --git a/src/libcamera/include/ipc_unixsocket.h b/src/libcamera/include/ipc_unixsocket.h
> index ef166d742554..03e9fe492bde 100644
> --- a/src/libcamera/include/ipc_unixsocket.h
> +++ b/src/libcamera/include/ipc_unixsocket.h
> @@ -49,6 +49,8 @@ private:
> void dataNotifier(EventNotifier *notifier);
>
> int fd_;
> + bool headerReceived_;
> + struct Header header_;
> EventNotifier *notifier_;
> };
>
> diff --git a/src/libcamera/ipc_unixsocket.cpp b/src/libcamera/ipc_unixsocket.cpp
> index c11f116093c5..def08eef00f8 100644
> --- a/src/libcamera/ipc_unixsocket.cpp
> +++ b/src/libcamera/ipc_unixsocket.cpp
> @@ -7,6 +7,7 @@
>
> #include "ipc_unixsocket.h"
>
> +#include <poll.h>
> #include <string.h>
> #include <sys/socket.h>
> #include <unistd.h>
> @@ -49,10 +50,10 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket)
> * transporting entire payloads with guaranteed ordering.
> *
> * The IPC design is asynchronous, a message is queued to a receiver which gets
> - * notified that a message is ready to be consumed by a signal. The queuer of
> - * the message gets no notification when a message is delivered nor processed.
> - * If such interactions are needed a protocol specific to the users use-case
> - * should be implemented on top of the IPC objects.
> + * notified that a message is ready to be consumed by the \ref readyRead
> + * signal. The sender of the message gets no notification when a message is
> + * delivered nor processed. If such interactions are needed a protocol specific
> + * to the users use-case should be implemented on top of the IPC objects.
> *
> * Establishment of an IPC channel is asymmetrical. The side that initiates
> * communication first instantiates a local side socket and creates the channel
> @@ -64,7 +65,7 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket)
> */
>
> IPCUnixSocket::IPCUnixSocket()
> - : fd_(-1), notifier_(nullptr)
> + : fd_(-1), headerReceived_(false), notifier_(nullptr)
> {
> }
>
> @@ -89,7 +90,7 @@ int IPCUnixSocket::create()
> int sockets[2];
> int ret;
>
> - ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets);
> + ret = socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, sockets);
> if (ret) {
> ret = -errno;
> LOG(IPCUnixSocket, Error)
> @@ -142,6 +143,7 @@ void IPCUnixSocket::close()
> ::close(fd_);
>
> fd_ = -1;
> + headerReceived_ = false;
> }
>
> /**
> @@ -193,38 +195,38 @@ int IPCUnixSocket::send(const Payload &payload)
> * \param[out] payload Payload where to write the received message
> *
> * This method receives the message payload from the IPC channel and writes it
> - * to the \a payload. It blocks until one message is received, if an
> - * asynchronous behavior is desired this method should be called when the
> - * readyRead signal is emitted.
> + * to the \a payload. If no message payload is available, it returns
> + * immediately with -EAGAIN. The \ref readyRead signal shall be used to receive
> + * notification of message availability.
> *
> * \todo Add state machine to make sure we don't block forever and that
> * a header is always followed by a payload.
> *
> * \return 0 on success or a negative error code otherwise
> + * \retval -EAGAIN No message payload is available
> + * \retval -ENOTCONN The socket is not connected (neither create() nor bind()
> + * has been called)
> */
> int IPCUnixSocket::receive(Payload *payload)
> {
> - Header hdr;
> - int ret;
> -
> if (!isBound())
> return -ENOTCONN;
>
> - if (!payload)
> - return -EINVAL;
> + if (!headerReceived_)
> + return -EAGAIN;
>
> - ret = ::recv(fd_, &hdr, sizeof(hdr), 0);
> - if (ret < 0) {
> - ret = -errno;
> - LOG(IPCUnixSocket, Error)
> - << "Failed to recv header: " << strerror(-ret);
> + payload->data.resize(header_.data);
> + payload->fds.resize(header_.fds);
> +
> + int ret = recvData(payload->data.data(), header_.data,
> + payload->fds.data(), header_.fds);
> + if (ret < 0)
> return ret;
> - }
>
> - payload->data.resize(hdr.data);
> - payload->fds.resize(hdr.fds);
> + headerReceived_ = false;
> + notifier_->setEnabled(true);
>
> - return recvData(payload->data.data(), hdr.data, payload->fds.data(), hdr.fds);
> + return 0;
> }
>
> /**
> @@ -232,7 +234,8 @@ int IPCUnixSocket::receive(Payload *payload)
> * \brief A Signal emitted when a message is ready to be read
> */
>
> -int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fds, unsigned int num)
> +int IPCUnixSocket::sendData(const void *buffer, size_t length,
> + const int32_t *fds, unsigned int num)
> {
> struct iovec iov[1];
> iov[0].iov_base = const_cast<void *>(buffer);
> @@ -266,7 +269,8 @@ int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fd
> return 0;
> }
>
> -int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned int num)
> +int IPCUnixSocket::recvData(void *buffer, size_t length,
> + int32_t *fds, unsigned int num)
> {
> struct iovec iov[1];
> iov[0].iov_base = buffer;
> @@ -291,8 +295,9 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned
>
> if (recvmsg(fd_, &msg, 0) < 0) {
> int ret = -errno;
> - LOG(IPCUnixSocket, Error)
> - << "Failed to recvmsg: " << strerror(-ret);
> + if (ret != -EAGAIN)
> + LOG(IPCUnixSocket, Error)
> + << "Failed to recvmsg: " << strerror(-ret);
> return ret;
> }
>
> @@ -303,6 +308,35 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned
>
> void IPCUnixSocket::dataNotifier(EventNotifier *notifier)
> {
> + int ret;
> +
> + if (!headerReceived_) {
> + /* Receive the header. */
> + ret = ::recv(fd_, &header_, sizeof(header_), 0);
> + if (ret < 0) {
> + ret = -errno;
> + LOG(IPCUnixSocket, Error)
> + << "Failed to receive header: " << strerror(-ret);
> + return;
> + }
> +
> + headerReceived_ = true;
> + }
> +
> + /*
> + * If the payload has arrived, disable the notifier and emit the
> + * readyRead signal. The notifier will be reenabled by the receive()
> + * method.
> + */
> + struct pollfd fds = { fd_, POLLIN, 0 };
> + ret = poll(&fds, 1, 0);
> + if (ret < 0)
> + return;
> +
> + if (!(fds.revents & POLLIN))
> + return;
> +
> + notifier_->setEnabled(false);
> readyRead.emit(this);
> }
>
> --
> Regards,
>
> Laurent Pinchart
>
> _______________________________________________
> libcamera-devel mailing list
> libcamera-devel at lists.libcamera.org
> https://lists.libcamera.org/listinfo/libcamera-devel
--
Regards,
Niklas Söderlund
More information about the libcamera-devel
mailing list