# ex:ts=4:sw=4:sts=4:et # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- # # BitBake Tests for the Event implementation (event.py) # # Copyright (C) 2017 Intel Corporation # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # import unittest import bb import logging import bb.compat import bb.event import importlib import threading import time import pickle from unittest.mock import Mock from unittest.mock import call from bb.msg import BBLogFormatter class EventQueueStubBase(object): """ Base class for EventQueueStub classes """ def __init__(self): self.event_calls = [] return def _store_event_data_string(self, event): if isinstance(event, logging.LogRecord): formatter = BBLogFormatter("%(levelname)s: %(message)s") self.event_calls.append(formatter.format(event)) else: self.event_calls.append(bb.event.getName(event)) return class EventQueueStub(EventQueueStubBase): """ Class used as specification for UI event handler queue stub objects """ def __init__(self): super(EventQueueStub, self).__init__() def send(self, event): super(EventQueueStub, self)._store_event_data_string(event) class PickleEventQueueStub(EventQueueStubBase): """ Class used as specification for UI event handler queue stub objects with sendpickle method """ def __init__(self): super(PickleEventQueueStub, self).__init__() def sendpickle(self, pickled_event): event = pickle.loads(pickled_event) super(PickleEventQueueStub, self)._store_event_data_string(event) class UIClientStub(object): """ Class used as specification for UI event handler stub objects """ def __init__(self): self.event = None class EventHandlingTest(unittest.TestCase): """ Event handling test class """ def setUp(self): self._test_process = Mock() ui_client1 = UIClientStub() ui_client2 = UIClientStub() self._test_ui1 = Mock(wraps=ui_client1) self._test_ui2 = Mock(wraps=ui_client2) importlib.reload(bb.event) def _create_test_handlers(self): """ Method used to create a test handler ordered dictionary """ test_handlers = bb.compat.OrderedDict() test_handlers["handler1"] = self._test_process.handler1 test_handlers["handler2"] = self._test_process.handler2 return test_handlers def test_class_handlers(self): """ Test set_class_handlers and get_class_handlers methods """ test_handlers = self._create_test_handlers() bb.event.set_class_handlers(test_handlers) self.assertEqual(test_handlers, bb.event.get_class_handlers()) def test_handlers(self): """ Test set_handlers and get_handlers """ test_handlers = self._create_test_handlers() bb.event.set_handlers(test_handlers) self.assertEqual(test_handlers, bb.event.get_handlers()) def test_clean_class_handlers(self): """ Test clean_class_handlers method """ cleanDict = bb.compat.OrderedDict() self.assertEqual(cleanDict, bb.event.clean_class_handlers()) def test_register(self): """ Test register method for class handlers """ result = bb.event.register("handler", self._test_process.handler) self.assertEqual(result, bb.event.Registered) handlers_dict = bb.event.get_class_handlers() self.assertIn("handler", handlers_dict) def test_already_registered(self): """ Test detection of an already registed class handler """ bb.event.register("handler", self._test_process.handler) handlers_dict = bb.event.get_class_handlers() self.assertIn("handler", handlers_dict) result = bb.event.register("handler", self._test_process.handler) self.assertEqual(result, bb.event.AlreadyRegistered) def test_register_from_string(self): """ Test register method receiving code in string """ result = bb.event.register("string_handler", " return True") self.assertEqual(result, bb.event.Registered) handlers_dict = bb.event.get_class_handlers() self.assertIn("string_handler", handlers_dict) def test_register_with_mask(self): """ Test register method with event masking """ mask = ["bb.event.OperationStarted", "bb.event.OperationCompleted"] result = bb.event.register("event_handler", self._test_process.event_handler, mask) self.assertEqual(result, bb.event.Registered) handlers_dict = bb.event.get_class_handlers() self.assertIn("event_handler", handlers_dict) def test_remove(self): """ Test remove method for class handlers """ test_handlers = self._create_test_handlers() bb.event.set_class_handlers(test_handlers) count = len(test_handlers) bb.event.remove("handler1", None) test_handlers = bb.event.get_class_handlers() self.assertEqual(len(test_handlers), count - 1) with self.assertRaises(KeyError): bb.event.remove("handler1", None) def test_execute_handler(self): """ Test execute_handler method for class handlers """ mask = ["bb.event.OperationProgress"] result = bb.event.register("event_handler", self._test_process.event_handler, mask) self.assertEqual(result, bb.event.Registered) event = bb.event.OperationProgress(current=10, total=100) bb.event.execute_handler("event_handler", self._test_process.event_handler, event, None) self._test_process.event_handler.assert_called_once_with(event) def test_fire_class_handlers(self): """ Test fire_class_handlers method """ mask = ["bb.event.OperationStarted"] result = bb.event.register("event_handler1", self._test_process.event_handler1, mask) self.assertEqual(result, bb.event.Registered) result = bb.event.register("event_handler2", self._test_process.event_handler2, "*") self.assertEqual(result, bb.event.Registered) event1 = bb.event.OperationStarted() event2 = bb.event.OperationCompleted(total=123) bb.event.fire_class_handlers(event1, None) bb.event.fire_class_handlers(event2, None) bb.event.fire_class_handlers(event2, None) expected_event_handler1 = [call(event1)] expected_event_handler2 = [call(event1), call(event2), call(event2)] self.assertEqual(self._test_process.event_handler1.call_args_list, expected_event_handler1) self.assertEqual(self._test_process.event_handler2.call_args_list, expected_event_handler2) def test_class_handler_filters(self): """ Test filters for class handlers """ mask = ["bb.event.OperationStarted"] result = bb.event.register("event_handler1", self._test_process.event_handler1, mask) self.assertEqual(result, bb.event.Registered) result = bb.event.register("event_handler2", self._test_process.event_handler2, "*") self.assertEqual(result, bb.event.Registered) bb.event.set_eventfilter( lambda name, handler, event, d : name == 'event_handler2' and bb.event.getName(event) == "OperationStarted") event1 = bb.event.OperationStarted() event2 = bb.event.OperationCompleted(total=123) bb.event.fire_class_handlers(event1, None) bb.event.fire_class_handlers(event2, None) bb.event.fire_class_handlers(event2, None) expected_event_handler1 = [] expected_event_handler2 = [call(event1)] self.assertEqual(self._test_process.event_handler1.call_args_list, expected_event_handler1) self.assertEqual(self._test_process.event_handler2.call_args_list, expected_event_handler2) def test_change_handler_event_mapping(self): """ Test changing the event mapping for class handlers """ event1 = bb.event.OperationStarted() event2 = bb.event.OperationCompleted(total=123) # register handler for all events result = bb.event.register("event_handler1", self._test_process.event_handler1, "*") self.assertEqual(result, bb.event.Registered) bb.event.fire_class_handlers(event1, None) bb.event.fire_class_handlers(event2, None) expected = [call(event1), call(event2)] self.assertEqual(self._test_process.event_handler1.call_args_list, expected) # unregister handler and register it only for OperationStarted result = bb.event.remove("event_handler1", self._test_process.event_handler1) mask = ["bb.event.OperationStarted"] result = bb.event.register("event_handler1", self._test_process.event_handler1, mask) self.assertEqual(result, bb.event.Registered) bb.event.fire_class_handlers(event1, None) bb.event.fire_class_handlers(event2, None) expected = [call(event1), call(event2), call(event1)] self.assertEqual(self._test_process.event_handler1.call_args_list, expected) # unregister handler and register it only for OperationCompleted result = bb.event.remove("event_handler1", self._test_process.event_handler1) mask = ["bb.event.OperationCompleted"] result = bb.event.register("event_handler1", self._test_process.event_handler1, mask) self.assertEqual(result, bb.event.Registered) bb.event.fire_class_handlers(event1, None) bb.event.fire_class_handlers(event2, None) expected = [call(event1), call(event2), call(event1), call(event2)] self.assertEqual(self._test_process.event_handler1.call_args_list, expected) def test_register_UIHhandler(self): """ Test register_UIHhandler method """ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) self.assertEqual(result, 1) def test_UIHhandler_already_registered(self): """ Test registering an UIHhandler already existing """ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) self.assertEqual(result, 1) result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) self.assertEqual(result, 2) def test_unregister_UIHhandler(self): """ Test unregister_UIHhandler method """ result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) self.assertEqual(result, 1) result = bb.event.unregister_UIHhandler(1) self.assertIs(result, None) def test_fire_ui_handlers(self): """ Test fire_ui_handlers method """ self._test_ui1.event = Mock(spec_set=EventQueueStub) result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) self.assertEqual(result, 1) self._test_ui2.event = Mock(spec_set=PickleEventQueueStub) result = bb.event.register_UIHhandler(self._test_ui2, mainui=True) self.assertEqual(result, 2) event1 = bb.event.OperationStarted() bb.event.fire_ui_handlers(event1, None) expected = [call(event1)] self.assertEqual(self._test_ui1.event.send.call_args_list, expected) expected = [call(pickle.dumps(event1))] self.assertEqual(self._test_ui2.event.sendpickle.call_args_list, expected) def test_ui_handler_mask_filter(self): """ Test filters for UI handlers """ mask = ["bb.event.OperationStarted"] debug_domains = {} self._test_ui1.event = Mock(spec_set=EventQueueStub) result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) bb.event.set_UIHmask(result, logging.INFO, debug_domains, mask) self._test_ui2.event = Mock(spec_set=PickleEventQueueStub) result = bb.event.register_UIHhandler(self._test_ui2, mainui=True) bb.event.set_UIHmask(result, logging.INFO, debug_domains, mask) event1 = bb.event.OperationStarted() event2 = bb.event.OperationCompleted(total=1) bb.event.fire_ui_handlers(event1, None) bb.event.fire_ui_handlers(event2, None) expected = [call(event1)] self.assertEqual(self._test_ui1.event.send.call_args_list, expected) expected = [call(pickle.dumps(event1))] self.assertEqual(self._test_ui2.event.sendpickle.call_args_list, expected) def test_ui_handler_log_filter(self): """ Test log filters for UI handlers """ mask = ["*"] debug_domains = {'BitBake.Foo': logging.WARNING} self._test_ui1.event = EventQueueStub() result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) bb.event.set_UIHmask(result, logging.ERROR, debug_domains, mask) self._test_ui2.event = PickleEventQueueStub() result = bb.event.register_UIHhandler(self._test_ui2, mainui=True) bb.event.set_UIHmask(result, logging.ERROR, debug_domains, mask) event1 = bb.event.OperationStarted() bb.event.fire_ui_handlers(event1, None) # All events match event_log_handler = bb.event.LogHandler() logger = logging.getLogger("BitBake") logger.addHandler(event_log_handler) logger1 = logging.getLogger("BitBake.Foo") logger1.warning("Test warning LogRecord1") # Matches debug_domains level logger1.info("Test info LogRecord") # Filtered out logger2 = logging.getLogger("BitBake.Bar") logger2.error("Test error LogRecord") # Matches filter base level logger2.warning("Test warning LogRecord2") # Filtered out logger.removeHandler(event_log_handler) expected = ['OperationStarted', 'WARNING: Test warning LogRecord1', 'ERROR: Test error LogRecord'] self.assertEqual(self._test_ui1.event.event_calls, expected) self.assertEqual(self._test_ui2.event.event_calls, expected) def test_fire(self): """ Test fire method used to trigger class and ui event handlers """ mask = ["bb.event.ConfigParsed"] result = bb.event.register("event_handler1", self._test_process.event_handler1, mask) self._test_ui1.event = Mock(spec_set=EventQueueStub) result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) self.assertEqual(result, 1) event1 = bb.event.ConfigParsed() bb.event.fire(event1, None) expected = [call(event1)] self.assertEqual(self._test_process.event_handler1.call_args_list, expected) self.assertEqual(self._test_ui1.event.send.call_args_list, expected) def test_fire_from_worker(self): """ Test fire_from_worker method """ self._test_ui1.event = Mock(spec_set=EventQueueStub) result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) self.assertEqual(result, 1) event1 = bb.event.ConfigParsed() bb.event.fire_from_worker(event1, None) expected = [call(event1)] self.assertEqual(self._test_ui1.event.send.call_args_list, expected) def test_worker_fire(self): """ Test the triggering of bb.event.worker_fire callback """ bb.event.worker_fire = Mock() event = bb.event.Event() bb.event.fire(event, None) expected = [call(event, None)] self.assertEqual(bb.event.worker_fire.call_args_list, expected) def test_print_ui_queue(self): """ Test print_ui_queue method """ event1 = bb.event.OperationStarted() event2 = bb.event.OperationCompleted(total=123) bb.event.fire(event1, None) bb.event.fire(event2, None) event_log_handler = bb.event.LogHandler() logger = logging.getLogger("BitBake") logger.addHandler(event_log_handler) logger.info("Test info LogRecord") logger.warning("Test warning LogRecord") with self.assertLogs("BitBake", level="INFO") as cm: bb.event.print_ui_queue() logger.removeHandler(event_log_handler) self.assertEqual(cm.output, ["INFO:BitBake:Test info LogRecord", "WARNING:BitBake:Test warning LogRecord"]) def _set_threadlock_test_mockups(self): """ Create UI event handler mockups used in enable and disable threadlock tests """ def ui1_event_send(event): if type(event) is bb.event.ConfigParsed: self._threadlock_test_calls.append("w1_ui1") if type(event) is bb.event.OperationStarted: self._threadlock_test_calls.append("w2_ui1") time.sleep(2) def ui2_event_send(event): if type(event) is bb.event.ConfigParsed: self._threadlock_test_calls.append("w1_ui2") if type(event) is bb.event.OperationStarted: self._threadlock_test_calls.append("w2_ui2") time.sleep(2) self._threadlock_test_calls = [] self._test_ui1.event = EventQueueStub() self._test_ui1.event.send = ui1_event_send result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) self.assertEqual(result, 1) self._test_ui2.event = EventQueueStub() self._test_ui2.event.send = ui2_event_send result = bb.event.register_UIHhandler(self._test_ui2, mainui=True) self.assertEqual(result, 2) def _set_and_run_threadlock_test_workers(self): """ Create and run the workers used to trigger events in enable and disable threadlock tests """ worker1 = threading.Thread(target=self._thread_lock_test_worker1) worker2 = threading.Thread(target=self._thread_lock_test_worker2) worker1.start() time.sleep(1) worker2.start() worker1.join() worker2.join() def _thread_lock_test_worker1(self): """ First worker used to fire the ConfigParsed event for enable and disable threadlocks tests """ bb.event.fire(bb.event.ConfigParsed(), None) def _thread_lock_test_worker2(self): """ Second worker used to fire the OperationStarted event for enable and disable threadlocks tests """ bb.event.fire(bb.event.OperationStarted(), None) def test_enable_threadlock(self): """ Test enable_threadlock method """ self._set_threadlock_test_mockups() bb.event.enable_threadlock() self._set_and_run_threadlock_test_workers() # Calls to UI handlers should be in order as all the registered # handlers for the event coming from the first worker should be # called before processing the event from the second worker. self.assertEqual(self._threadlock_test_calls, ["w1_ui1", "w1_ui2", "w2_ui1", "w2_ui2"]) def test_disable_threadlock(self): """ Test disable_threadlock method """ self._set_threadlock_test_mockups() bb.event.disable_threadlock() self._set_and_run_threadlock_test_workers() # Calls to UI handlers should be intertwined together. Thanks to the # delay in the registered handlers for the event coming from the first # worker, the event coming from the second worker starts being # processed before finishing handling the first worker event. self.assertEqual(self._threadlock_test_calls, ["w1_ui1", "w2_ui1", "w1_ui2", "w2_ui2"])