[libcamera-devel] [RFC v3 5/5] py: Add cam.py
Laurent Pinchart
laurent.pinchart at ideasonboard.com
Thu Dec 9 20:44:13 CET 2021
On Thu, Dec 09, 2021 at 10:03:35AM +0000, Kieran Bingham wrote:
> Quoting Tomi Valkeinen (2021-12-09 09:29:06)
> > Add cam.py, which mimics the 'cam' tool. Four rendering backends are
> > added:
> >
> > * null - Do nothing
> > * kms - Use KMS with dmabufs
> > * qt - SW render on a Qt window
> > * qtgl - OpenGL render on a Qt window
> >
> > All the renderers handle only a few pixel formats, and especially the GL
> > renderer is just a prototype.
> >
> > Signed-off-by: Tomi Valkeinen <tomi.valkeinen at ideasonboard.com>
> > ---
> > src/py/test/cam.py | 464 ++++++++++++++++++++++++++++++++++++++
> > src/py/test/cam_kms.py | 185 +++++++++++++++
> > src/py/test/cam_null.py | 46 ++++
> > src/py/test/cam_qt.py | 355 +++++++++++++++++++++++++++++
> > src/py/test/cam_qtgl.py | 385 +++++++++++++++++++++++++++++++
> > src/py/test/gl_helpers.py | 67 ++++++
>
> While these are 'test' apps, they're not quite 'tests'.
>
> I think these should live under src/py/pycam? Or some such name?
Looks good to me.
> Essentially they will likely form the basis of 'example python usage'...
>
> I'd quite like to see something (anything?) that counts as a unittest
> integrated into /test/py/ that somehow validates the APIs that are
> created.
Ditto.
> Even if it's only basic for now to construct a camera, or something, so
> that when built/enabled, the python code can be linked into the test
> framework with 'ninja -C build test'.
>
> I wonder if a top-level TODO file under src/py/TODO and/or
> src/py/{test/pycam}/TODO will help clearly mark things that are known
> not to be implemented ?
>
>
> Beyond that, I think this series would benefit from early integration so
> it can be more thoroughly used and devloped rather than trying to
> 'perfect' it out in isolation.
I agree here too. There's a set of small comments that should be
addressed in a v4 for the parts that touch the libcamera C++ API, the
build system or .gitignore, for the Python side we can then work on top
of this series after merging it.
> > 6 files changed, 1502 insertions(+)
> > create mode 100755 src/py/test/cam.py
> > create mode 100644 src/py/test/cam_kms.py
> > create mode 100644 src/py/test/cam_null.py
> > create mode 100644 src/py/test/cam_qt.py
> > create mode 100644 src/py/test/cam_qtgl.py
> > create mode 100644 src/py/test/gl_helpers.py
> >
> > diff --git a/src/py/test/cam.py b/src/py/test/cam.py
> > new file mode 100755
> > index 00000000..48df01cf
> > --- /dev/null
> > +++ b/src/py/test/cam.py
> > @@ -0,0 +1,464 @@
> > +#!/usr/bin/python3
Isn't
#!/usr/bin/env python3
preferred, to support custom Python installation ?
> > +# SPDX-License-Identifier: GPL-2.0-or-later
> > +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen at ideasonboard.com>
> > +
> > +import pycamera as pycam
> > +import time
> > +import binascii
> > +import argparse
> > +import selectors
> > +import os
> > +import sys
I think Python too knows about alphabetical ordering ;-)
> > +
> > +class CustomCameraAction(argparse.Action):
> > + def __call__(self, parser, namespace, values, option_string=None):
> > + print(self.dest, values)
> > +
> > + if not "camera" in namespace or namespace.camera == None:
> > + setattr(namespace, "camera", [])
> > +
> > + previous = namespace.camera
> > + previous.append((self.dest, values))
> > + setattr(namespace, "camera", previous)
> > +
> > +class CustomAction(argparse.Action):
> > + def __init__(self, option_strings, dest, **kwargs):
> > + super().__init__(option_strings, dest, default={}, **kwargs)
> > +
> > + def __call__(self, parser, namespace, values, option_string=None):
> > + if len(namespace.camera) == 0:
> > + print(f"Option {option_string} requires a --camera context")
> > + sys.exit(-1)
> > +
> > + if self.type == bool:
> > + values = True
> > +
> > + current = namespace.camera[-1]
> > +
> > + data = getattr(namespace, self.dest)
> > +
> > + if self.nargs == "+":
> > + if not current in data:
> > + data[current] = []
> > +
> > + data[current] += values
> > + else:
> > + data[current] = values
> > +
> > +
> > +
> > +def do_cmd_list(cm):
> > + print("Available cameras:")
> > +
> > + for idx,c in enumerate(cm.cameras):
> > + print(f"{idx + 1}: {c.id}")
> > +
It looks like lots of the functions below that take a context argument
could be moved to a context class.
> > +def do_cmd_list_props(ctx):
> > + camera = ctx["camera"]
> > +
> > + print("Properties for", ctx["id"])
> > +
> > + for name, prop in camera.properties.items():
> > + print("\t{}: {}".format(name, prop))
> > +
> > +def do_cmd_list_controls(ctx):
> > + camera = ctx["camera"]
> > +
> > + print("Controls for", ctx["id"])
> > +
> > + for name, prop in camera.controls.items():
> > + print("\t{}: {}".format(name, prop))
> > +
> > +def do_cmd_info(ctx):
> > + camera = ctx["camera"]
> > +
> > + print("Stream info for", ctx["id"])
> > +
> > + roles = [pycam.StreamRole.Viewfinder]
> > +
> > + camconfig = camera.generateConfiguration(roles)
> > + if camconfig == None:
> > + raise Exception("Generating config failed")
> > +
> > + for i, stream_config in enumerate(camconfig):
> > + print("\t{}: {}".format(i, stream_config.toString()))
> > +
> > + formats = stream_config.formats
> > + for fmt in formats.pixelFormats:
> > + print("\t * Pixelformat:", fmt, formats.range(fmt))
> > +
> > + for size in formats.sizes(fmt):
> > + print("\t -", size)
> > +
> > +def acquire(ctx):
> > + camera = ctx["camera"]
> > +
> > + camera.acquire()
> > +
> > +def release(ctx):
> > + camera = ctx["camera"]
> > +
> > + camera.release()
> > +
> > +def parse_streams(ctx):
> > + streams = []
> > +
> > + for stream_desc in ctx["opt-stream"]:
> > + stream_opts = {"role": pycam.StreamRole.Viewfinder}
> > +
> > + for stream_opt in stream_desc.split(","):
> > + if stream_opt == 0:
> > + continue
> > +
> > + arr = stream_opt.split("=")
> > + if len(arr) != 2:
> > + print("Bad stream option", stream_opt)
> > + sys.exit(-1)
> > +
> > + key = arr[0]
> > + value = arr[1]
> > +
> > + if key in ["width", "height"]:
> > + value = int(value)
> > + elif key == "role":
> > + rolemap = {
> > + "still": pycam.StreamRole.StillCapture,
> > + "raw": pycam.StreamRole.Raw,
> > + "video": pycam.StreamRole.VideoRecording,
> > + "viewfinder": pycam.StreamRole.Viewfinder,
> > + }
> > +
> > + role = rolemap.get(value.lower(), None)
> > +
> > + if role == None:
> > + print("Bad stream role", value)
> > + sys.exit(-1)
> > +
> > + value = role
> > + elif key == "pixelformat":
> > + pass
> > + else:
> > + print("Bad stream option key", key)
> > + sys.exit(-1)
> > +
> > + stream_opts[key] = value
> > +
> > + streams.append(stream_opts)
> > +
> > + return streams
> > +
> > +def configure(ctx):
> > + camera = ctx["camera"]
> > +
> > + streams = parse_streams(ctx)
> > +
> > + roles = [opts["role"] for opts in streams]
> > +
> > + camconfig = camera.generateConfiguration(roles)
> > + if camconfig == None:
> > + raise Exception("Generating config failed")
> > +
> > + for idx,stream_opts in enumerate(streams):
> > + stream_config = camconfig.at(idx)
> > +
> > + if "width" in stream_opts and "height" in stream_opts:
> > + stream_config.size = (stream_opts["width"], stream_opts["height"])
> > +
> > + if "pixelformat" in stream_opts:
> > + stream_config.fmt = stream_opts["pixelformat"]
> > +
> > + stat = camconfig.validate()
> > +
> > + if stat == pycam.ConfigurationStatus.Invalid:
> > + print("Camera configuration invalid")
> > + exit(-1)
> > + elif stat == pycam.ConfigurationStatus.Adjusted:
> > + if ctx["opt-strict-formats"]:
> > + print("Adjusting camera configuration disallowed by --strict-formats argument")
> > + exit(-1)
> > +
> > + print("Camera configuration adjusted")
> > +
> > + r = camera.configure(camconfig);
> > + if r != 0:
> > + raise Exception("Configure failed")
> > +
> > + ctx["stream-names"] = {}
> > + ctx["streams"] = []
> > +
> > + for idx, stream_config in enumerate(camconfig):
> > + stream = stream_config.stream
> > + ctx["streams"].append(stream)
> > + ctx["stream-names"][stream] = "stream" + str(idx)
> > + print("{}-{}: stream config {}".format(ctx["id"], ctx["stream-names"][stream], stream.configuration.toString()))
> > +
> > +def alloc_buffers(ctx):
> > + camera = ctx["camera"]
> > +
> > + allocator = pycam.FrameBufferAllocator(camera);
> > +
> > + for idx, stream in enumerate(ctx["streams"]):
> > + ret = allocator.allocate(stream)
> > + if ret < 0:
> > + print("Can't allocate buffers")
> > + exit(-1)
> > +
> > + allocated = len(allocator.buffers(stream))
> > +
> > + print("{}-{}: Allocated {} buffers".format(ctx["id"], ctx["stream-names"][stream], allocated))
> > +
> > + ctx["allocator"] = allocator
> > +
> > +def create_requests(ctx):
> > + camera = ctx["camera"]
> > +
> > + ctx["requests"] = []
> > +
> > + # Identify the stream with the least number of buffers
> > + num_bufs = min([len(ctx["allocator"].buffers(stream)) for stream in ctx["streams"]])
> > +
> > + requests = []
> > +
> > + for buf_num in range(num_bufs):
> > + request = camera.createRequest()
> > +
> > + if request == None:
> > + print("Can't create request")
> > + exit(-1)
> > +
> > + for stream in ctx["streams"]:
> > + buffers = ctx["allocator"].buffers(stream)
> > + buffer = buffers[buf_num]
> > +
> > + ret = request.addBuffer(stream, buffer)
> > + if ret < 0:
> > + print("Can't set buffer for request")
> > + exit(-1)
> > +
> > + requests.append(request)
> > +
> > + ctx["requests"] = requests
> > +
> > +def start(ctx):
> > + camera = ctx["camera"]
> > +
> > + camera.start()
> > +
> > +def stop(ctx):
> > + camera = ctx["camera"]
> > +
> > + camera.stop()
> > +
> > +def queue_requests(ctx):
> > + camera = ctx["camera"]
> > +
> > + for request in ctx["requests"]:
> > + camera.queueRequest(request)
> > + ctx["reqs-queued"] += 1
> > +
> > + del ctx["requests"]
> > +
> > +def capture_init(contexts):
> > + for ctx in contexts:
> > + acquire(ctx)
> > +
> > + for ctx in contexts:
> > + configure(ctx)
> > +
> > + for ctx in contexts:
> > + alloc_buffers(ctx)
> > +
> > + for ctx in contexts:
> > + create_requests(ctx)
> > +
> > +def capture_start(contexts):
> > + for ctx in contexts:
> > + start(ctx)
> > +
> > + for ctx in contexts:
> > + queue_requests(ctx)
> > +
> > +# Called from renderer when there is a libcamera event
> > +def event_handler(state):
> > + cm = state["cm"]
> > + contexts = state["contexts"]
> > +
> > + data = os.read(cm.efd, 8)
> > +
> > + reqs = cm.getReadyRequests()
> > +
> > + for req in reqs:
> > + ctx = next(ctx for ctx in contexts if ctx["camera"] == req.camera)
> > + request_handler(state, ctx, req)
> > +
> > + running = any(ctx["reqs-completed"] < ctx["opt-capture"] for ctx in contexts)
> > + return running
> > +
> > +def request_handler(state, ctx, req):
> > + camera = ctx["camera"]
> > +
> > + if req.status != pycam.RequestStatus.Complete:
> > + raise Exception("{}: Request failed: {}".format(ctx["id"], req.status))
> > +
> > + buffers = req.buffers
> > +
> > + # Compute the frame rate. The timestamp is arbitrarily retrieved from
> > + # the first buffer, as all buffers should have matching timestamps.
> > + ts = buffers[next(iter(buffers))].metadata.timestamp
> > + last = ctx.get("last", 0)
> > + fps = 1000000000.0 / (ts - last) if (last != 0 and (ts - last) != 0) else 0
> > + ctx["last"] = ts
> > + ctx["fps"] = fps
> > +
> > + for stream, fb in buffers.items():
> > + stream_name = ctx["stream-names"][stream]
> > +
> > + crcs = []
> > + if ctx["opt-crc"]:
> > + with fb.mmap(0) as b:
> > + crc = binascii.crc32(b)
> > + crcs.append(crc)
> > +
> > + meta = fb.metadata
> > +
> > + print("{:.6f} ({:.2f} fps) {}-{}: seq {}, bytes {}, CRCs {}"
> > + .format(ts / 1000000000, fps,
> > + ctx["id"], stream_name,
> > + meta.sequence, meta.bytesused,
> > + crcs))
> > +
> > + if ctx["opt-metadata"]:
> > + reqmeta = req.metadata
> > + for ctrl, val in reqmeta.items():
> > + print(f"\t{ctrl} = {val}")
> > +
> > + if ctx["opt-save-frames"]:
> > + with fb.mmap(0) as b:
> > + filename = "frame-{}-{}-{}.data".format(ctx["id"], stream_name, ctx["reqs-completed"])
> > + with open(filename, "wb") as f:
> > + f.write(b)
> > +
> > + state["renderer"].request_handler(ctx, req);
> > +
> > + ctx["reqs-completed"] += 1
> > +
> > +# Called from renderer when it has finished with a request
> > +def request_prcessed(ctx, req):
> > + camera = ctx["camera"]
> > +
> > + if ctx["reqs-queued"] < ctx["opt-capture"]:
> > + req.reuse()
> > + camera.queueRequest(req)
> > + ctx["reqs-queued"] += 1
> > +
> > +def capture_deinit(contexts):
> > + for ctx in contexts:
> > + stop(ctx)
> > +
> > + for ctx in contexts:
> > + release(ctx)
> > +
> > +def do_cmd_capture(state):
> > + capture_init(state["contexts"])
> > +
> > + renderer = state["renderer"]
> > +
> > + renderer.setup()
> > +
> > + capture_start(state["contexts"])
> > +
> > + renderer.run()
> > +
> > + capture_deinit(state["contexts"])
> > +
> > +def main():
> > + parser = argparse.ArgumentParser()
> > + # global options
> > + parser.add_argument("-l", "--list", action="store_true", help="List all cameras")
> > + parser.add_argument("-c", "--camera", type=int, action="extend", nargs=1, default=[], help="Specify which camera to operate on, by index")
> > + parser.add_argument("-p", "--list-properties", action="store_true", help="List cameras properties")
> > + parser.add_argument("--list-controls", action="store_true", help="List cameras controls")
> > + parser.add_argument("-I", "--info", action="store_true", help="Display information about stream(s)")
> > + parser.add_argument("-R", "--renderer", default="null", help="Renderer (null, kms, qt, qtgl)")
> > +
> > + # per camera options
> > + parser.add_argument("-C", "--capture", nargs="?", type=int, const=1000000, action=CustomAction, help="Capture until interrupted by user or until CAPTURE frames captured")
> > + parser.add_argument("--crc", nargs=0, type=bool, action=CustomAction, help="Print CRC32 for captured frames")
> > + parser.add_argument("--save-frames", nargs=0, type=bool, action=CustomAction, help="Save captured frames to files")
> > + parser.add_argument("--metadata", nargs=0, type=bool, action=CustomAction, help="Print the metadata for completed requests")
> > + parser.add_argument("--strict-formats", type=bool, nargs=0, action=CustomAction, help="Do not allow requested stream format(s) to be adjusted")
> > + parser.add_argument("-s", "--stream", nargs="+", action=CustomAction)
> > + args = parser.parse_args()
> > +
> > + cm = pycam.CameraManager.singleton()
> > +
> > + if args.list:
> > + do_cmd_list(cm)
> > +
> > + contexts = []
> > +
> > + for cam_idx in args.camera:
> > + camera = next((c for i,c in enumerate(cm.cameras) if i + 1 == cam_idx), None)
> > +
> > + if camera == None:
> > + print("Unable to find camera", cam_idx)
> > + return -1
> > +
> > + contexts.append({
> > + "camera": camera,
> > + "idx": cam_idx,
> > + "id": "cam" + str(cam_idx),
> > + "reqs-queued": 0,
> > + "reqs-completed": 0,
> > + "opt-capture": args.capture.get(cam_idx, False),
> > + "opt-crc": args.crc.get(cam_idx, False),
> > + "opt-save-frames": args.save_frames.get(cam_idx, False),
> > + "opt-metadata": args.metadata.get(cam_idx, False),
> > + "opt-strict-formats": args.strict_formats.get(cam_idx, False),
> > + "opt-stream": args.stream.get(cam_idx, ["role=viewfinder"]),
> > + })
Yes, this definitely looks like a candidate for a class :-)
> > +
> > + for ctx in contexts:
> > + print("Using camera {} as {}".format(ctx["camera"].id, ctx["id"]))
> > +
> > + for ctx in contexts:
> > + if args.list_properties:
> > + do_cmd_list_props(ctx)
> > + if args.list_controls:
> > + do_cmd_list_controls(ctx)
> > + if args.info:
> > + do_cmd_info(ctx)
> > +
> > + if args.capture:
> > +
> > + state = {
> > + "cm": cm,
> > + "contexts": contexts,
> > + "event_handler": event_handler,
> > + "request_prcessed": request_prcessed,
> > + }
> > +
> > + if args.renderer == "null":
> > + import cam_null
> > + renderer = cam_null.NullRenderer(state)
> > + elif args.renderer == "kms":
> > + import cam_kms
> > + renderer = cam_kms.KMSRenderer(state)
> > + elif args.renderer == "qt":
> > + import cam_qt
> > + renderer = cam_qt.QtRenderer(state)
Interesting, I would probably instinctively split this application in a
command line version and a Qt version, but I suppose this makes sense
too, as Python allows runtime linking.
> > + elif args.renderer == "qtgl":
> > + import cam_qtgl
> > + renderer = cam_qtgl.QtRenderer(state)
> > + else:
> > + print("Bad renderer", args.renderer)
> > + return -1
> > +
> > + state["renderer"] = renderer
> > +
> > + do_cmd_capture(state)
> > +
> > + return 0
> > +
> > +if __name__ == "__main__":
> > + sys.exit(main())
An equivalent of simple-cam in Python would be useful too, cam.py is
fairly large for a tutorial. This can be done later (even though I would
have imagined it would be the first application to be developed, as it
would be simpler).
> > diff --git a/src/py/test/cam_kms.py b/src/py/test/cam_kms.py
> > new file mode 100644
> > index 00000000..fb0e6375
> > --- /dev/null
> > +++ b/src/py/test/cam_kms.py
> > @@ -0,0 +1,185 @@
> > +# SPDX-License-Identifier: GPL-2.0-or-later
> > +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen at ideasonboard.com>
> > +
> > +import sys
> > +import selectors
> > +import pykms
> > +
> > +FMT_MAP = {
> > + "RGB888": pykms.PixelFormat.RGB888,
> > + "YUYV": pykms.PixelFormat.YUYV,
> > + "ARGB8888": pykms.PixelFormat.ARGB8888,
> > + "XRGB8888": pykms.PixelFormat.XRGB8888,
> > +}
> > +
> > +class KMSRenderer:
> > + def __init__(self, state):
> > + self.state = state
> > +
> > + self.cm = state["cm"]
> > + self.contexts = state["contexts"]
> > + self.running = False
> > +
> > + card = pykms.Card()
> > +
> > + res = pykms.ResourceManager(card)
> > + conn = res.reserve_connector()
> > + crtc = res.reserve_crtc(conn)
> > + mode = conn.get_default_mode()
> > + modeb = mode.to_blob(card)
> > +
> > + req = pykms.AtomicReq(card)
> > + req.add_connector(conn, crtc)
> > + req.add_crtc(crtc, modeb)
> > + r = req.commit_sync(allow_modeset = True)
> > + assert(r == 0)
> > +
> > + self.card = card
> > + self.resman = res
> > + self.crtc = crtc
> > + self.mode = mode
> > +
> > + self.bufqueue = []
> > + self.current = None
> > + self.next = None
> > + self.cam_2_drm = {}
> > +
> > + # KMS
> > +
> > + def close(self):
> > + req = pykms.AtomicReq(self.card)
> > + for s in self.streams:
> > + req.add_plane(s["plane"], None, None, dst=(0, 0, 0, 0))
> > + req.commit()
> > +
> > + def add_plane(self, req, stream, fb):
> > + s = next(s for s in self.streams if s["stream"] == stream)
> > + idx = s["idx"]
> > + plane = s["plane"]
> > +
> > + if idx % 2 == 0:
> > + x = 0
> > + else:
> > + x = self.mode.hdisplay - fb.width
> > +
> > + if idx // 2 == 0:
> > + y = 0
> > + else:
> > + y = self.mode.vdisplay - fb.height
> > +
> > + req.add_plane(plane, fb, self.crtc, dst=(x, y, fb.width, fb.height))
> > +
> > + def apply_request(self, drmreq):
> > +
> > + buffers = drmreq["camreq"].buffers
> > +
> > + for stream, fb in buffers.items():
> > + drmfb = self.cam_2_drm.get(fb, None)
> > +
> > + req = pykms.AtomicReq(self.card)
> > + self.add_plane(req, stream, drmfb)
> > + req.commit()
> > +
> > + def handle_page_flip(self, frame, time):
> > + old = self.current
> > + self.current = self.next
> > +
> > + if len(self.bufqueue) > 0:
> > + self.next = self.bufqueue.pop(0)
> > + else:
> > + self.next = None
> > +
> > + if self.next:
> > + drmreq = self.next
> > +
> > + self.apply_request(drmreq)
> > +
> > + if old:
> > + req = old["camreq"]
> > + ctx = old["camctx"]
> > + self.state["request_prcessed"](ctx, req)
> > +
> > + def queue(self, drmreq):
> > + if not self.next:
> > + self.next = drmreq
> > + self.apply_request(drmreq)
> > + else:
> > + self.bufqueue.append(drmreq)
> > +
> > + # libcamera
> > +
> > + def setup(self):
> > + self.streams = []
> > +
> > + idx = 0
> > + for ctx in self.contexts:
> > + camera = ctx["camera"]
> > +
> > + for stream in ctx["streams"]:
> > +
> > + cfg = stream.configuration
> > + fmt = cfg.fmt
> > + fmt = FMT_MAP[fmt]
> > +
> > + plane = self.resman.reserve_generic_plane(self.crtc, fmt)
> > + assert(plane != None)
> > +
> > + self.streams.append({
> > + "idx": idx,
> > + "stream": stream,
> > + "plane": plane,
> > + "fmt": fmt,
> > + "size": cfg.size,
> > + })
> > +
> > + for fb in ctx["allocator"].buffers(stream):
> > + w, h = cfg.size
> > + stride = cfg.stride
> > + fd = fb.fd(0)
> > + drmfb = pykms.DmabufFramebuffer(self.card, w, h, fmt,
> > + [fd], [stride], [0])
> > + self.cam_2_drm[fb] = drmfb
> > +
> > + idx += 1
> > +
> > +
> > + def readdrm(self, fileobj):
> > + for ev in self.card.read_events():
> > + if ev.type == pykms.DrmEventType.FLIP_COMPLETE:
> > + self.handle_page_flip(ev.seq, ev.time)
> > +
> > + def readcam(self, fd):
> > + self.running = self.state["event_handler"](self.state)
> > +
> > + def readkey(self, fileobj):
> > + sys.stdin.readline()
> > + self.running = False
> > +
> > + def run(self):
> > + print("Capturing...")
> > +
> > + self.running = True
> > +
> > + sel = selectors.DefaultSelector()
> > + sel.register(self.card.fd, selectors.EVENT_READ, self.readdrm)
> > + sel.register(self.cm.efd, selectors.EVENT_READ, self.readcam)
> > + sel.register(sys.stdin, selectors.EVENT_READ, self.readkey)
> > +
> > + print("Press enter to exit")
> > +
> > + while self.running:
> > + events = sel.select()
> > + for key, mask in events:
> > + callback = key.data
> > + callback(key.fileobj)
> > +
> > + print("Exiting...")
> > +
> > + def request_handler(self, ctx, req):
> > +
> > + drmreq = {
> > + "camctx": ctx,
> > + "camreq": req,
> > + }
> > +
> > + self.queue(drmreq)
> > diff --git a/src/py/test/cam_null.py b/src/py/test/cam_null.py
> > new file mode 100644
> > index 00000000..3935f5b6
> > --- /dev/null
> > +++ b/src/py/test/cam_null.py
> > @@ -0,0 +1,46 @@
> > +# SPDX-License-Identifier: GPL-2.0-or-later
> > +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen at ideasonboard.com>
> > +
> > +import sys
> > +import selectors
> > +
> > +class NullRenderer:
> > + def __init__(self, state):
> > + self.state = state
> > +
> > + self.cm = state["cm"]
> > + self.contexts = state["contexts"]
> > +
> > + self.running = False
> > +
> > + def setup(self):
> > + pass
> > +
> > + def run(self):
> > + print("Capturing...")
> > +
> > + self.running = True
> > +
> > + sel = selectors.DefaultSelector()
> > + sel.register(self.cm.efd, selectors.EVENT_READ, self.readcam)
> > + sel.register(sys.stdin, selectors.EVENT_READ, self.readkey)
> > +
> > + print("Press enter to exit")
> > +
> > + while self.running:
> > + events = sel.select()
> > + for key, mask in events:
> > + callback = key.data
> > + callback(key.fileobj)
> > +
> > + print("Exiting...")
> > +
> > + def readcam(self, fd):
> > + self.running = self.state["event_handler"](self.state)
> > +
> > + def readkey(self, fileobj):
> > + sys.stdin.readline()
> > + self.running = False
> > +
> > + def request_handler(self, ctx, req):
> > + self.state["request_prcessed"](ctx, req)
> > diff --git a/src/py/test/cam_qt.py b/src/py/test/cam_qt.py
> > new file mode 100644
> > index 00000000..3ff12df6
> > --- /dev/null
> > +++ b/src/py/test/cam_qt.py
> > @@ -0,0 +1,355 @@
> > +# SPDX-License-Identifier: GPL-2.0-or-later
> > +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen at ideasonboard.com>
> > +#
> > +# Debayering code from PiCamera documentation
> > +
> > +from PyQt5 import QtCore, QtGui, QtWidgets
> > +from io import BytesIO
> > +from PIL import Image
> > +from PIL.ImageQt import ImageQt
> > +import numpy as np
> > +from numpy.lib.stride_tricks import as_strided
> > +import sys
> > +
> > +def rgb_to_pix(rgb):
> > + img = Image.frombuffer("RGB", (rgb.shape[1], rgb.shape[0]), rgb)
> > + qim = ImageQt(img).copy()
> > + pix = QtGui.QPixmap.fromImage(qim)
> > + return pix
> > +
> > +
> > +def separate_components(data, r0, g0, g1, b0):
> > + # Now to split the data up into its red, green, and blue components. The
> > + # Bayer pattern of the OV5647 sensor is BGGR. In other words the first
> > + # row contains alternating green/blue elements, the second row contains
> > + # alternating red/green elements, and so on as illustrated below:
> > + #
> > + # GBGBGBGBGBGBGB
> > + # RGRGRGRGRGRGRG
> > + # GBGBGBGBGBGBGB
> > + # RGRGRGRGRGRGRG
> > + #
> > + # Please note that if you use vflip or hflip to change the orientation
> > + # of the capture, you must flip the Bayer pattern accordingly
> > +
> > + rgb = np.zeros(data.shape + (3,), dtype=data.dtype)
> > + rgb[r0[1]::2, r0[0]::2, 0] = data[r0[1]::2, r0[0]::2] # Red
> > + rgb[g0[1]::2, g0[0]::2, 1] = data[g0[1]::2, g0[0]::2] # Green
> > + rgb[g1[1]::2, g1[0]::2, 1] = data[g1[1]::2, g1[0]::2] # Green
> > + rgb[b0[1]::2, b0[0]::2, 2] = data[b0[1]::2, b0[0]::2] # Blue
> > +
> > + return rgb
> > +
> > +def demosaic(rgb, r0, g0, g1, b0):
I'd split the image processing out of cam_qt.py.
> > + # At this point we now have the raw Bayer data with the correct values
> > + # and colors but the data still requires de-mosaicing and
> > + # post-processing. If you wish to do this yourself, end the script here!
> > + #
> > + # Below we present a fairly naive de-mosaic method that simply
> > + # calculates the weighted average of a pixel based on the pixels
> > + # surrounding it. The weighting is provided b0[1] a b0[1]te representation of
> > + # the Bayer filter which we construct first:
> > +
> > + bayer = np.zeros(rgb.shape, dtype=np.uint8)
> > + bayer[r0[1]::2, r0[0]::2, 0] = 1 # Red
> > + bayer[g0[1]::2, g0[0]::2, 1] = 1 # Green
> > + bayer[g1[1]::2, g1[0]::2, 1] = 1 # Green
> > + bayer[b0[1]::2, b0[0]::2, 2] = 1 # Blue
> > +
> > + # Allocate an array to hold our output with the same shape as the input
> > + # data. After this we define the size of window that will be used to
> > + # calculate each weighted average (3x3). Then we pad out the rgb and
> > + # bayer arrays, adding blank pixels at their edges to compensate for the
> > + # size of the window when calculating averages for edge pixels.
> > +
> > + output = np.empty(rgb.shape, dtype=rgb.dtype)
> > + window = (3, 3)
> > + borders = (window[0] - 1, window[1] - 1)
> > + border = (borders[0] // 2, borders[1] // 2)
> > +
> > + #rgb_pad = np.zeros((
> > + # rgb.shape[0] + borders[0],
> > + # rgb.shape[1] + borders[1],
> > + # rgb.shape[2]), dtype=rgb.dtype)
> > + #rgb_pad[
> > + # border[0]:rgb_pad.shape[0] - border[0],
> > + # border[1]:rgb_pad.shape[1] - border[1],
> > + # :] = rgb
> > + #rgb = rgb_pad
> > + #
> > + #bayer_pad = np.zeros((
> > + # bayer.shape[0] + borders[0],
> > + # bayer.shape[1] + borders[1],
> > + # bayer.shape[2]), dtype=bayer.dtype)
> > + #bayer_pad[
> > + # border[0]:bayer_pad.shape[0] - border[0],
> > + # border[1]:bayer_pad.shape[1] - border[1],
> > + # :] = bayer
> > + #bayer = bayer_pad
> > +
> > + # In numpy >=1.7.0 just use np.pad (version in Raspbian is 1.6.2 at the
> > + # time of writing...)
> > + #
> > + rgb = np.pad(rgb, [
> > + (border[0], border[0]),
> > + (border[1], border[1]),
> > + (0, 0),
> > + ], 'constant')
> > + bayer = np.pad(bayer, [
> > + (border[0], border[0]),
> > + (border[1], border[1]),
> > + (0, 0),
> > + ], 'constant')
> > +
> > + # For each plane in the RGB data, we use a nifty numpy trick
> > + # (as_strided) to construct a view over the plane of 3x3 matrices. We do
> > + # the same for the bayer array, then use Einstein summation on each
> > + # (np.sum is simpler, but copies the data so it's slower), and divide
> > + # the results to get our weighted average:
> > +
> > + for plane in range(3):
> > + p = rgb[..., plane]
> > + b = bayer[..., plane]
> > + pview = as_strided(p, shape=(
> > + p.shape[0] - borders[0],
> > + p.shape[1] - borders[1]) + window, strides=p.strides * 2)
> > + bview = as_strided(b, shape=(
> > + b.shape[0] - borders[0],
> > + b.shape[1] - borders[1]) + window, strides=b.strides * 2)
> > + psum = np.einsum('ijkl->ij', pview)
> > + bsum = np.einsum('ijkl->ij', bview)
> > + output[..., plane] = psum // bsum
> > +
> > + return output
> > +
> > +
> > +
> > +
> > +def to_rgb(fmt, size, data):
> > + w = size[0]
> > + h = size[1]
> > +
> > + if fmt == "YUYV":
> > + # YUV422
> > + yuyv = data.reshape((h, w // 2 * 4))
> > +
> > + # YUV444
> > + yuv = np.empty((h, w, 3), dtype=np.uint8)
> > + yuv[:, :, 0] = yuyv[:, 0::2] # Y
> > + yuv[:, :, 1] = yuyv[:, 1::4].repeat(2, axis=1) # U
> > + yuv[:, :, 2] = yuyv[:, 3::4].repeat(2, axis=1) # V
> > +
> > + m = np.array([
> > + [ 1.0, 1.0, 1.0],
> > + [-0.000007154783816076815, -0.3441331386566162, 1.7720025777816772],
> > + [ 1.4019975662231445, -0.7141380310058594 , 0.00001542569043522235]
> > + ])
> > +
> > + rgb = np.dot(yuv, m)
> > + rgb[:, :, 0] -= 179.45477266423404
> > + rgb[:, :, 1] += 135.45870971679688
> > + rgb[:, :, 2] -= 226.8183044444304
> > + rgb = rgb.astype(np.uint8)
> > +
> > + elif fmt == "RGB888":
> > + rgb = data.reshape((h, w, 3))
> > + rgb[:, :, [0, 1, 2]] = rgb[:, :, [2, 1, 0]]
> > +
> > + elif fmt == "BGR888":
> > + rgb = data.reshape((h, w, 3))
> > +
> > + elif fmt in ["ARGB8888", "XRGB8888"]:
> > + rgb = data.reshape((h, w, 4))
> > + rgb = np.flip(rgb, axis=2)
> > + # drop alpha component
> > + rgb = np.delete(rgb, np.s_[0::4], axis=2)
> > +
> > + elif fmt.startswith("S"):
> > + bayer_pattern = fmt[1:5]
> > + bitspp = int(fmt[5:])
> > +
> > + # TODO: shifting leaves the lowest bits 0
> > + if bitspp == 8:
> > + data = data.reshape((h, w))
> > + data = data.astype(np.uint16) << 8
> > + elif bitspp in [10, 12]:
> > + data = data.view(np.uint16)
> > + data = data.reshape((h, w))
> > + data = data << (16 - bitspp)
> > + else:
> > + raise Exception("Bad bitspp:" + str(bitspp))
> > +
> > + idx = bayer_pattern.find("R")
> > + assert(idx != -1)
> > + r0 = (idx % 2, idx // 2)
> > +
> > + idx = bayer_pattern.find("G")
> > + assert(idx != -1)
> > + g0 = (idx % 2, idx // 2)
> > +
> > + idx = bayer_pattern.find("G", idx + 1)
> > + assert(idx != -1)
> > + g1 = (idx % 2, idx // 2)
> > +
> > + idx = bayer_pattern.find("B")
> > + assert(idx != -1)
> > + b0 = (idx % 2, idx // 2)
> > +
> > + rgb = separate_components(data, r0, g0, g1, b0)
> > + rgb = demosaic(rgb, r0, g0, g1, b0)
> > + rgb = (rgb >> 8).astype(np.uint8)
> > +
> > + else:
> > + rgb = None
> > +
> > + return rgb
> > +
> > +
> > +class QtRenderer:
> > + def __init__(self, state):
> > + self.state = state
> > +
> > + self.cm = state["cm"]
> > + self.contexts = state["contexts"]
> > +
> > + def setup(self):
> > + self.app = QtWidgets.QApplication([])
> > +
> > + windows = []
> > +
> > + for ctx in self.contexts:
> > + camera = ctx["camera"]
> > +
> > + for stream in ctx["streams"]:
> > + fmt = stream.configuration.fmt
> > + size = stream.configuration.size
> > +
> > + window = MainWindow(ctx, stream)
> > + window.setAttribute(QtCore.Qt.WA_ShowWithoutActivating)
> > + window.show()
> > + windows.append(window)
> > +
> > + self.windows = windows
> > +
> > + def run(self):
> > + camnotif = QtCore.QSocketNotifier(self.cm.efd, QtCore.QSocketNotifier.Read)
> > + camnotif.activated.connect(lambda x: self.readcam())
> > +
> > + keynotif = QtCore.QSocketNotifier(sys.stdin.fileno(), QtCore.QSocketNotifier.Read)
> > + keynotif.activated.connect(lambda x: self.readkey())
> > +
> > + print("Capturing...")
> > +
> > + self.app.exec()
> > +
> > + print("Exiting...")
> > +
> > + def readcam(self):
> > + running = self.state["event_handler"](self.state)
> > +
> > + if not running:
> > + self.app.quit()
> > +
> > + def readkey(self):
> > + sys.stdin.readline()
> > + self.app.quit()
> > +
> > + def request_handler(self, ctx, req):
> > + buffers = req.buffers
> > +
> > + for stream, fb in buffers.items():
> > + wnd = next(wnd for wnd in self.windows if wnd.stream == stream)
> > +
> > + wnd.handle_request(stream, fb)
> > +
> > + self.state["request_prcessed"](ctx, req)
> > +
> > + def cleanup(self):
> > + for w in self.windows:
> > + w.close()
> > +
> > +
> > +class MainWindow(QtWidgets.QWidget):
> > + def __init__(self, ctx, stream):
> > + super().__init__()
> > +
> > + self.ctx = ctx
> > + self.stream = stream
> > +
> > + self.label = QtWidgets.QLabel()
> > +
> > + windowLayout = QtWidgets.QHBoxLayout()
> > + self.setLayout(windowLayout)
> > +
> > + windowLayout.addWidget(self.label)
> > +
> > + controlsLayout = QtWidgets.QVBoxLayout()
> > + windowLayout.addLayout(controlsLayout)
> > +
> > + windowLayout.addStretch()
> > +
> > + group = QtWidgets.QGroupBox("Info")
> > + groupLayout = QtWidgets.QVBoxLayout()
> > + group.setLayout(groupLayout)
> > + controlsLayout.addWidget(group)
> > +
> > + lab = QtWidgets.QLabel(ctx["id"])
> > + groupLayout.addWidget(lab)
> > +
> > + self.frameLabel = QtWidgets.QLabel()
> > + groupLayout.addWidget(self.frameLabel)
> > +
> > +
> > + group = QtWidgets.QGroupBox("Properties")
> > + groupLayout = QtWidgets.QVBoxLayout()
> > + group.setLayout(groupLayout)
> > + controlsLayout.addWidget(group)
> > +
> > + camera = ctx["camera"]
> > +
> > + for k, v in camera.properties.items():
> > + lab = QtWidgets.QLabel()
> > + lab.setText(k + " = " + str(v))
> > + groupLayout.addWidget(lab)
> > +
> > + group = QtWidgets.QGroupBox("Controls")
> > + groupLayout = QtWidgets.QVBoxLayout()
> > + group.setLayout(groupLayout)
> > + controlsLayout.addWidget(group)
> > +
> > + for k, (min, max, default) in camera.controls.items():
> > + lab = QtWidgets.QLabel()
> > + lab.setText("{} = {}/{}/{}".format(k, min, max, default))
> > + groupLayout.addWidget(lab)
> > +
> > + controlsLayout.addStretch()
> > +
> > + def buf_to_qpixmap(self, stream, fb):
> > + with fb.mmap(0) as b:
> > + cfg = stream.configuration
> > + w, h = cfg.size
> > + pitch = cfg.stride
> > +
> > + if cfg.fmt == "MJPEG":
> > + img = Image.open(BytesIO(b))
> > + qim = ImageQt(img).copy()
> > + pix = QtGui.QPixmap.fromImage(qim)
> > + else:
> > + data = np.array(b, dtype=np.uint8)
> > + rgb = to_rgb(cfg.fmt, cfg.size, data)
> > +
> > + if rgb is None:
> > + raise Exception("Format not supported: " + cfg.fmt)
> > +
> > + pix = rgb_to_pix(rgb)
> > +
> > + return pix
> > +
> > + def handle_request(self, stream, fb):
> > + ctx = self.ctx
> > +
> > + pix = self.buf_to_qpixmap(stream, fb)
> > + self.label.setPixmap(pix)
> > +
> > + self.frameLabel.setText("Queued: {}\nDone: {}\nFps: {:.2f}"
> > + .format(ctx["reqs-queued"], ctx["reqs-completed"], ctx["fps"]))
> > diff --git a/src/py/test/cam_qtgl.py b/src/py/test/cam_qtgl.py
> > new file mode 100644
> > index 00000000..01168979
> > --- /dev/null
> > +++ b/src/py/test/cam_qtgl.py
> > @@ -0,0 +1,385 @@
> > +# SPDX-License-Identifier: GPL-2.0-or-later
> > +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen at ideasonboard.com>
> > +
> > +from PyQt5 import QtCore, QtWidgets
> > +from PyQt5.QtCore import Qt
> > +
> > +import numpy as np
> > +import sys
> > +import os
> > +os.environ["PYOPENGL_PLATFORM"] = "egl"
> > +import math
> > +
> > +import OpenGL
> > +#OpenGL.FULL_LOGGING = True
> > +
> > +from OpenGL import GL as gl
> > +from OpenGL.EGL.KHR.image import *
> > +from OpenGL.EGL.EXT.image_dma_buf_import import *
> > +from OpenGL.EGL.VERSION.EGL_1_0 import *
> > +from OpenGL.EGL.VERSION.EGL_1_2 import *
> > +from OpenGL.EGL.VERSION.EGL_1_3 import *
> > +
> > +from OpenGL.GLES2.VERSION.GLES2_2_0 import *
> > +from OpenGL.GLES2.OES.EGL_image import *
> > +from OpenGL.GLES2.OES.EGL_image_external import *
> > +from OpenGL.GLES3.VERSION.GLES3_3_0 import *
> > +
> > +from OpenGL.GL import shaders
> > +
> > +from gl_helpers import *
> > +
> > +# libcamera format string -> DRM fourcc
> > +FMT_MAP = {
> > + "RGB888": "RG24",
> > + "XRGB8888": "XR24",
> > + "ARGB8888": "AR24",
> > + "YUYV": "YUYV",
> > +}
> > +
> > +class EglState:
> > + def __init__(self):
> > + self.create_display()
> > + self.choose_config()
> > + self.create_context()
> > + self.check_extensions()
> > +
> > + def create_display(self):
> > + xdpy = getEGLNativeDisplay()
> > + dpy = eglGetDisplay(xdpy)
> > + self.display = dpy
> > +
> > + def choose_config(self):
> > + dpy = self.display
> > +
> > + major, minor = EGLint(), EGLint()
> > +
> > + b = eglInitialize(dpy, major, minor)
> > + assert(b)
> > +
> > + print("EGL {} {}".format(
> > + eglQueryString(dpy, EGL_VENDOR).decode(),
> > + eglQueryString(dpy, EGL_VERSION).decode()))
> > +
> > + check_egl_extensions(dpy, ["EGL_EXT_image_dma_buf_import"])
> > +
> > + b = eglBindAPI(EGL_OPENGL_ES_API)
> > + assert(b)
> > +
> > + def print_config(dpy, cfg):
> > +
> > + def _getconf(dpy, cfg, a):
> > + value = ctypes.c_long()
> > + eglGetConfigAttrib(dpy, cfg, a, value)
> > + return value.value
> > +
> > + getconf = lambda a: _getconf(dpy, cfg, a)
> > +
> > + print("EGL Config {}: color buf {}/{}/{}/{} = {}, depth {}, stencil {}, native visualid {}, native visualtype {}".format(
> > + getconf(EGL_CONFIG_ID),
> > + getconf(EGL_ALPHA_SIZE),
> > + getconf(EGL_RED_SIZE),
> > + getconf(EGL_GREEN_SIZE),
> > + getconf(EGL_BLUE_SIZE),
> > + getconf(EGL_BUFFER_SIZE),
> > + getconf(EGL_DEPTH_SIZE),
> > + getconf(EGL_STENCIL_SIZE),
> > + getconf(EGL_NATIVE_VISUAL_ID),
> > + getconf(EGL_NATIVE_VISUAL_TYPE)))
> > +
> > + if False:
> > + num_configs = ctypes.c_long()
> > + eglGetConfigs(dpy, None, 0, num_configs)
> > + print("{} configs".format(num_configs.value))
> > +
> > + configs = (EGLConfig * num_configs.value)()
> > + eglGetConfigs(dpy, configs, num_configs.value, num_configs)
> > + for config_id in configs:
> > + print_config(dpy, config_id)
> > +
> > +
> > + config_attribs = [
> > + EGL_SURFACE_TYPE, EGL_WINDOW_BIT,
> > + EGL_RED_SIZE, 8,
> > + EGL_GREEN_SIZE, 8,
> > + EGL_BLUE_SIZE, 8,
> > + EGL_ALPHA_SIZE, 0,
> > + EGL_RENDERABLE_TYPE, EGL_OPENGL_ES2_BIT,
> > + EGL_NONE,
> > + ]
> > +
> > + n = EGLint()
> > + configs = (EGLConfig * 1)()
> > + b = eglChooseConfig(dpy, config_attribs, configs, 1, n)
> > + assert(b and n.value == 1)
> > + config = configs[0]
> > +
> > + print("Chosen Config:")
> > + print_config(dpy, config)
> > +
> > + self.config = config
> > +
> > + def create_context(self):
> > + dpy = self.display
> > +
> > + context_attribs = [
> > + EGL_CONTEXT_CLIENT_VERSION, 2,
> > + EGL_NONE,
> > + ]
> > +
> > + context = eglCreateContext(dpy, self.config, EGL_NO_CONTEXT, context_attribs)
> > + assert(context)
> > +
> > + b = eglMakeCurrent(dpy, EGL_NO_SURFACE, EGL_NO_SURFACE, context)
> > + assert(b)
> > +
> > + self.context = context
> > +
> > + def check_extensions(self):
> > + check_gl_extensions(["GL_OES_EGL_image"])
> > +
> > + assert(eglCreateImageKHR)
> > + assert(eglDestroyImageKHR)
> > + assert(glEGLImageTargetTexture2DOES)
> > +
> > +
> > +class QtRenderer:
> > + def __init__(self, state):
> > + self.state = state
> > +
> > + def setup(self):
> > + self.app = QtWidgets.QApplication([])
> > +
> > + window = MainWindow(self.state)
> > + window.setAttribute(QtCore.Qt.WA_ShowWithoutActivating)
> > + window.show()
> > +
> > + self.window = window
> > +
> > + def run(self):
> > + camnotif = QtCore.QSocketNotifier(self.state["cm"].efd, QtCore.QSocketNotifier.Read)
> > + camnotif.activated.connect(lambda x: self.readcam())
> > +
> > + keynotif = QtCore.QSocketNotifier(sys.stdin.fileno(), QtCore.QSocketNotifier.Read)
> > + keynotif.activated.connect(lambda x: self.readkey())
> > +
> > + print("Capturing...")
> > +
> > + self.app.exec()
> > +
> > + print("Exiting...")
> > +
> > + def readcam(self):
> > + running = self.state["event_handler"](self.state)
> > +
> > + if not running:
> > + self.app.quit()
> > +
> > + def readkey(self):
> > + sys.stdin.readline()
> > + self.app.quit()
> > +
> > + def request_handler(self, ctx, req):
> > + self.window.handle_request(ctx, req)
> > +
> > + def cleanup(self):
> > + self.window.close()
> > +
> > +
> > +class MainWindow(QtWidgets.QWidget):
> > + def __init__(self, state):
> > + super().__init__()
> > +
> > + self.setAttribute(Qt.WA_PaintOnScreen)
> > + self.setAttribute(Qt.WA_NativeWindow)
> > +
> > + self.state = state
> > +
> > + self.textures = {}
> > + self.reqqueue = {}
> > + self.current = {}
> > +
> > + for ctx in self.state["contexts"]:
> > +
> > + self.reqqueue[ctx["idx"]] = []
> > + self.current[ctx["idx"]] = []
> > +
> > + for stream in ctx["streams"]:
> > + fmt = stream.configuration.fmt
> > + size = stream.configuration.size
> > +
> > + if not fmt in FMT_MAP:
> > + raise Exception("Unsupported pixel format: " + str(fmt))
> > +
> > + self.textures[stream] = None
> > +
> > + num_tiles = len(self.textures)
> > + self.num_columns = math.ceil(math.sqrt(num_tiles))
> > + self.num_rows = math.ceil(num_tiles / self.num_columns)
> > +
> > + self.egl = EglState()
> > +
> > + self.surface = None
> > +
> > + def paintEngine(self):
> > + return None
> > +
> > + def create_surface(self):
> > + native_surface = c_void_p(self.winId().__int__())
> > + surface = eglCreateWindowSurface(self.egl.display, self.egl.config,
> > + native_surface, None)
> > +
> > + b = eglMakeCurrent(self.egl.display, self.surface, self.surface, self.egl.context)
> > + assert(b)
> > +
> > + self.surface = surface
> > +
> > + def init_gl(self):
> > + self.create_surface()
> > +
> > + vertShaderSrc = """
> > + attribute vec2 aPosition;
> > + varying vec2 texcoord;
> > +
> > + void main()
> > + {
> > + gl_Position = vec4(aPosition * 2.0 - 1.0, 0.0, 1.0);
> > + texcoord.x = aPosition.x;
> > + texcoord.y = 1.0 - aPosition.y;
> > + }
> > + """
> > + fragShaderSrc = """
> > + #extension GL_OES_EGL_image_external : enable
> > + precision mediump float;
> > + varying vec2 texcoord;
> > + uniform samplerExternalOES texture;
> > +
> > + void main()
> > + {
> > + gl_FragColor = texture2D(texture, texcoord);
> > + }
> > + """
> > +
> > + program = shaders.compileProgram(
> > + shaders.compileShader(vertShaderSrc, GL_VERTEX_SHADER),
> > + shaders.compileShader(fragShaderSrc, GL_FRAGMENT_SHADER)
> > + )
> > +
> > + glUseProgram(program)
> > +
> > + glClearColor(0.5, 0.8, 0.7, 1.0)
> > +
> > + vertPositions = [
> > + 0.0, 0.0,
> > + 1.0, 0.0,
> > + 1.0, 1.0,
> > + 0.0, 1.0
> > + ]
> > +
> > + inputAttrib = glGetAttribLocation(program, "aPosition")
> > + glVertexAttribPointer(inputAttrib, 2, GL_FLOAT, GL_FALSE, 0, vertPositions)
> > + glEnableVertexAttribArray(inputAttrib)
> > +
> > +
> > + def create_texture(self, stream, fb):
> > + cfg = stream.configuration
> > + fmt = cfg.fmt
> > + fmt = str_to_fourcc(FMT_MAP[fmt])
> > + w, h = cfg.size
> > +
> > + attribs = [
> > + EGL_WIDTH, w,
> > + EGL_HEIGHT, h,
> > + EGL_LINUX_DRM_FOURCC_EXT, fmt,
> > + EGL_DMA_BUF_PLANE0_FD_EXT, fb.fd(0),
> > + EGL_DMA_BUF_PLANE0_OFFSET_EXT, 0,
> > + EGL_DMA_BUF_PLANE0_PITCH_EXT, cfg.stride,
> > + EGL_NONE,
> > + ]
> > +
> > + image = eglCreateImageKHR(self.egl.display,
> > + EGL_NO_CONTEXT,
> > + EGL_LINUX_DMA_BUF_EXT,
> > + None,
> > + attribs)
Will be interesting to add dmabuf support to qcam at some point.
> > + assert(image)
> > +
> > + textures = glGenTextures(1)
> > + glBindTexture(GL_TEXTURE_EXTERNAL_OES, textures)
> > + glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_MAG_FILTER, GL_LINEAR)
> > + glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_MIN_FILTER, GL_LINEAR)
> > + glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_WRAP_S, GL_CLAMP_TO_EDGE)
> > + glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_WRAP_T, GL_CLAMP_TO_EDGE)
> > + glEGLImageTargetTexture2DOES(GL_TEXTURE_EXTERNAL_OES, image)
> > +
> > + return textures
> > +
> > + def resizeEvent(self, event):
> > + size = event.size()
> > +
> > + print("Resize", size)
> > +
> > + super().resizeEvent(event)
> > +
> > + if self.surface == None:
> > + return
> > +
> > + glViewport(0, 0, size.width()//2, size.height())
> > +
> > + def paintEvent(self, event):
> > + if self.surface == None:
> > + self.init_gl()
> > +
> > + for ctx_idx, queue in self.reqqueue.items():
> > + if len(queue) == 0:
> > + continue
> > +
> > + ctx = next(ctx for ctx in self.state["contexts"] if ctx["idx"] == ctx_idx)
> > +
> > + if self.current[ctx_idx]:
> > + old = self.current[ctx_idx]
> > + self.current[ctx_idx] = None
> > + self.state["request_prcessed"](ctx, old)
> > +
> > + next_req = queue.pop(0)
> > + self.current[ctx_idx] = next_req
> > +
> > + stream, fb = next(iter(next_req.buffers.items()))
> > +
> > + self.textures[stream] = self.create_texture(stream, fb)
> > +
> > + self.paint_gl()
> > +
> > + def paint_gl(self):
> > + b = eglMakeCurrent(self.egl.display, self.surface, self.surface, self.egl.context)
> > + assert(b)
> > +
> > + glClear(GL_COLOR_BUFFER_BIT)
> > +
> > + size = self.size()
> > +
> > + for idx,ctx in enumerate(self.state["contexts"]):
> > + for stream in ctx["streams"]:
> > + if self.textures[stream] == None:
> > + continue
> > +
> > + w = size.width() // self.num_columns
> > + h = size.height() // self.num_rows
> > +
> > + x = idx % self.num_columns
> > + y = idx // self.num_columns
> > +
> > + x *= w
> > + y *= h
> > +
> > + glViewport(x, y, w, h)
> > +
> > + glBindTexture(GL_TEXTURE_EXTERNAL_OES, self.textures[stream])
> > + glDrawArrays(GL_TRIANGLE_FAN, 0, 4)
> > +
> > + b = eglSwapBuffers(self.egl.display, self.surface)
> > + assert(b)
> > +
> > + def handle_request(self, ctx, req):
> > + self.reqqueue[ctx["idx"]].append(req)
> > + self.update()
> > diff --git a/src/py/test/gl_helpers.py b/src/py/test/gl_helpers.py
> > new file mode 100644
> > index 00000000..a80b03b2
> > --- /dev/null
> > +++ b/src/py/test/gl_helpers.py
> > @@ -0,0 +1,67 @@
> > +# SPDX-License-Identifier: GPL-2.0-or-later
> > +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen at ideasonboard.com>
> > +
> > +from OpenGL.EGL.VERSION.EGL_1_0 import EGLNativeDisplayType, eglGetProcAddress, eglQueryString, EGL_EXTENSIONS
> > +
> > +from OpenGL.raw.GLES2 import _types as _cs
> > +from OpenGL.GLES2.VERSION.GLES2_2_0 import *
> > +from OpenGL.GLES3.VERSION.GLES3_3_0 import *
> > +from OpenGL import GL as gl
> > +
> > +from ctypes import c_int, c_char_p, c_void_p, cdll, POINTER, util, \
> > + pointer, CFUNCTYPE, c_bool
> > +
> > +def getEGLNativeDisplay():
> > + _x11lib = cdll.LoadLibrary(util.find_library("X11"))
> > + XOpenDisplay = _x11lib.XOpenDisplay
> > + XOpenDisplay.argtypes = [c_char_p]
> > + XOpenDisplay.restype = POINTER(EGLNativeDisplayType)
> > +
> > + xdpy = XOpenDisplay(None)
> > +
> > +# Hack. PyOpenGL doesn't seem to manage to find glEGLImageTargetTexture2DOES.
> > +def getglEGLImageTargetTexture2DOES():
> > + funcptr = eglGetProcAddress("glEGLImageTargetTexture2DOES")
> > + prototype = CFUNCTYPE(None,_cs.GLenum,_cs.GLeglImageOES)
> > + return prototype(funcptr)
> > +
> > +glEGLImageTargetTexture2DOES = getglEGLImageTargetTexture2DOES()
> > +
> > +
> > +def str_to_fourcc(str):
> > + assert(len(str) == 4)
> > + fourcc = 0
> > + for i,v in enumerate([ord(c) for c in str]):
> > + fourcc |= v << (i * 8)
> > + return fourcc
> > +
> > +def get_gl_extensions():
> > + n = GLint()
> > + glGetIntegerv(GL_NUM_EXTENSIONS, n)
> > + gl_extensions = []
> > + for i in range(n.value):
> > + gl_extensions.append(gl.glGetStringi(GL_EXTENSIONS, i).decode())
> > + return gl_extensions
> > +
> > +def check_gl_extensions(required_extensions):
> > + extensions = get_gl_extensions()
> > +
> > + if False:
> > + print("GL EXTENSIONS: ", " ".join(extensions))
> > +
> > + for ext in required_extensions:
> > + if not ext in extensions:
> > + raise Exception(ext + " missing")
> > +
> > +def get_egl_extensions(egl_display):
> > + return eglQueryString(egl_display, EGL_EXTENSIONS).decode().split(" ")
> > +
> > +def check_egl_extensions(egl_display, required_extensions):
> > + extensions = get_egl_extensions(egl_display)
> > +
> > + if False:
> > + print("EGL EXTENSIONS: ", " ".join(extensions))
> > +
> > + for ext in required_extensions:
> > + if not ext in extensions:
> > + raise Exception(ext + " missing")
--
Regards,
Laurent Pinchart
More information about the libcamera-devel
mailing list