[libcamera-devel] [PATCH v7 06/13] py: add unittests.py
Laurent Pinchart
laurent.pinchart at ideasonboard.com
Thu May 5 19:52:59 CEST 2022
Hi Tomi,
Thank you for the patch.
On Thu, May 05, 2022 at 01:40:57PM +0300, Tomi Valkeinen wrote:
> Add a simple unittests.py as a base for python unittests.
>
> Signed-off-by: Tomi Valkeinen <tomi.valkeinen at ideasonboard.com>
> ---
> test/meson.build | 1 +
> test/py/meson.build | 17 ++
> test/py/unittests.py | 368 +++++++++++++++++++++++++++++++++++++++++++
> 3 files changed, 386 insertions(+)
> create mode 100644 test/py/meson.build
> create mode 100755 test/py/unittests.py
>
> diff --git a/test/meson.build b/test/meson.build
> index fd4c5ca0..623f3baa 100644
> --- a/test/meson.build
> +++ b/test/meson.build
> @@ -18,6 +18,7 @@ subdir('log')
> subdir('media_device')
> subdir('pipeline')
> subdir('process')
> +subdir('py')
> subdir('serialization')
> subdir('stream')
> subdir('v4l2_compat')
> diff --git a/test/py/meson.build b/test/py/meson.build
> new file mode 100644
> index 00000000..f6b42bd0
> --- /dev/null
> +++ b/test/py/meson.build
> @@ -0,0 +1,17 @@
> +# SPDX-License-Identifier: CC0-1.0
> +
> +if not pycamera_enabled
> + subdir_done()
> +endif
> +
> +pymod = import('python')
> +py3 = pymod.find_installation('python3')
> +
> +pypathdir = meson.project_build_root() / 'src/py'
pypathdir = meson.project_build_root() / 'src' / 'py'
> +
> +test('pyunittests',
> + py3,
> + args : files('unittests.py'),
> + env : ['PYTHONPATH=' + pypathdir],
> + suite : 'pybindings',
> + is_parallel : false)
> diff --git a/test/py/unittests.py b/test/py/unittests.py
> new file mode 100755
> index 00000000..15d5b4a7
> --- /dev/null
> +++ b/test/py/unittests.py
> @@ -0,0 +1,368 @@
> +#!/usr/bin/env python3
> +
> +# SPDX-License-Identifier: GPL-2.0-or-later
> +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen at ideasonboard.com>
> +
> +from collections import defaultdict
> +import errno
> +import gc
> +import libcamera as libcam
I'm tempted to claim the "cam" name in all our Python code, or possibly
"camera". What do you think ?
> +import os
> +import selectors
> +import time
> +import unittest
> +import weakref
> +
> +
> +class MyTestCase(unittest.TestCase):
s/MyTestCase/BaseTestCase/
It's not yours only anymore :-)
> + def assertZero(self, a, msg=None):
> + self.assertEqual(a, 0, msg)
> +
> +
> +class SimpleTestMethods(MyTestCase):
> + def test_find_ref(self):
> + cm = libcam.CameraManager.singleton()
> + wr_cm = weakref.ref(cm)
> +
> + cam = cm.find("platform/vimc")
Standardizing on single-quotes here too would be nice.
> + self.assertIsNotNone(cam)
> + wr_cam = weakref.ref(cam)
> +
> + cm = None
> + gc.collect()
> + self.assertIsNotNone(wr_cm())
> +
> + cam = None
> + gc.collect()
> + self.assertIsNone(wr_cm())
> + self.assertIsNone(wr_cam())
> +
> + def test_get_ref(self):
> + cm = libcam.CameraManager.singleton()
> + wr_cm = weakref.ref(cm)
> +
> + cam = cm.get("platform/vimc.0 Sensor B")
> + self.assertTrue(cam is not None)
> + wr_cam = weakref.ref(cam)
> +
> + cm = None
> + gc.collect()
> + self.assertIsNotNone(wr_cm())
> +
> + cam = None
> + gc.collect()
> + self.assertIsNone(wr_cm())
> + self.assertIsNone(wr_cam())
> +
> + def test_acquire_release(self):
> + cm = libcam.CameraManager.singleton()
> + cam = cm.get("platform/vimc.0 Sensor B")
> + self.assertTrue(cam is not None)
> +
> + ret = cam.acquire()
> + self.assertZero(ret)
> +
> + ret = cam.release()
> + self.assertZero(ret)
> +
> + def test_double_acquire(self):
> + cm = libcam.CameraManager.singleton()
> + cam = cm.get("platform/vimc.0 Sensor B")
> + self.assertTrue(cam is not None)
> +
> + ret = cam.acquire()
> + self.assertZero(ret)
> +
> + libcam.logSetLevel("Camera", "FATAL")
> + ret = cam.acquire()
> + self.assertEqual(ret, -errno.EBUSY)
> + libcam.logSetLevel("Camera", "ERROR")
> +
> + ret = cam.release()
> + self.assertZero(ret)
> +
> + ret = cam.release()
> + # I expected EBUSY, but looks like double release works fine
> + self.assertZero(ret)
> +
> +
> +class CameraTesterBase(MyTestCase):
> + def setUp(self):
> + self.cm = libcam.CameraManager.singleton()
> + self.cam = self.cm.find("platform/vimc")
> + if self.cam is None:
> + self.cm = None
> + raise Exception("No vimc found")
Is it possible to skip the test in that case instead of failing ? This
could be done on top, a todo comment somewhere in the file would be good
then.
> +
> + ret = self.cam.acquire()
> + if ret != 0:
> + self.cam = None
> + self.cm = None
> + raise Exception("Failed to acquire camera")
> +
> + def tearDown(self):
> + # If a test fails, the camera may be in running state. So always stop.
> + self.cam.stop()
> +
> + ret = self.cam.release()
> + if ret != 0:
> + raise Exception("Failed to release camera")
> +
> + self.cam = None
> + self.cm = None
> +
> +
> +class AllocatorTestMethods(CameraTesterBase):
> + def test_allocator(self):
> + cam = self.cam
> +
> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
> + self.assertTrue(camconfig.size == 1)
> + wr_camconfig = weakref.ref(camconfig)
> +
> + streamconfig = camconfig.at(0)
> + wr_streamconfig = weakref.ref(streamconfig)
> +
> + ret = cam.configure(camconfig)
> + self.assertZero(ret)
> +
> + stream = streamconfig.stream
> + wr_stream = weakref.ref(stream)
> +
> + # stream should keep streamconfig and camconfig alive
> + streamconfig = None
> + camconfig = None
> + gc.collect()
> + self.assertIsNotNone(wr_camconfig())
> + self.assertIsNotNone(wr_streamconfig())
> +
> + allocator = libcam.FrameBufferAllocator(cam)
> + ret = allocator.allocate(stream)
> + self.assertTrue(ret > 0)
> + wr_allocator = weakref.ref(allocator)
> +
> + buffers = allocator.buffers(stream)
> + buffers = None
> +
> + buffer = allocator.buffers(stream)[0]
> + self.assertIsNotNone(buffer)
> + wr_buffer = weakref.ref(buffer)
> +
> + allocator = None
> + gc.collect()
> + self.assertIsNotNone(wr_buffer())
> + self.assertIsNotNone(wr_allocator())
> + self.assertIsNotNone(wr_stream())
> +
> + buffer = None
> + gc.collect()
> + self.assertIsNone(wr_buffer())
> + self.assertIsNone(wr_allocator())
> + self.assertIsNotNone(wr_stream())
> +
> + stream = None
> + gc.collect()
> + self.assertIsNone(wr_stream())
> + self.assertIsNone(wr_camconfig())
> + self.assertIsNone(wr_streamconfig())
> +
> +
> +class SimpleCaptureMethods(CameraTesterBase):
> + def test_sleep(self):
> + cm = self.cm
> + cam = self.cam
> +
> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
> + self.assertTrue(camconfig.size == 1)
> +
> + streamconfig = camconfig.at(0)
> + fmts = streamconfig.formats
> +
> + ret = cam.configure(camconfig)
> + self.assertZero(ret)
> +
> + stream = streamconfig.stream
> +
> + allocator = libcam.FrameBufferAllocator(cam)
> + ret = allocator.allocate(stream)
> + self.assertTrue(ret > 0)
> +
> + num_bufs = len(allocator.buffers(stream))
> +
> + reqs = []
> + for i in range(num_bufs):
> + req = cam.createRequest(i)
> + self.assertIsNotNone(req)
> +
> + buffer = allocator.buffers(stream)[i]
> + ret = req.addBuffer(stream, buffer)
> + self.assertZero(ret)
> +
> + reqs.append(req)
> +
> + buffer = None
> +
> + ret = cam.start()
> + self.assertZero(ret)
> +
> + for req in reqs:
> + ret = cam.queueRequest(req)
> + self.assertZero(ret)
> +
> + reqs = None
> + gc.collect()
> +
> + time.sleep(0.5)
> +
> + reqs = cm.getReadyRequests()
> +
> + self.assertTrue(len(reqs) == num_bufs)
> +
> + for i, req in enumerate(reqs):
> + self.assertTrue(i == req.cookie)
> +
> + reqs = None
> + gc.collect()
> +
> + ret = cam.stop()
> + self.assertZero(ret)
> +
> + def test_select(self):
> + cm = self.cm
> + cam = self.cam
> +
> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
> + self.assertTrue(camconfig.size == 1)
> +
> + streamconfig = camconfig.at(0)
> + fmts = streamconfig.formats
> +
> + ret = cam.configure(camconfig)
> + self.assertZero(ret)
> +
> + stream = streamconfig.stream
> +
> + allocator = libcam.FrameBufferAllocator(cam)
> + ret = allocator.allocate(stream)
> + self.assertTrue(ret > 0)
> +
> + num_bufs = len(allocator.buffers(stream))
> +
> + reqs = []
> + for i in range(num_bufs):
> + req = cam.createRequest(i)
> + self.assertIsNotNone(req)
> +
> + buffer = allocator.buffers(stream)[i]
> + ret = req.addBuffer(stream, buffer)
> + self.assertZero(ret)
> +
> + reqs.append(req)
> +
> + buffer = None
> +
> + ret = cam.start()
> + self.assertZero(ret)
> +
> + for req in reqs:
> + ret = cam.queueRequest(req)
> + self.assertZero(ret)
> +
> + reqs = None
> + gc.collect()
> +
> + sel = selectors.DefaultSelector()
> + sel.register(cm.efd, selectors.EVENT_READ, 123)
Is the data argument needed ?
> +
> + reqs = []
> +
> + running = True
> + while running:
> + events = sel.select()
> + for key, mask in events:
> + os.read(key.fileobj, 8)
> +
> + ready_reqs = cm.getReadyRequests()
> +
> + self.assertTrue(len(ready_reqs) > 0)
> +
> + reqs += ready_reqs
> +
> + if len(reqs) == num_bufs:
> + running = False
> +
> + self.assertTrue(len(reqs) == num_bufs)
> +
> + for i, req in enumerate(reqs):
> + self.assertTrue(i == req.cookie)
> +
> + reqs = None
> + gc.collect()
> +
> + ret = cam.stop()
> + self.assertZero(ret)
> +
> +
> +# Recursively expand slist's objects into olist, using seen to track already
> +# processed objects.
> +def _getr(slist, olist, seen):
> + for e in slist:
> + if id(e) in seen:
> + continue
> + seen.add(id(e))
> + olist.append(e)
> + tl = gc.get_referents(e)
> + if tl:
> + _getr(tl, olist, seen)
> +
> +
> +def get_all_objects(ignored=[]):
> + gcl = gc.get_objects()
> + olist = []
> + seen = set()
> +
> + seen.add(id(gcl))
> + seen.add(id(olist))
> + seen.add(id(seen))
> + seen.update(set([id(o) for o in ignored]))
> +
> + _getr(gcl, olist, seen)
> +
> + return olist
> +
> +
> +def create_type_count_map(olist):
> + map = defaultdict(int)
> + for o in olist:
> + map[type(o)] += 1
> + return map
> +
> +
> +def diff_type_count_maps(before, after):
> + return [(k, after[k] - before[k]) for k in after if after[k] != before[k]]
> +
> +
> +if __name__ == '__main__':
> + # doesn't work very well, as things always leak a bit
Lovely :-) Is it something we can fix ?
s/doesn't/Doesn't/
Reviewed-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
> + test_leaks = False
> +
> + if test_leaks:
> + gc.unfreeze()
> + gc.collect()
> +
> + obs_before = get_all_objects()
> +
> + unittest.main(exit=False)
> +
> + if test_leaks:
> + gc.unfreeze()
> + gc.collect()
> +
> + obs_after = get_all_objects([obs_before])
> +
> + before = create_type_count_map(obs_before)
> + after = create_type_count_map(obs_after)
> +
> + leaks = diff_type_count_maps(before, after)
> + if len(leaks) > 0:
> + print(leaks)
--
Regards,
Laurent Pinchart
More information about the libcamera-devel
mailing list