[libcamera-devel] [PATCH v7 06/13] py: add unittests.py
Tomi Valkeinen
tomi.valkeinen at ideasonboard.com
Fri May 6 12:59:38 CEST 2022
On 05/05/2022 20:52, Laurent Pinchart wrote:
> Hi Tomi,
>
> Thank you for the patch.
>
> On Thu, May 05, 2022 at 01:40:57PM +0300, Tomi Valkeinen wrote:
>> Add a simple unittests.py as a base for python unittests.
>>
>> Signed-off-by: Tomi Valkeinen <tomi.valkeinen at ideasonboard.com>
>> ---
>> test/meson.build | 1 +
>> test/py/meson.build | 17 ++
>> test/py/unittests.py | 368 +++++++++++++++++++++++++++++++++++++++++++
>> 3 files changed, 386 insertions(+)
>> create mode 100644 test/py/meson.build
>> create mode 100755 test/py/unittests.py
>>
>> diff --git a/test/meson.build b/test/meson.build
>> index fd4c5ca0..623f3baa 100644
>> --- a/test/meson.build
>> +++ b/test/meson.build
>> @@ -18,6 +18,7 @@ subdir('log')
>> subdir('media_device')
>> subdir('pipeline')
>> subdir('process')
>> +subdir('py')
>> subdir('serialization')
>> subdir('stream')
>> subdir('v4l2_compat')
>> diff --git a/test/py/meson.build b/test/py/meson.build
>> new file mode 100644
>> index 00000000..f6b42bd0
>> --- /dev/null
>> +++ b/test/py/meson.build
>> @@ -0,0 +1,17 @@
>> +# SPDX-License-Identifier: CC0-1.0
>> +
>> +if not pycamera_enabled
>> + subdir_done()
>> +endif
>> +
>> +pymod = import('python')
>> +py3 = pymod.find_installation('python3')
>> +
>> +pypathdir = meson.project_build_root() / 'src/py'
>
> pypathdir = meson.project_build_root() / 'src' / 'py'
>
>> +
>> +test('pyunittests',
>> + py3,
>> + args : files('unittests.py'),
>> + env : ['PYTHONPATH=' + pypathdir],
>> + suite : 'pybindings',
>> + is_parallel : false)
>> diff --git a/test/py/unittests.py b/test/py/unittests.py
>> new file mode 100755
>> index 00000000..15d5b4a7
>> --- /dev/null
>> +++ b/test/py/unittests.py
>> @@ -0,0 +1,368 @@
>> +#!/usr/bin/env python3
>> +
>> +# SPDX-License-Identifier: GPL-2.0-or-later
>> +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen at ideasonboard.com>
>> +
>> +from collections import defaultdict
>> +import errno
>> +import gc
>> +import libcamera as libcam
>
> I'm tempted to claim the "cam" name in all our Python code, or possibly
> "camera". What do you think ?
I think 'cam' and 'camera' are names for variables that contain a camera.
>> +import os
>> +import selectors
>> +import time
>> +import unittest
>> +import weakref
>> +
>> +
>> +class MyTestCase(unittest.TestCase):
>
> s/MyTestCase/BaseTestCase/
>
> It's not yours only anymore :-)
My precious...
>> + def assertZero(self, a, msg=None):
>> + self.assertEqual(a, 0, msg)
>> +
>> +
>> +class SimpleTestMethods(MyTestCase):
>> + def test_find_ref(self):
>> + cm = libcam.CameraManager.singleton()
>> + wr_cm = weakref.ref(cm)
>> +
>> + cam = cm.find("platform/vimc")
>
> Standardizing on single-quotes here too would be nice.
I can't stand them, but... Fine, it's probably best to follow the widely
used convention.
>> + self.assertIsNotNone(cam)
>> + wr_cam = weakref.ref(cam)
>> +
>> + cm = None
>> + gc.collect()
>> + self.assertIsNotNone(wr_cm())
>> +
>> + cam = None
>> + gc.collect()
>> + self.assertIsNone(wr_cm())
>> + self.assertIsNone(wr_cam())
>> +
>> + def test_get_ref(self):
>> + cm = libcam.CameraManager.singleton()
>> + wr_cm = weakref.ref(cm)
>> +
>> + cam = cm.get("platform/vimc.0 Sensor B")
>> + self.assertTrue(cam is not None)
>> + wr_cam = weakref.ref(cam)
>> +
>> + cm = None
>> + gc.collect()
>> + self.assertIsNotNone(wr_cm())
>> +
>> + cam = None
>> + gc.collect()
>> + self.assertIsNone(wr_cm())
>> + self.assertIsNone(wr_cam())
>> +
>> + def test_acquire_release(self):
>> + cm = libcam.CameraManager.singleton()
>> + cam = cm.get("platform/vimc.0 Sensor B")
>> + self.assertTrue(cam is not None)
>> +
>> + ret = cam.acquire()
>> + self.assertZero(ret)
>> +
>> + ret = cam.release()
>> + self.assertZero(ret)
>> +
>> + def test_double_acquire(self):
>> + cm = libcam.CameraManager.singleton()
>> + cam = cm.get("platform/vimc.0 Sensor B")
>> + self.assertTrue(cam is not None)
>> +
>> + ret = cam.acquire()
>> + self.assertZero(ret)
>> +
>> + libcam.logSetLevel("Camera", "FATAL")
>> + ret = cam.acquire()
>> + self.assertEqual(ret, -errno.EBUSY)
>> + libcam.logSetLevel("Camera", "ERROR")
>> +
>> + ret = cam.release()
>> + self.assertZero(ret)
>> +
>> + ret = cam.release()
>> + # I expected EBUSY, but looks like double release works fine
>> + self.assertZero(ret)
>> +
>> +
>> +class CameraTesterBase(MyTestCase):
>> + def setUp(self):
>> + self.cm = libcam.CameraManager.singleton()
>> + self.cam = self.cm.find("platform/vimc")
>> + if self.cam is None:
>> + self.cm = None
>> + raise Exception("No vimc found")
>
> Is it possible to skip the test in that case instead of failing ? This
> could be done on top, a todo comment somewhere in the file would be good
> then.
Yes, I can do that. Although some tests test finding the vimc camera, so
those will still fail.
>> +
>> + ret = self.cam.acquire()
>> + if ret != 0:
>> + self.cam = None
>> + self.cm = None
>> + raise Exception("Failed to acquire camera")
>> +
>> + def tearDown(self):
>> + # If a test fails, the camera may be in running state. So always stop.
>> + self.cam.stop()
>> +
>> + ret = self.cam.release()
>> + if ret != 0:
>> + raise Exception("Failed to release camera")
>> +
>> + self.cam = None
>> + self.cm = None
>> +
>> +
>> +class AllocatorTestMethods(CameraTesterBase):
>> + def test_allocator(self):
>> + cam = self.cam
>> +
>> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
>> + self.assertTrue(camconfig.size == 1)
>> + wr_camconfig = weakref.ref(camconfig)
>> +
>> + streamconfig = camconfig.at(0)
>> + wr_streamconfig = weakref.ref(streamconfig)
>> +
>> + ret = cam.configure(camconfig)
>> + self.assertZero(ret)
>> +
>> + stream = streamconfig.stream
>> + wr_stream = weakref.ref(stream)
>> +
>> + # stream should keep streamconfig and camconfig alive
>> + streamconfig = None
>> + camconfig = None
>> + gc.collect()
>> + self.assertIsNotNone(wr_camconfig())
>> + self.assertIsNotNone(wr_streamconfig())
>> +
>> + allocator = libcam.FrameBufferAllocator(cam)
>> + ret = allocator.allocate(stream)
>> + self.assertTrue(ret > 0)
>> + wr_allocator = weakref.ref(allocator)
>> +
>> + buffers = allocator.buffers(stream)
>> + buffers = None
>> +
>> + buffer = allocator.buffers(stream)[0]
>> + self.assertIsNotNone(buffer)
>> + wr_buffer = weakref.ref(buffer)
>> +
>> + allocator = None
>> + gc.collect()
>> + self.assertIsNotNone(wr_buffer())
>> + self.assertIsNotNone(wr_allocator())
>> + self.assertIsNotNone(wr_stream())
>> +
>> + buffer = None
>> + gc.collect()
>> + self.assertIsNone(wr_buffer())
>> + self.assertIsNone(wr_allocator())
>> + self.assertIsNotNone(wr_stream())
>> +
>> + stream = None
>> + gc.collect()
>> + self.assertIsNone(wr_stream())
>> + self.assertIsNone(wr_camconfig())
>> + self.assertIsNone(wr_streamconfig())
>> +
>> +
>> +class SimpleCaptureMethods(CameraTesterBase):
>> + def test_sleep(self):
>> + cm = self.cm
>> + cam = self.cam
>> +
>> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
>> + self.assertTrue(camconfig.size == 1)
>> +
>> + streamconfig = camconfig.at(0)
>> + fmts = streamconfig.formats
>> +
>> + ret = cam.configure(camconfig)
>> + self.assertZero(ret)
>> +
>> + stream = streamconfig.stream
>> +
>> + allocator = libcam.FrameBufferAllocator(cam)
>> + ret = allocator.allocate(stream)
>> + self.assertTrue(ret > 0)
>> +
>> + num_bufs = len(allocator.buffers(stream))
>> +
>> + reqs = []
>> + for i in range(num_bufs):
>> + req = cam.createRequest(i)
>> + self.assertIsNotNone(req)
>> +
>> + buffer = allocator.buffers(stream)[i]
>> + ret = req.addBuffer(stream, buffer)
>> + self.assertZero(ret)
>> +
>> + reqs.append(req)
>> +
>> + buffer = None
>> +
>> + ret = cam.start()
>> + self.assertZero(ret)
>> +
>> + for req in reqs:
>> + ret = cam.queueRequest(req)
>> + self.assertZero(ret)
>> +
>> + reqs = None
>> + gc.collect()
>> +
>> + time.sleep(0.5)
>> +
>> + reqs = cm.getReadyRequests()
>> +
>> + self.assertTrue(len(reqs) == num_bufs)
>> +
>> + for i, req in enumerate(reqs):
>> + self.assertTrue(i == req.cookie)
>> +
>> + reqs = None
>> + gc.collect()
>> +
>> + ret = cam.stop()
>> + self.assertZero(ret)
>> +
>> + def test_select(self):
>> + cm = self.cm
>> + cam = self.cam
>> +
>> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
>> + self.assertTrue(camconfig.size == 1)
>> +
>> + streamconfig = camconfig.at(0)
>> + fmts = streamconfig.formats
>> +
>> + ret = cam.configure(camconfig)
>> + self.assertZero(ret)
>> +
>> + stream = streamconfig.stream
>> +
>> + allocator = libcam.FrameBufferAllocator(cam)
>> + ret = allocator.allocate(stream)
>> + self.assertTrue(ret > 0)
>> +
>> + num_bufs = len(allocator.buffers(stream))
>> +
>> + reqs = []
>> + for i in range(num_bufs):
>> + req = cam.createRequest(i)
>> + self.assertIsNotNone(req)
>> +
>> + buffer = allocator.buffers(stream)[i]
>> + ret = req.addBuffer(stream, buffer)
>> + self.assertZero(ret)
>> +
>> + reqs.append(req)
>> +
>> + buffer = None
>> +
>> + ret = cam.start()
>> + self.assertZero(ret)
>> +
>> + for req in reqs:
>> + ret = cam.queueRequest(req)
>> + self.assertZero(ret)
>> +
>> + reqs = None
>> + gc.collect()
>> +
>> + sel = selectors.DefaultSelector()
>> + sel.register(cm.efd, selectors.EVENT_READ, 123)
>
> Is the data argument needed ?
No, it's not.
>> +
>> + reqs = []
>> +
>> + running = True
>> + while running:
>> + events = sel.select()
>> + for key, mask in events:
>> + os.read(key.fileobj, 8)
>> +
>> + ready_reqs = cm.getReadyRequests()
>> +
>> + self.assertTrue(len(ready_reqs) > 0)
>> +
>> + reqs += ready_reqs
>> +
>> + if len(reqs) == num_bufs:
>> + running = False
>> +
>> + self.assertTrue(len(reqs) == num_bufs)
>> +
>> + for i, req in enumerate(reqs):
>> + self.assertTrue(i == req.cookie)
>> +
>> + reqs = None
>> + gc.collect()
>> +
>> + ret = cam.stop()
>> + self.assertZero(ret)
>> +
>> +
>> +# Recursively expand slist's objects into olist, using seen to track already
>> +# processed objects.
>> +def _getr(slist, olist, seen):
>> + for e in slist:
>> + if id(e) in seen:
>> + continue
>> + seen.add(id(e))
>> + olist.append(e)
>> + tl = gc.get_referents(e)
>> + if tl:
>> + _getr(tl, olist, seen)
>> +
>> +
>> +def get_all_objects(ignored=[]):
>> + gcl = gc.get_objects()
>> + olist = []
>> + seen = set()
>> +
>> + seen.add(id(gcl))
>> + seen.add(id(olist))
>> + seen.add(id(seen))
>> + seen.update(set([id(o) for o in ignored]))
>> +
>> + _getr(gcl, olist, seen)
>> +
>> + return olist
>> +
>> +
>> +def create_type_count_map(olist):
>> + map = defaultdict(int)
>> + for o in olist:
>> + map[type(o)] += 1
>> + return map
>> +
>> +
>> +def diff_type_count_maps(before, after):
>> + return [(k, after[k] - before[k]) for k in after if after[k] != before[k]]
>> +
>> +
>> +if __name__ == '__main__':
>> + # doesn't work very well, as things always leak a bit
>
> Lovely :-) Is it something we can fix ?
I have no idea if this method is a valid way to observe object leaks,
so... no clue. I think it can still be used, but manually, comparing
object lists when doing some changes to the tests.
Tomi
More information about the libcamera-devel
mailing list