aboutsummaryrefslogtreecommitdiffstats
path: root/lib/bb/tests/persist_data.py
blob: f641b5acbc1725b91f10948831d1501749a60451 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#
# BitBake Test for lib/bb/persist_data/
#
# Copyright (C) 2018 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#

import unittest
import bb.data
import bb.persist_data
import tempfile
import threading

class PersistDataTest(unittest.TestCase):
    def _create_data(self):
        return bb.persist_data.persist('TEST_PERSIST_DATA', self.d)

    def setUp(self):
        self.d = bb.data.init()
        self.tempdir = tempfile.TemporaryDirectory()
        self.d['PERSISTENT_DIR'] = self.tempdir.name
        self.data = self._create_data()
        self.items = {
                'A1': '1',
                'B1': '2',
                'C2': '3'
                }
        self.stress_count = 10000
        self.thread_count = 5

        for k,v in self.items.items():
            self.data[k] = v

    def tearDown(self):
        self.tempdir.cleanup()

    def _iter_helper(self, seen, iterator):
        with iter(iterator):
            for v in iterator:
                self.assertTrue(v in seen)
                seen.remove(v)
        self.assertEqual(len(seen), 0, '%s not seen' % seen)

    def test_get(self):
        for k, v in self.items.items():
            self.assertEqual(self.data[k], v)

        self.assertIsNone(self.data.get('D'))
        with self.assertRaises(KeyError):
            self.data['D']

    def test_set(self):
        for k, v in self.items.items():
            self.data[k] += '-foo'

        for k, v in self.items.items():
            self.assertEqual(self.data[k], v + '-foo')

    def test_delete(self):
        self.data['D'] = '4'
        self.assertEqual(self.data['D'], '4')
        del self.data['D']
        self.assertIsNone(self.data.get('D'))
        with self.assertRaises(KeyError):
            self.data['D']

    def test_contains(self):
        for k in self.items:
            self.assertTrue(k in self.data)
            self.assertTrue(self.data.has_key(k))
        self.assertFalse('NotFound' in self.data)
        self.assertFalse(self.data.has_key('NotFound'))

    def test_len(self):
        self.assertEqual(len(self.data), len(self.items))

    def test_iter(self):
        self._iter_helper(set(self.items.keys()), self.data)

    def test_itervalues(self):
        self._iter_helper(set(self.items.values()), self.data.itervalues())

    def test_iteritems(self):
        self._iter_helper(set(self.items.items()), self.data.iteritems())

    def test_get_by_pattern(self):
        self._iter_helper({'1', '2'}, self.data.get_by_pattern('_1'))

    def _stress_read(self, data):
        for i in range(self.stress_count):
            for k in self.items:
                data[k]

    def _stress_write(self, data):
        for i in range(self.stress_count):
            for k, v in self.items.items():
                data[k] = v + str(i)

    def _validate_stress(self):
        for k, v in self.items.items():
            self.assertEqual(self.data[k], v + str(self.stress_count - 1))

    def test_stress(self):
        self._stress_read(self.data)
        self._stress_write(self.data)
        self._validate_stress()

    def test_stress_threads(self):
        def read_thread():
            data = self._create_data()
            self._stress_read(data)

        def write_thread():
            data = self._create_data()
            self._stress_write(data)

        threads = []
        for i in range(self.thread_count):
            threads.append(threading.Thread(target=read_thread))
            threads.append(threading.Thread(target=write_thread))

        for t in threads:
            t.start()
        self._stress_read(self.data)
        for t in threads:
            t.join()
        self._validate_stress()