_____________________________________ test _____________________________________
@pytest.mark.timeout(10)
def test():
ppe = concurrent.futures.ProcessPoolExecutor(1)
ppe.submit(int).result()
ppe.submit(int).cancel()
> ppe.shutdown(wait=True)
test_reproduce_python_bug.py:14:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/concurrent/futures/process.py:686: in shutdown
self._queue_management_thread.join()
/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py:1011: in join
self._wait_for_tstate_lock()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Thread(QueueManagerThread, started daemon 140003176535808)>
block = True, timeout = -1
def _wait_for_tstate_lock(self, block=True, timeout=-1):
# Issue #18808: wait for the thread state to be gone.
# At the end of the thread's life, after all knowledge of the thread
# is removed from C data structures, C code releases our _tstate_lock.
# This method passes its arguments to _tstate_lock.acquire().
# If the lock is acquired, the C code is done, and self._stop() is
# called. That sets ._is_stopped to True, and ._tstate_lock to None.
lock = self._tstate_lock
if lock is None: # already determined that the C code is done
assert self._is_stopped
> elif lock.acquire(block, timeout):
E Failed: Timeout >10.0s
/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py:1027: Failed
----------------------------- Captured stderr call -----------------------------
+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
~~~~~~~~~~~~~~~~~ Stack of QueueFeederThread (140003159754496) ~~~~~~~~~~~~~~~~~
File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/multiprocessing/queues.py", line 227, in _feed
nwait()
File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 302, in wait
waiter.acquire()
~~~~~~~~~~~~~~~~ Stack of QueueManagerThread (140003176535808) ~~~~~~~~~~~~~~~~~
File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/concurrent/futures/process.py", line 362, in _queue_management_worker
ready = mp.connection.wait(readers + worker_sentinels)
File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/multiprocessing/connection.py", line 931, in wait
ready = selector.select(timeout)
File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/selectors.py", line 415, in select
fd_event_list = self._selector.poll(timeout)
+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
Bug report
With a ProcessPoolExecutor, after submitting and quickly canceling a future, a call to
shutdown(wait=True) would hang indefinitely. This happens on pretty much all platforms and all recent Python versions.
Here is a minimal reproduction:
The first submission gets the executor going and creates its internal
queue_management_thread. The second submission appears to get that thread to loop, enter a wait state, and never receive a wakeup event.
Introducing a tiny sleep between the second submit and its cancel request makes the issue disappear. From my initial observation it looks like something in the way the
queue_management_worker internal loop is structured doesn't handle this edge case well. Shutting down with
wait=False would return immediately as expected, but the queue_management_thread would then die with an unhandled OSError: handle is closed exception.

Environment
Additional info
When tested with
pytest-timeout under Ubuntu and CPython 3.8.13, these are the tracebacks at the moment of timing out:

Details
_____________________________________ test _____________________________________ @pytest.mark.timeout(10) def test(): ppe = concurrent.futures.ProcessPoolExecutor(1) ppe.submit(int).result() ppe.submit(int).cancel() > ppe.shutdown(wait=True) test_reproduce_python_bug.py:14: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ /opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/concurrent/futures/process.py:686: in shutdown self._queue_management_thread.join() /opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py:1011: in join self._wait_for_tstate_lock() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <Thread(QueueManagerThread, started daemon 140003176535808)> block = True, timeout = -1 def _wait_for_tstate_lock(self, block=True, timeout=-1): # Issue #18808: wait for the thread state to be gone. # At the end of the thread's life, after all knowledge of the thread # is removed from C data structures, C code releases our _tstate_lock. # This method passes its arguments to _tstate_lock.acquire(). # If the lock is acquired, the C code is done, and self._stop() is # called. That sets ._is_stopped to True, and ._tstate_lock to None. 
lock = self._tstate_lock if lock is None: # already determined that the C code is done assert self._is_stopped > elif lock.acquire(block, timeout): E Failed: Timeout >10.0s /opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py:1027: Failed ----------------------------- Captured stderr call ----------------------------- +++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++ ~~~~~~~~~~~~~~~~~ Stack of QueueFeederThread (140003159754496) ~~~~~~~~~~~~~~~~~ File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 890, in _bootstrap self._bootstrap_inner() File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 932, in _bootstrap_inner self.run() File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/multiprocessing/queues.py", line 227, in _feed nwait() File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 302, in wait waiter.acquire() ~~~~~~~~~~~~~~~~ Stack of QueueManagerThread (140003176535808) ~~~~~~~~~~~~~~~~~ File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 890, in _bootstrap self._bootstrap_inner() File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 932, in _bootstrap_inner self.run() File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/concurrent/futures/process.py", line 362, in _queue_management_worker ready = mp.connection.wait(readers + worker_sentinels) File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/multiprocessing/connection.py", line 931, in wait ready = selector.select(timeout) File "/opt/hostedtoolcache/Python/3.8.13/x64/lib/python3.8/selectors.py", line 415, in select fd_event_list = 
self._selector.poll(timeout) +++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++

Tracebacks in PyPy are similar on the
concurrent.futures.process level. Tracebacks in Windows are different in the lower-level areas, but again similar on the concurrent.futures.process level.

Linked PRs:
Linked PRs