diff --git a/README.md b/README.md
index d0507bf..7d9cfd9 100644
--- a/README.md
+++ b/README.md
@@ -53,6 +53,11 @@
 pytest --tests-per-worker auto
 pytest --workers 2 --tests-per-worker auto
 ```
+## Non-parallel runs
+
+Use `@pytest.mark.sequence` to mark tests that shouldn't run in parallel.
+Use the `sequence` fixture to mark fixtures or tests that shouldn't run in parallel.
+
 ## Notice
 
 Beginning with Python 3.8, forking behavior is forced on macOS at the expense of safety.
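To make the new README section concrete, here is a minimal usage sketch of the two opt-out routes this diff adds. The test names and bodies are hypothetical; only the `sequence` marker and fixture come from the change:

```python
import pytest


# Marker route: pytest_configure registers "sequence", and the scheduler
# diverts any test carrying the marker into the sequential queue.
@pytest.mark.sequence
def test_mutates_shared_state():
    assert 1 + 1 == 2


# Fixture route: requesting `sequence` places "sequence" in the test's
# fixture closure, which the scheduler checks as well.
def test_also_runs_sequentially(sequence):
    assert True
```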
diff --git a/pytest_parallel/__init__.py b/pytest_parallel/__init__.py
index e584800..e364e45 100644
--- a/pytest_parallel/__init__.py
+++ b/pytest_parallel/__init__.py
@@ -103,6 +103,15 @@ def run(self):
         pass
 
 
+@pytest.fixture
+def sequence():
+    """
+    Moves the requesting test to sequential (non-parallel) execution.
+    Can also be required by another fixture.
+    """
+    pass
+
+
 @pytest.mark.trylast
 def pytest_configure(config):
     workers = parse_config(config, 'workers')
@@ -110,6 +119,8 @@
     if not config.option.collectonly and (workers or tests_per_worker):
         config.pluginmanager.register(ParallelRunner(config), 'parallelrunner')
 
+    config.addinivalue_line("markers", "sequence: mark tests that must not run in parallel")
+
 
 class ThreadLocalEnviron(os._Environ):
     def __init__(self, env):
@@ -189,6 +200,31 @@ def __init__(self, *args, **kwargs):
         super(ThreadLocalFixtureDef, self).__init__(*args, **kwargs)
 
 
+def print_info(workers, tests_per_worker, parallel_queue_size, sequence_queue_size):
+    if workers > 1:
+        worker_noun, process_noun = ('workers', 'processes')
+    else:
+        worker_noun, process_noun = ('worker', 'process')
+
+    if tests_per_worker > 1:
+        test_noun, thread_noun = ('tests', 'threads')
+    else:
+        test_noun, thread_noun = ('test', 'thread')
+
+    print(
+        'pytest-parallel: {} {} ({}), {} {} per worker ({})'.format(
+            workers, worker_noun, process_noun, tests_per_worker, test_noun, thread_noun,
+        )
+    )
+
+    if sequence_queue_size:
+        print(
+            '{} tests will run in parallel, {} tests will run in sequence.'.format(
+                parallel_queue_size, sequence_queue_size,
+            )
+        )
+
+
 class ParallelRunner(object):
     def __init__(self, config):
         self._config = config
@@ -257,22 +293,9 @@ def pytest_runtestloop(self, session):
                 raise ValueError(('tests_per_worker can only be '
                                   'an integer or "auto"'))
 
-        if self.workers > 1:
-            worker_noun, process_noun = ('workers', 'processes')
-        else:
-            worker_noun, process_noun = ('worker', 'process')
-
-        if tests_per_worker > 1:
-            test_noun, thread_noun = ('tests', 'threads')
-        else:
-            test_noun, thread_noun = ('test', 'thread')
-
-        print('pytest-parallel: {} {} ({}), {} {} per worker ({})'
-              .format(self.workers, worker_noun, process_noun,
-                      tests_per_worker, test_noun, thread_noun))
-
         queue_cls = self._manager.Queue
-        queue = queue_cls()
+        parallel_queue = queue_cls()
+        sequence_queue = queue_cls()
         errors = queue_cls()
 
         # Reports about tests will be gathered from workers
@@ -282,13 +305,20 @@
         # This way, report generators like JUnitXML will work as expected.
        self.responses_queue = queue_cls()
 
-        for i in range(len(session.items)):
-            queue.put(i)
+        for i, item in enumerate(session.items):
+            if "sequence" in [mark.name for mark in item.own_markers] or "sequence" in item._fixtureinfo.names_closure:
+                sequence_queue.put(i)
+            else:
+                parallel_queue.put(i)
+
+        print_info(self.workers, tests_per_worker, parallel_queue.qsize(), sequence_queue.qsize())
 
         # Now we need to put stopping sentinels, so that worker
         # processes will know, there is time to finish the work.
         for i in range(self.workers * tests_per_worker):
-            queue.put('stop')
+            parallel_queue.put('stop')
+
+        sequence_queue.put('stop')
 
         responses_processor = threading.Thread(
             target=self.process_responses,
@@ -307,7 +337,7 @@ def wait_for_responses_processor():
         # This flag will be changed after the worker's fork.
         self._config.parallel_worker = False
 
-        args = (self._config, queue, session, tests_per_worker, errors)
+        args = (self._config, parallel_queue, session, tests_per_worker, errors)
         for _ in range(self.workers):
             process = Process(target=process_with_threads, args=args)
             process.start()
@@ -315,7 +345,13 @@ def wait_for_responses_processor():
 
         [p.join() for p in processes]
 
-        queue.join()
+        parallel_queue.join()
+
+        thread_for_sequence = ThreadWorker(sequence_queue, session, errors)
+        thread_for_sequence.start()
+        thread_for_sequence.join()
+        sequence_queue.join()
+
         wait_for_responses_processor()
 
         if not errors.empty():
diff --git a/tests/test_general.py b/tests/test_general.py
index aa7ec46..0c531b3 100644
--- a/tests/test_general.py
+++ b/tests/test_general.py
@@ -237,7 +237,7 @@ def test_1():
         raise Exception('Failed to load test file')
     """)
     result = testdir.runpytest(*cli_args)
-    result.assert_outcomes(error=1)
+    result.assert_outcomes(errors=1)
     # Expect error code 2 (Interrupted), which is returned on collection error.
     assert result.ret == 2
 
@@ -258,3 +258,37 @@ def test_collection_collectonly(testdir, cli_args):
     ])
     result.assert_outcomes()
     assert result.ret == 0
+
+
+@pytest.mark.parametrize('cli_args', [
+    ['--workers=4'],
+    ['--tests-per-worker=4']
+])
+def test_sequenced_call(testdir, cli_args):
+    """
+    Tests that tests marked `sequence` run one after another, in order.
+    """
+    testdir.makepyfile("""
+        import pytest
+        import time
+
+        obj = {"a": None}
+
+        @pytest.mark.sequence
+        def test_1():
+            time.sleep(0.1)
+            assert obj["a"] is None
+            obj["a"] = "first"
+
+        @pytest.mark.sequence
+        def test_2():
+            assert obj["a"] == "first"
+            obj["a"] = "second"
+
+        @pytest.mark.sequence
+        def test_3():
+            assert obj["a"] == "second"
+    """)
+    result = testdir.runpytest(*cli_args)
+    result.assert_outcomes(passed=3)
+    assert result.ret == 0
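Because scheduling inspects `item._fixtureinfo.names_closure`, the `sequence` fixture also propagates through fixture dependencies: a fixture that itself requires `sequence` pulls every test using it into the sequential phase (this is what the fixture docstring's "can be used with another fixture" refers to). A hypothetical `conftest.py` sketch; the `exclusive_resource` name and file contents are invented for illustration:

```python
import pytest


@pytest.fixture
def exclusive_resource(sequence, tmp_path):
    # Requiring `sequence` here places "sequence" in the fixture closure
    # of every test that uses this fixture, so they all run sequentially.
    path = tmp_path / "shared.txt"
    path.write_text("one writer at a time")
    return path


def test_reads_exclusively(exclusive_resource):
    assert exclusive_resource.read_text() == "one writer at a time"
```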
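A note on the sentinel arithmetic in `pytest_runtestloop` above: one 'stop' is enqueued per parallel consumer thread (`workers * tests_per_worker`), plus a single one for the lone sequence `ThreadWorker`, because each consumer swallows exactly one sentinel on its way out. A self-contained sketch of the same pattern, independent of the plugin (thread count and queue items are illustrative):

```python
import queue
import threading


def consume(q):
    # Each consumer exits when it pulls the 'stop' sentinel. task_done()
    # is called for sentinels too, so q.join() can eventually return.
    while True:
        item = q.get()
        try:
            if item == 'stop':
                break
            print('running test index', item)
        finally:
            q.task_done()


NUM_CONSUMERS = 4  # stands in for workers * tests_per_worker

q = queue.Queue()
for i in range(10):              # "test indexes", like session.items
    q.put(i)
for _ in range(NUM_CONSUMERS):   # exactly one sentinel per consumer
    q.put('stop')

threads = [threading.Thread(target=consume, args=(q,)) for _ in range(NUM_CONSUMERS)]
for t in threads:
    t.start()
q.join()   # returns once every put() item has been task_done()'d
for t in threads:
    t.join()
```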