Skip to content
This repository was archived by the owner on May 29, 2024. It is now read-only.

Commit b03473d

Browse files
author
Kalinovsky, Konstantin
committed
Added mark for sequenced execution. Fixes #80
1 parent 5610f2f commit b03473d

File tree

2 files changed

+51
-20
lines changed

2 files changed

+51
-20
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ pytest --tests-per-worker auto
5353
pytest --workers 2 --tests-per-worker auto
5454
```
5555

56+
## Markers
57+
58+
Use `@pytest.mark.sequence`, to mark tests, that shouldn't run in parallel mode.
59+
5660
## Notice
5761

5862
Beginning with Python 3.8, forking behavior is forced on macOS at the expense of safety.

pytest_parallel/__init__.py

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ def pytest_configure(config):
110110
if not config.option.collectonly and (workers or tests_per_worker):
111111
config.pluginmanager.register(ParallelRunner(config), 'parallelrunner')
112112

113+
config.addinivalue_line("markers", "sequence: mark non parallel tests")
114+
113115

114116
class ThreadLocalEnviron(os._Environ):
115117
def __init__(self, env):
@@ -189,6 +191,31 @@ def __init__(self, *args, **kwargs):
189191
super(ThreadLocalFixtureDef, self).__init__(*args, **kwargs)
190192

191193

194+
def print_info(workers, tests_per_worker, parallel_queue_size, sequence_queue_size):
195+
if workers > 1:
196+
worker_noun, process_noun = ('workers', 'processes')
197+
else:
198+
worker_noun, process_noun = ('worker', 'process')
199+
200+
if tests_per_worker > 1:
201+
test_noun, thread_noun = ('tests', 'threads')
202+
else:
203+
test_noun, thread_noun = ('test', 'thread')
204+
205+
print(
206+
'pytest-parallel: {} {} ({}), {} {} per worker ({})'.format(
207+
workers, worker_noun, process_noun, tests_per_worker, test_noun, thread_noun,
208+
)
209+
)
210+
211+
if sequence_queue_size:
212+
print(
213+
'{} tests will run parallel, {} tests will run in sequence.'.format(
214+
parallel_queue_size, sequence_queue_size,
215+
)
216+
)
217+
218+
192219
class ParallelRunner(object):
193220
def __init__(self, config):
194221
self._config = config
@@ -257,22 +284,9 @@ def pytest_runtestloop(self, session):
257284
raise ValueError(('tests_per_worker can only be '
258285
'an integer or "auto"'))
259286

260-
if self.workers > 1:
261-
worker_noun, process_noun = ('workers', 'processes')
262-
else:
263-
worker_noun, process_noun = ('worker', 'process')
264-
265-
if tests_per_worker > 1:
266-
test_noun, thread_noun = ('tests', 'threads')
267-
else:
268-
test_noun, thread_noun = ('test', 'thread')
269-
270-
print('pytest-parallel: {} {} ({}), {} {} per worker ({})'
271-
.format(self.workers, worker_noun, process_noun,
272-
tests_per_worker, test_noun, thread_noun))
273-
274287
queue_cls = self._manager.Queue
275-
queue = queue_cls()
288+
parallel_queue = queue_cls()
289+
sequence_queue = queue_cls()
276290
errors = queue_cls()
277291

278292
# Reports about tests will be gathered from workerss
@@ -282,13 +296,20 @@ def pytest_runtestloop(self, session):
282296
# This way, report generators like JUnitXML will work as expected.
283297
self.responses_queue = queue_cls()
284298

285-
for i in range(len(session.items)):
286-
queue.put(i)
299+
for i, item in enumerate(session.items):
300+
if "sequence" in [mark.name for mark in item.own_markers]:
301+
sequence_queue.put(i)
302+
else:
303+
parallel_queue.put(i)
304+
305+
print_info(self.workers, tests_per_worker, parallel_queue.qsize(), sequence_queue.qsize())
287306

288307
# Now we need to put stopping sentinels, so that worker
289308
# processes will know, there is time to finish the work.
290309
for i in range(self.workers * tests_per_worker):
291-
queue.put('stop')
310+
parallel_queue.put('stop')
311+
312+
sequence_queue.put('stop')
292313

293314
responses_processor = threading.Thread(
294315
target=self.process_responses,
@@ -307,15 +328,21 @@ def wait_for_responses_processor():
307328
# This flag will be changed after the worker's fork.
308329
self._config.parallel_worker = False
309330

310-
args = (self._config, queue, session, tests_per_worker, errors)
331+
args = (self._config, parallel_queue, session, tests_per_worker, errors)
311332
for _ in range(self.workers):
312333
process = Process(target=process_with_threads, args=args)
313334
process.start()
314335
processes.append(process)
315336

316337
[p.join() for p in processes]
317338

318-
queue.join()
339+
parallel_queue.join()
340+
341+
thread_for_sequence = ThreadWorker(sequence_queue, session, errors)
342+
thread_for_sequence.start()
343+
thread_for_sequence.join()
344+
sequence_queue.join()
345+
319346
wait_for_responses_processor()
320347

321348
if not errors.empty():

0 commit comments

Comments
 (0)