Subversion Revision: 112014 diff --git a/Tools/ChangeLog b/Tools/ChangeLog index 3825729a6e1bc16183662e47e23cec752fa86cef..db5a45089b7b6149cfd807eead3b461e2c212c0f 100644 --- a/Tools/ChangeLog +++ b/Tools/ChangeLog @@ -1,5 +1,40 @@ 2012-03-24 Dirk Pranke + nrwt: simplify worker/broker interface + https://bugs.webkit.org/show_bug.cgi?id=82137 + + Reviewed by NOBODY (OOPS!). + + This change pushes the post_message() logic from worker.py into + manager_worker_broker, so that we can finish hiding the + message-passing abstraction from the manager and workers. The + handle to the WorkerConnection is now called a 'caller', so we + can encapsulate the idea of a message broker from the worker; + now you just notify the caller of things. + + * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py: + (_BrokerConnection.run_message_loop): + (_WorkerConnection.is_done): + (_WorkerConnection.handle_stop): + (_WorkerConnection): + (_WorkerConnection.handle_test_list): + (_WorkerConnection.post_done): + (_WorkerConnection.post_finished_list): + (_WorkerConnection.post_started_test): + (_WorkerConnection.post_finished_test): + * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py: + (_TestWorker.run_test_list): + (_TestWorker.run): + (_TestsMixin.test_cancel): + (_TestsMixin.test_done): + (InlineBrokerTests.test_inline_arguments): + * Scripts/webkitpy/layout_tests/controllers/worker.py: + (Worker.run): + (Worker.run_test_list): + (Worker._run_test): + +2012-03-24 Dirk Pranke + nrwt: simplify manager/broker interface https://bugs.webkit.org/show_bug.cgi?id=82116 diff --git a/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py b/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py index 8ac293afa457b068fd9be8c5746b78cef705b382..547bf8a21b4ee3dc019a107fe0f726efb2150a34 100755 --- a/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py +++ b/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py @@ -250,7 +250,7 @@ class _BrokerConnection(object): broker.add_topic(post_topic) def run_message_loop(self, delay_secs=None): - self._broker.run_message_loop(self._run_topic, self._client, delay_secs) + self._broker.run_message_loop(self._run_topic, self, delay_secs) def post_message(self, message_name, *message_args): self._broker.post_message(self._client, self._post_topic, @@ -265,7 +265,7 @@ class _BrokerConnection(object): class AbstractWorker(BrokerClient): - def __init__(self, worker_connection, worker_arguments=None): + def __init__(self, caller, worker_arguments=None): """The constructor should be used to do any simple initialization necessary, but should not do anything that creates data structures that cannot be Pickled or sent across processes (like opening @@ -273,11 +273,11 @@ class AbstractWorker(BrokerClient): start of the run() call. Args: - worker_connection - handle to the _BrokerConnection object creating + caller - handle to the calling _BrokerConnection object creating the worker and that can be used for messaging. worker_arguments - (optional, Picklable) object passed to the worker from the manager""" BrokerClient.__init__(self) - self._worker_connection = worker_connection + self._caller = caller self._name = 'worker' self._done = False self._canceled = False @@ -294,19 +294,22 @@ class AbstractWorker(BrokerClient): exception_msg = "" try: - self._worker_connection.run_message_loop() + self._caller.run_message_loop() if not self.is_done(): raise AssertionError("%s: ran out of messages in worker queue." % self._name) except KeyboardInterrupt: exception_msg = ", interrupted" - self._worker_connection.raise_exception(sys.exc_info()) + self._caller.raise_exception(sys.exc_info()) except: exception_msg = ", exception raised" - self._worker_connection.raise_exception(sys.exc_info()) + self._caller.raise_exception(sys.exc_info()) finally: _log.debug("%s done with message loop%s" % (self._name, exception_msg)) + def yield_to_caller(self): + self._caller.yield_to_caller() + def cancel(self): """Called when possible to indicate to the worker to stop processing messages and shut down. Note that workers may be stopped without this @@ -520,15 +523,36 @@ class _WorkerConnection(_BrokerConnection): def cancel(self): raise NotImplementedError + def is_done(self): + return self._client.is_done() + def is_alive(self): raise NotImplementedError def join(self, timeout): raise NotImplementedError - def yield_to_broker(self): + def yield_to_caller(self): pass + def handle_stop(self, src): + self._client.stop_handling_messages() + + def handle_test_list(self, src, list_name, test_list): + self._client.run_test_list(list_name, test_list) + + def notify_done(self): + self.post_message('done') + + def notify_finished_list(self, list_name, num_tests, elapsed_time): + self.post_message('finished_list', list_name, num_tests, elapsed_time) + + def notify_started_test(self, test_input, test_timeout_sec): + self.post_message('started_test', test_input, test_timeout_sec) + + def notify_finished_test(self, test_results, elapsed_time): + self.post_message('finished_test', test_results, elapsed_time) + class _InlineWorkerConnection(_WorkerConnection): def __init__(self, broker, manager_client, worker_class, worker_arguments): @@ -555,7 +579,7 @@ class _InlineWorkerConnection(_WorkerConnection): finally: self._alive = False - def yield_to_broker(self): + def yield_to_caller(self): self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client) def raise_exception(self, exc_info): diff --git a/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py b/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py index d08aa941b0726568df4c7ce5d4f65cf3e0327b29..9d5b53cdfe6cae23c2e0fd28ae2012122bdafdfb 100644 --- a/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py +++ b/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py @@ -58,8 +58,8 @@ def make_broker(manager, max_workers, start_queue=None, stop_queue=None): class _TestWorker(manager_worker_broker.AbstractWorker): - def __init__(self, worker_connection, worker_arguments=None): - super(_TestWorker, self).__init__(worker_connection) + def __init__(self, caller, worker_arguments=None): + super(_TestWorker, self).__init__(caller) self._name = WORKER_NAME self._elapsed_time = 1.0 self._starting_queue = starting_queue @@ -71,13 +71,10 @@ class _TestWorker(manager_worker_broker.AbstractWorker): def set_inline_arguments(self, elapsed_time): self._elapsed_time = elapsed_time - def handle_stop(self, src): - self.stop_handling_messages() - - def handle_test(self, src, an_int, a_str): + def run_test_list(self, an_int, a_str): assert an_int == 1 assert a_str == "hello, world" - self._worker_connection.post_message('finished_test', 2, self._elapsed_time) + self._caller.notify_finished_test(2, self._elapsed_time) def run(self): if self._starting_queue: @@ -88,7 +85,7 @@ class _TestWorker(manager_worker_broker.AbstractWorker): try: super(_TestWorker, self).run() finally: - self._worker_connection.post_message('done') + self._caller.notify_done() class FunctionTests(unittest.TestCase): @@ -135,14 +132,14 @@ class _TestsMixin(object): self.make_broker() worker = self._broker.start_worker(WorkerArguments(1)) worker.cancel() - self._broker.post_message('test', 1, 'hello, world') + self._broker.post_message('test_list', 1, 'hello, world') worker.join(0.1) self.assertFalse(worker.is_alive()) def test_done(self): self.make_broker() worker = self._broker.start_worker(WorkerArguments(1)) - self._broker.post_message('test', 1, 'hello, world') + self._broker.post_message('test_list', 1, 'hello, world') self._broker.post_message('stop') self._broker.run_message_loop() worker.join(0.5) @@ -177,7 +174,7 @@ class InlineBrokerTests(_TestsMixin, unittest.TestCase): self.make_broker() worker = self._broker.start_worker(WorkerArguments(1)) worker.set_inline_arguments(2.0) - self._broker.post_message('test', 1, 'hello, world') + self._broker.post_message('test_list', 1, 'hello, world') self._broker.post_message('stop') self._broker.run_message_loop() self.assertEquals(self._elapsed_time, 2.0) diff --git a/Tools/Scripts/webkitpy/layout_tests/controllers/worker.py b/Tools/Scripts/webkitpy/layout_tests/controllers/worker.py index a419dd4b918be4498c913146526f0ce4dee3f0f1..2de1a9a3a2d901a3160185e58af9e5f3391500c1 100644 --- a/Tools/Scripts/webkitpy/layout_tests/controllers/worker.py +++ b/Tools/Scripts/webkitpy/layout_tests/controllers/worker.py @@ -48,8 +48,8 @@ WorkerArguments = manager_worker_broker.WorkerArguments class Worker(manager_worker_broker.AbstractWorker): - def __init__(self, worker_connection, worker_arguments): - super(Worker, self).__init__(worker_connection, worker_arguments) + def __init__(self, caller, worker_arguments): + super(Worker, self).__init__(caller, worker_arguments) self._worker_number = worker_arguments.worker_number self._name = 'worker/%d' % self._worker_number self._results_directory = worker_arguments.results_directory @@ -130,11 +130,11 @@ class Worker(manager_worker_broker.AbstractWorker): super(Worker, self).run() finally: self.kill_driver() - self._worker_connection.post_message('done') + self._caller.notify_done() _log.debug("%s exiting" % self._name) self.cleanup() - def handle_test_list(self, src, list_name, test_list): + def run_test_list(self, list_name, test_list): start_time = time.time() num_tests = 0 for test_input in test_list: @@ -143,23 +143,20 @@ class Worker(manager_worker_broker.AbstractWorker): test_input.should_run_pixel_test = (self._port.expected_image(test_input.test_name) != None) self._run_test(test_input) num_tests += 1 - self._worker_connection.yield_to_broker() + self.yield_to_caller() elapsed_time = time.time() - start_time - self._worker_connection.post_message('finished_list', list_name, num_tests, elapsed_time) - - def handle_stop(self, src): - self.stop_handling_messages() + self._caller.notify_finished_list(list_name, num_tests, elapsed_time) def _run_test(self, test_input): test_timeout_sec = self.timeout(test_input) start = time.time() - self._worker_connection.post_message('started_test', test_input, test_timeout_sec) + self._caller.notify_started_test(test_input, test_timeout_sec) result = self.run_test_with_timeout(test_input, test_timeout_sec) elapsed_time = time.time() - start - self._worker_connection.post_message('finished_test', result, elapsed_time) + self._caller.notify_finished_test(result, elapsed_time) self.clean_up_after_test(test_input, result)