# # BitBake Test for lib/bb/persist_data/ # # Copyright (C) 2018 Garmin Ltd. # # SPDX-License-Identifier: GPL-2.0-only # import unittest import bb.data import bb.persist_data import tempfile import threading class PersistDataTest(unittest.TestCase): def _create_data(self): return bb.persist_data.persist('TEST_PERSIST_DATA', self.d) def setUp(self): self.d = bb.data.init() self.tempdir = tempfile.TemporaryDirectory() self.d['PERSISTENT_DIR'] = self.tempdir.name self.data = self._create_data() self.items = { 'A1': '1', 'B1': '2', 'C2': '3' } self.stress_count = 10000 self.thread_count = 5 for k,v in self.items.items(): self.data[k] = v def tearDown(self): self.tempdir.cleanup() def _iter_helper(self, seen, iterator): with iter(iterator): for v in iterator: self.assertTrue(v in seen) seen.remove(v) self.assertEqual(len(seen), 0, '%s not seen' % seen) def test_get(self): for k, v in self.items.items(): self.assertEqual(self.data[k], v) self.assertIsNone(self.data.get('D')) with self.assertRaises(KeyError): self.data['D'] def test_set(self): for k, v in self.items.items(): self.data[k] += '-foo' for k, v in self.items.items(): self.assertEqual(self.data[k], v + '-foo') def test_delete(self): self.data['D'] = '4' self.assertEqual(self.data['D'], '4') del self.data['D'] self.assertIsNone(self.data.get('D')) with self.assertRaises(KeyError): self.data['D'] def test_contains(self): for k in self.items: self.assertTrue(k in self.data) self.assertTrue(self.data.has_key(k)) self.assertFalse('NotFound' in self.data) self.assertFalse(self.data.has_key('NotFound')) def test_len(self): self.assertEqual(len(self.data), len(self.items)) def test_iter(self): self._iter_helper(set(self.items.keys()), self.data) def test_itervalues(self): self._iter_helper(set(self.items.values()), self.data.itervalues()) def test_iteritems(self): self._iter_helper(set(self.items.items()), self.data.iteritems()) def test_get_by_pattern(self): self._iter_helper({'1', '2'}, self.data.get_by_pattern('_1')) def _stress_read(self, data): for i in range(self.stress_count): for k in self.items: data[k] def _stress_write(self, data): for i in range(self.stress_count): for k, v in self.items.items(): data[k] = v + str(i) def _validate_stress(self): for k, v in self.items.items(): self.assertEqual(self.data[k], v + str(self.stress_count - 1)) def test_stress(self): self._stress_read(self.data) self._stress_write(self.data) self._validate_stress() def test_stress_threads(self): def read_thread(): data = self._create_data() self._stress_read(data) def write_thread(): data = self._create_data() self._stress_write(data) threads = [] for i in range(self.thread_count): threads.append(threading.Thread(target=read_thread)) threads.append(threading.Thread(target=write_thread)) for t in threads: t.start() self._stress_read(self.data) for t in threads: t.join() self._validate_stress()