Sharing a dynamically started worker among several consumers
up vote
2
down vote
favorite
I am building a worker class that connects to an external event stream using asyncio. It is a single stream, but several consumer may enable it. The goal is to only maintain the connection while one or more consumer requires it.
My requirements are as follow:
- The worker instance is created dynamically first time a consumer requires it.
- When other consumers then require it, they re-use the same worker instance.
- When the last consumer closes the stream, it cleans up its resources.
This sounds easy enough. However, the startup sequence is causing me issues, because it is itself asynchronous. Thus, assuming this interface:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
pass
async def stop(self):
pass
I have the following scenarios:
Scenario 1 - exception at startup
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Worker startup sequence raises an exception.
- Both consumer should see the exception as the result of their call to start().
Scenario 2 - partial asynchronous cancellation
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Consumer 1 gets cancelled.
- Worker startup sequence completes.
- Consumer 2 should see a successful start.
Scenario 3 - complete asynchronous cancellation
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Consumer 1 gets cancelled.
- Consumer 2 gets cancelled.
- Worker startup sequence must be cancelled as a result.
I struggle to cover all scenarios without getting any race condition and a spaghetti mess of either bare Future or Event objects.
Here is an attempt at writing start()
. It relies on _worker()
setting an asyncio.Event
named self._worker_ready
when it completes the startup sequence:
async def start(self, timeout=None):
assert not self.closing
if not self._task:
self._task = asyncio.ensure_future(self._worker())
# Wait until worker is ready, has failed, or timeout triggers
try:
self._waiting_start += 1
wait_ready = asyncio.ensure_future(self._worker_ready.wait())
done, pending = await asyncio.wait(
[self._task, wait_ready],
return_when=asyncio.FIRST_COMPLETED, timeout=timeout
)
except asyncio.CancelledError:
wait_ready.cancel()
if self._waiting_start == 1:
self.closing = True
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task # let worker shutdown
raise
finally:
self._waiting_start -= 1
# worker failed to start - either throwing or timeout triggering
if not self._worker_ready.is_set():
self.closing = True
self._task.cancel()
wait_ready.cancel()
try:
await self._task # let worker shutdown
except asyncio.CancelledError:
raise FeedTimeoutError('stream failed to start within %ss' % timeout)
else:
assert False, 'worker must propagate the exception'
That seems to work, but it seems too complex, and is really hard to test: the worker has many await
points, leading to combinatoric explosion if I am to try all possible cancellation points and execution orders.
I need a better way. I am thus wondering:
- Are my requirements reasonable?
- Is there a common pattern to do this?
- Does my question raise some code smell?
python-3.x python-asyncio
add a comment |
up vote
2
down vote
favorite
I am building a worker class that connects to an external event stream using asyncio. It is a single stream, but several consumer may enable it. The goal is to only maintain the connection while one or more consumer requires it.
My requirements are as follow:
- The worker instance is created dynamically first time a consumer requires it.
- When other consumers then require it, they re-use the same worker instance.
- When the last consumer closes the stream, it cleans up its resources.
This sounds easy enough. However, the startup sequence is causing me issues, because it is itself asynchronous. Thus, assuming this interface:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
pass
async def stop(self):
pass
I have the following scenarios:
Scenario 1 - exception at startup
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Worker startup sequence raises an exception.
- Both consumer should see the exception as the result of their call to start().
Scenario 2 - partial asynchronous cancellation
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Consumer 1 gets cancelled.
- Worker startup sequence completes.
- Consumer 2 should see a successful start.
Scenario 3 - complete asynchronous cancellation
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Consumer 1 gets cancelled.
- Consumer 2 gets cancelled.
- Worker startup sequence must be cancelled as a result.
I struggle to cover all scenarios without getting any race condition and a spaghetti mess of either bare Future or Event objects.
Here is an attempt at writing start()
. It relies on _worker()
setting an asyncio.Event
named self._worker_ready
when it completes the startup sequence:
async def start(self, timeout=None):
assert not self.closing
if not self._task:
self._task = asyncio.ensure_future(self._worker())
# Wait until worker is ready, has failed, or timeout triggers
try:
self._waiting_start += 1
wait_ready = asyncio.ensure_future(self._worker_ready.wait())
done, pending = await asyncio.wait(
[self._task, wait_ready],
return_when=asyncio.FIRST_COMPLETED, timeout=timeout
)
except asyncio.CancelledError:
wait_ready.cancel()
if self._waiting_start == 1:
self.closing = True
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task # let worker shutdown
raise
finally:
self._waiting_start -= 1
# worker failed to start - either throwing or timeout triggering
if not self._worker_ready.is_set():
self.closing = True
self._task.cancel()
wait_ready.cancel()
try:
await self._task # let worker shutdown
except asyncio.CancelledError:
raise FeedTimeoutError('stream failed to start within %ss' % timeout)
else:
assert False, 'worker must propagate the exception'
That seems to work, but it seems too complex, and is really hard to test: the worker has many await
points, leading to combinatoric explosion if I am to try all possible cancellation points and execution orders.
I need a better way. I am thus wondering:
- Are my requirements reasonable?
- Is there a common pattern to do this?
- Does my question raise some code smell?
python-3.x python-asyncio
add a comment |
up vote
2
down vote
favorite
up vote
2
down vote
favorite
I am building a worker class that connects to an external event stream using asyncio. It is a single stream, but several consumer may enable it. The goal is to only maintain the connection while one or more consumer requires it.
My requirements are as follow:
- The worker instance is created dynamically first time a consumer requires it.
- When other consumers then require it, they re-use the same worker instance.
- When the last consumer closes the stream, it cleans up its resources.
This sounds easy enough. However, the startup sequence is causing me issues, because it is itself asynchronous. Thus, assuming this interface:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
pass
async def stop(self):
pass
I have the following scenarios:
Scenario 1 - exception at startup
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Worker startup sequence raises an exception.
- Both consumer should see the exception as the result of their call to start().
Scenario 2 - partial asynchronous cancellation
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Consumer 1 gets cancelled.
- Worker startup sequence completes.
- Consumer 2 should see a successful start.
Scenario 3 - complete asynchronous cancellation
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Consumer 1 gets cancelled.
- Consumer 2 gets cancelled.
- Worker startup sequence must be cancelled as a result.
I struggle to cover all scenarios without getting any race condition and a spaghetti mess of either bare Future or Event objects.
Here is an attempt at writing start()
. It relies on _worker()
setting an asyncio.Event
named self._worker_ready
when it completes the startup sequence:
async def start(self, timeout=None):
assert not self.closing
if not self._task:
self._task = asyncio.ensure_future(self._worker())
# Wait until worker is ready, has failed, or timeout triggers
try:
self._waiting_start += 1
wait_ready = asyncio.ensure_future(self._worker_ready.wait())
done, pending = await asyncio.wait(
[self._task, wait_ready],
return_when=asyncio.FIRST_COMPLETED, timeout=timeout
)
except asyncio.CancelledError:
wait_ready.cancel()
if self._waiting_start == 1:
self.closing = True
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task # let worker shutdown
raise
finally:
self._waiting_start -= 1
# worker failed to start - either throwing or timeout triggering
if not self._worker_ready.is_set():
self.closing = True
self._task.cancel()
wait_ready.cancel()
try:
await self._task # let worker shutdown
except asyncio.CancelledError:
raise FeedTimeoutError('stream failed to start within %ss' % timeout)
else:
assert False, 'worker must propagate the exception'
That seems to work, but it seems too complex, and is really hard to test: the worker has many await
points, leading to combinatoric explosion if I am to try all possible cancellation points and execution orders.
I need a better way. I am thus wondering:
- Are my requirements reasonable?
- Is there a common pattern to do this?
- Does my question raise some code smell?
python-3.x python-asyncio
I am building a worker class that connects to an external event stream using asyncio. It is a single stream, but several consumer may enable it. The goal is to only maintain the connection while one or more consumer requires it.
My requirements are as follow:
- The worker instance is created dynamically first time a consumer requires it.
- When other consumers then require it, they re-use the same worker instance.
- When the last consumer closes the stream, it cleans up its resources.
This sounds easy enough. However, the startup sequence is causing me issues, because it is itself asynchronous. Thus, assuming this interface:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
pass
async def stop(self):
pass
I have the following scenarios:
Scenario 1 - exception at startup
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Worker startup sequence raises an exception.
- Both consumer should see the exception as the result of their call to start().
Scenario 2 - partial asynchronous cancellation
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Consumer 1 gets cancelled.
- Worker startup sequence completes.
- Consumer 2 should see a successful start.
Scenario 3 - complete asynchronous cancellation
- Consumer 1 requests worker to start.
- Worker startup sequence begins
- Consumer 2 requests worker to start.
- Consumer 1 gets cancelled.
- Consumer 2 gets cancelled.
- Worker startup sequence must be cancelled as a result.
I struggle to cover all scenarios without getting any race condition and a spaghetti mess of either bare Future or Event objects.
Here is an attempt at writing start()
. It relies on _worker()
setting an asyncio.Event
named self._worker_ready
when it completes the startup sequence:
async def start(self, timeout=None):
assert not self.closing
if not self._task:
self._task = asyncio.ensure_future(self._worker())
# Wait until worker is ready, has failed, or timeout triggers
try:
self._waiting_start += 1
wait_ready = asyncio.ensure_future(self._worker_ready.wait())
done, pending = await asyncio.wait(
[self._task, wait_ready],
return_when=asyncio.FIRST_COMPLETED, timeout=timeout
)
except asyncio.CancelledError:
wait_ready.cancel()
if self._waiting_start == 1:
self.closing = True
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task # let worker shutdown
raise
finally:
self._waiting_start -= 1
# worker failed to start - either throwing or timeout triggering
if not self._worker_ready.is_set():
self.closing = True
self._task.cancel()
wait_ready.cancel()
try:
await self._task # let worker shutdown
except asyncio.CancelledError:
raise FeedTimeoutError('stream failed to start within %ss' % timeout)
else:
assert False, 'worker must propagate the exception'
That seems to work, but it seems too complex, and is really hard to test: the worker has many await
points, leading to combinatoric explosion if I am to try all possible cancellation points and execution orders.
I need a better way. I am thus wondering:
- Are my requirements reasonable?
- Is there a common pattern to do this?
- Does my question raise some code smell?
python-3.x python-asyncio
python-3.x python-asyncio
edited Nov 8 at 15:26
asked Nov 8 at 15:17
spectras
7,08511434
7,08511434
add a comment |
add a comment |
2 Answers
2
active
oldest
votes
up vote
2
down vote
accepted
Your requirements sound reasonable. I would try to simplify start
by replacing Event
with a future (in this case a task), using it to both wait for the startup to finish and to propagate exceptions that occur during its course, if any. Something like:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
loop = asyncio.get_event_loop()
if self._worker_startup_task is None:
self._worker_startup_task =
loop.create_task(self._worker_startup())
self._add_user()
try:
await asyncio.shield(asyncio.wait_for(
self._worker_startup_task, timeout))
except:
self._rm_user()
raise
async def _worker_startup(self):
loop = asyncio.get_event_loop()
await asyncio.sleep(1) # ...
self._worker_task = loop.create_task(self._worker())
In this code the worker startup is separated from the worker coroutine, and is also moved to a separate task. This separate task can be awaited and removes the need for a dedicated Event
, but more importantly, it allows scenarios 1 and 2 to be handled by the same code. Even if someone cancels the first consumer, the worker startup task will not be canceled - the cancellation just means that there is one less consumer waiting for it.
Thus in case of consumer cancellation, await self._worker_startup_task
will work just fine for other consumers, whereas in case of an actual exception in worker startup, all other waiters will see the same exception because the task will have completed.
Scenario 3 should work automatically because we always cancel the startup that can no longer be observed by a consumer, regardless of the reason. If the consumers are gone because the startup itself has failed, then self._worker_startup_task
will have completed (with an exception) and its cancellation will be a no-op. If it is because all consumers have been themselves canceled while awaiting the startup, then self._worker_startup_task.cancel()
will cancel the startup sequence, as required by scenario 3.
The rest of the code would look like this (untested):
def __init__(self):
self._users = 0
self._worker_startup = None
def _add_user(self):
self._users += 1
def _rm_user(self):
self._users -= 1
if self._users:
return
self._worker_startup_task.cancel()
self._worker_startup_task = None
if self._worker_task is not None:
self._worker_task.cancel()
self._worker_task = None
async def stop(self):
self._rm_user()
async def _worker(self):
# actual worker...
while True:
await asyncio.sleep(1)
Hmmm, cancelled coroutines need to be awaited as well, and thewait_for
shouldasyncio.shield
the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
– spectras
Nov 11 at 16:44
@spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
– user4815162342
Nov 11 at 18:27
If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
– spectras
Nov 11 at 22:54
I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
– spectras
Nov 11 at 23:00
add a comment |
up vote
1
down vote
With my previous tests and integrating suggestions from @user4815162342 I came up with a re-usable solution:
st = SharedTask(test())
task1 = asyncio.ensure_future(st.wait())
task2 = asyncio.ensure_future(st.wait(timeout=15))
task3 = asyncio.ensure_future(st.wait())
This does the right thing: task2 cancels itself after 15s. Cancelling tasks has no effect on test()
unless they all get cancelled. In that case, the last task to get cancelled will manually cancel test()
and wait for cancellation handling to complete.
If passed a coroutine, it is only scheduled when first task starts waiting.
Lastly, awaiting the shared task after it has completed simply yields its result immediately (seems obvious, but initial version did not).
import asyncio
from contextlib import suppress
class SharedTask:
__slots__ = ('_clients', '_task')
def __init__(self, task):
if not (asyncio.isfuture(task) or asyncio.iscoroutine(task)):
raise TypeError('task must be either a Future or a coroutine object')
self._clients = 0
self._task = task
@property
def started(self):
return asyncio.isfuture(self._task)
async def wait(self, *, timeout=None):
self._task = asyncio.ensure_future(self._task)
self._clients += 1
try:
return await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
except:
self._clients -= 1
if self._clients == 0 and not self._task.done():
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
raise
def cancel(self):
if asyncio.iscoroutine(self._task):
self._task.close()
elif asyncio.isfuture(self._task):
self._task.cancel()
The re-raising of task exception cancellation (mentioned in comments) is intentional. It allows this pattern:
async def my_task():
try:
await do_stuff()
except asyncio.CancelledError as exc:
await flush_some_stuff() # might raise an exception
raise exc
The clients can cancel the shared task and handle an exception that might arise as a result, it will work the same whether my_task
is wrapped in a SharedTask
or not.
suppress(asyncio.CancelledError)
looks like it will make the consumer uncancellable while the originally cancelled task is cleaning up. (The cancellation occurring at that time will just be swallowed.)
– user4815162342
Nov 12 at 10:04
@user4815162342 It is counter-intuitive, but it doesn't make it uncancellable.suppress
does not prevent the exception from occurring an cancelling the task, it only prevents it from bubbling up.
– spectras
Nov 12 at 11:12
I get that; the part that worries me is that if aCancelledError
doesn't bubble up from that point, the cancelled task will have some other exception as the result. Maybe it's not a problem in practice, but to me it's not obvious that that won't interfere with the cancellation mechanism. If I suppressedCancelledError
, I'd be careful to re-raise it after doing whatever it is I need to do to handle it.
– user4815162342
Nov 12 at 11:56
1
This is by design: if [shared task cancellation as a result of the last client being cancelled] raises an exception, then that last client gets the exception (the same it would have gotten if task was not wrapped inSharedTask
). Should one want another behavior, usesuppress(Exception)
instead (and add some logging mechanism, one definitely does not want exceptions to go unnoticed).
– spectras
Nov 12 at 12:32
1
@user4815162342 yes. Well there is a test forself._task.done()
that prevents the third possibility, but the other two remain. Alas, as one can only raise one exception, a choice had to be made. I guess YMMV as to which is most relevant. It is one of those times exceptions show their limitations as an error handling mechanism.
– spectras
Nov 12 at 13:08
|
show 3 more comments
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
2
down vote
accepted
Your requirements sound reasonable. I would try to simplify start
by replacing Event
with a future (in this case a task), using it to both wait for the startup to finish and to propagate exceptions that occur during its course, if any. Something like:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
loop = asyncio.get_event_loop()
if self._worker_startup_task is None:
self._worker_startup_task =
loop.create_task(self._worker_startup())
self._add_user()
try:
await asyncio.shield(asyncio.wait_for(
self._worker_startup_task, timeout))
except:
self._rm_user()
raise
async def _worker_startup(self):
loop = asyncio.get_event_loop()
await asyncio.sleep(1) # ...
self._worker_task = loop.create_task(self._worker())
In this code the worker startup is separated from the worker coroutine, and is also moved to a separate task. This separate task can be awaited and removes the need for a dedicated Event
, but more importantly, it allows scenarios 1 and 2 to be handled by the same code. Even if someone cancels the first consumer, the worker startup task will not be canceled - the cancellation just means that there is one less consumer waiting for it.
Thus in case of consumer cancellation, await self._worker_startup_task
will work just fine for other consumers, whereas in case of an actual exception in worker startup, all other waiters will see the same exception because the task will have completed.
Scenario 3 should work automatically because we always cancel the startup that can no longer be observed by a consumer, regardless of the reason. If the consumers are gone because the startup itself has failed, then self._worker_startup_task
will have completed (with an exception) and its cancellation will be a no-op. If it is because all consumers have been themselves canceled while awaiting the startup, then self._worker_startup_task.cancel()
will cancel the startup sequence, as required by scenario 3.
The rest of the code would look like this (untested):
def __init__(self):
self._users = 0
self._worker_startup = None
def _add_user(self):
self._users += 1
def _rm_user(self):
self._users -= 1
if self._users:
return
self._worker_startup_task.cancel()
self._worker_startup_task = None
if self._worker_task is not None:
self._worker_task.cancel()
self._worker_task = None
async def stop(self):
self._rm_user()
async def _worker(self):
# actual worker...
while True:
await asyncio.sleep(1)
Hmmm, cancelled coroutines need to be awaited as well, and thewait_for
shouldasyncio.shield
the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
– spectras
Nov 11 at 16:44
@spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
– user4815162342
Nov 11 at 18:27
If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
– spectras
Nov 11 at 22:54
I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
– spectras
Nov 11 at 23:00
add a comment |
up vote
2
down vote
accepted
Your requirements sound reasonable. I would try to simplify start
by replacing Event
with a future (in this case a task), using it to both wait for the startup to finish and to propagate exceptions that occur during its course, if any. Something like:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
loop = asyncio.get_event_loop()
if self._worker_startup_task is None:
self._worker_startup_task =
loop.create_task(self._worker_startup())
self._add_user()
try:
await asyncio.shield(asyncio.wait_for(
self._worker_startup_task, timeout))
except:
self._rm_user()
raise
async def _worker_startup(self):
loop = asyncio.get_event_loop()
await asyncio.sleep(1) # ...
self._worker_task = loop.create_task(self._worker())
In this code the worker startup is separated from the worker coroutine, and is also moved to a separate task. This separate task can be awaited and removes the need for a dedicated Event
, but more importantly, it allows scenarios 1 and 2 to be handled by the same code. Even if someone cancels the first consumer, the worker startup task will not be canceled - the cancellation just means that there is one less consumer waiting for it.
Thus in case of consumer cancellation, await self._worker_startup_task
will work just fine for other consumers, whereas in case of an actual exception in worker startup, all other waiters will see the same exception because the task will have completed.
Scenario 3 should work automatically because we always cancel the startup that can no longer be observed by a consumer, regardless of the reason. If the consumers are gone because the startup itself has failed, then self._worker_startup_task
will have completed (with an exception) and its cancellation will be a no-op. If it is because all consumers have been themselves canceled while awaiting the startup, then self._worker_startup_task.cancel()
will cancel the startup sequence, as required by scenario 3.
The rest of the code would look like this (untested):
def __init__(self):
self._users = 0
self._worker_startup = None
def _add_user(self):
self._users += 1
def _rm_user(self):
self._users -= 1
if self._users:
return
self._worker_startup_task.cancel()
self._worker_startup_task = None
if self._worker_task is not None:
self._worker_task.cancel()
self._worker_task = None
async def stop(self):
self._rm_user()
async def _worker(self):
# actual worker...
while True:
await asyncio.sleep(1)
Hmmm, cancelled coroutines need to be awaited as well, and thewait_for
shouldasyncio.shield
the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
– spectras
Nov 11 at 16:44
@spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
– user4815162342
Nov 11 at 18:27
If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
– spectras
Nov 11 at 22:54
I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
– spectras
Nov 11 at 23:00
add a comment |
up vote
2
down vote
accepted
up vote
2
down vote
accepted
Your requirements sound reasonable. I would try to simplify start
by replacing Event
with a future (in this case a task), using it to both wait for the startup to finish and to propagate exceptions that occur during its course, if any. Something like:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
loop = asyncio.get_event_loop()
if self._worker_startup_task is None:
self._worker_startup_task =
loop.create_task(self._worker_startup())
self._add_user()
try:
await asyncio.shield(asyncio.wait_for(
self._worker_startup_task, timeout))
except:
self._rm_user()
raise
async def _worker_startup(self):
loop = asyncio.get_event_loop()
await asyncio.sleep(1) # ...
self._worker_task = loop.create_task(self._worker())
In this code the worker startup is separated from the worker coroutine, and is also moved to a separate task. This separate task can be awaited and removes the need for a dedicated Event
, but more importantly, it allows scenarios 1 and 2 to be handled by the same code. Even if someone cancels the first consumer, the worker startup task will not be canceled - the cancellation just means that there is one less consumer waiting for it.
Thus in case of consumer cancellation, await self._worker_startup_task
will work just fine for other consumers, whereas in case of an actual exception in worker startup, all other waiters will see the same exception because the task will have completed.
Scenario 3 should work automatically because we always cancel the startup that can no longer be observed by a consumer, regardless of the reason. If the consumers are gone because the startup itself has failed, then self._worker_startup_task
will have completed (with an exception) and its cancellation will be a no-op. If it is because all consumers have been themselves canceled while awaiting the startup, then self._worker_startup_task.cancel()
will cancel the startup sequence, as required by scenario 3.
The rest of the code would look like this (untested):
def __init__(self):
self._users = 0
self._worker_startup = None
def _add_user(self):
self._users += 1
def _rm_user(self):
self._users -= 1
if self._users:
return
self._worker_startup_task.cancel()
self._worker_startup_task = None
if self._worker_task is not None:
self._worker_task.cancel()
self._worker_task = None
async def stop(self):
self._rm_user()
async def _worker(self):
# actual worker...
while True:
await asyncio.sleep(1)
Your requirements sound reasonable. I would try to simplify start
by replacing Event
with a future (in this case a task), using it to both wait for the startup to finish and to propagate exceptions that occur during its course, if any. Something like:
class Stream:
async def start(self, *, timeout=DEFAULT_TIMEOUT):
loop = asyncio.get_event_loop()
if self._worker_startup_task is None:
self._worker_startup_task =
loop.create_task(self._worker_startup())
self._add_user()
try:
await asyncio.shield(asyncio.wait_for(
self._worker_startup_task, timeout))
except:
self._rm_user()
raise
async def _worker_startup(self):
loop = asyncio.get_event_loop()
await asyncio.sleep(1) # ...
self._worker_task = loop.create_task(self._worker())
In this code the worker startup is separated from the worker coroutine, and is also moved to a separate task. This separate task can be awaited and removes the need for a dedicated Event
, but more importantly, it allows scenarios 1 and 2 to be handled by the same code. Even if someone cancels the first consumer, the worker startup task will not be canceled - the cancellation just means that there is one less consumer waiting for it.
Thus in case of consumer cancellation, await self._worker_startup_task
will work just fine for other consumers, whereas in case of an actual exception in worker startup, all other waiters will see the same exception because the task will have completed.
Scenario 3 should work automatically because we always cancel the startup that can no longer be observed by a consumer, regardless of the reason. If the consumers are gone because the startup itself has failed, then self._worker_startup_task
will have completed (with an exception) and its cancellation will be a no-op. If it is because all consumers have been themselves canceled while awaiting the startup, then self._worker_startup_task.cancel()
will cancel the startup sequence, as required by scenario 3.
The rest of the code would look like this (untested):
def __init__(self):
self._users = 0
self._worker_startup = None
def _add_user(self):
self._users += 1
def _rm_user(self):
self._users -= 1
if self._users:
return
self._worker_startup_task.cancel()
self._worker_startup_task = None
if self._worker_task is not None:
self._worker_task.cancel()
self._worker_task = None
async def stop(self):
self._rm_user()
async def _worker(self):
# actual worker...
while True:
await asyncio.sleep(1)
edited Nov 11 at 18:26
answered Nov 10 at 20:22
user4815162342
58.8k488138
58.8k488138
Hmmm, cancelled coroutines need to be awaited as well, and thewait_for
shouldasyncio.shield
the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
– spectras
Nov 11 at 16:44
@spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
– user4815162342
Nov 11 at 18:27
If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
– spectras
Nov 11 at 22:54
I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
– spectras
Nov 11 at 23:00
add a comment |
Hmmm, cancelled coroutines need to be awaited as well, and thewait_for
shouldasyncio.shield
the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
– spectras
Nov 11 at 16:44
@spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
– user4815162342
Nov 11 at 18:27
If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
– spectras
Nov 11 at 22:54
I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
– spectras
Nov 11 at 23:00
Hmmm, cancelled coroutines need to be awaited as well, and the
wait_for
should asyncio.shield
the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.– spectras
Nov 11 at 16:44
Hmmm, cancelled coroutines need to be awaited as well, and the
wait_for
should asyncio.shield
the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.– spectras
Nov 11 at 16:44
@spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
– user4815162342
Nov 11 at 18:27
@spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
– user4815162342
Nov 11 at 18:27
If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
– spectras
Nov 11 at 22:54
If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
– spectras
Nov 11 at 22:54
I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
– spectras
Nov 11 at 23:00
I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
– spectras
Nov 11 at 23:00
add a comment |
up vote
1
down vote
With my previous tests and integrating suggestions from @user4815162342 I came up with a re-usable solution:
st = SharedTask(test())
task1 = asyncio.ensure_future(st.wait())
task2 = asyncio.ensure_future(st.wait(timeout=15))
task3 = asyncio.ensure_future(st.wait())
This does the right thing: task2 cancels itself after 15s. Cancelling tasks has no effect on test()
unless they all get cancelled. In that case, the last task to get cancelled will manually cancel test()
and wait for cancellation handling to complete.
If passed a coroutine, it is only scheduled when first task starts waiting.
Lastly, awaiting the shared task after it has completed simply yields its result immediately (seems obvious, but initial version did not).
import asyncio
from contextlib import suppress
class SharedTask:
__slots__ = ('_clients', '_task')
def __init__(self, task):
if not (asyncio.isfuture(task) or asyncio.iscoroutine(task)):
raise TypeError('task must be either a Future or a coroutine object')
self._clients = 0
self._task = task
@property
def started(self):
return asyncio.isfuture(self._task)
async def wait(self, *, timeout=None):
self._task = asyncio.ensure_future(self._task)
self._clients += 1
try:
return await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
except:
self._clients -= 1
if self._clients == 0 and not self._task.done():
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
raise
def cancel(self):
if asyncio.iscoroutine(self._task):
self._task.close()
elif asyncio.isfuture(self._task):
self._task.cancel()
The re-raising of task exception cancellation (mentioned in comments) is intentional. It allows this pattern:
async def my_task():
try:
await do_stuff()
except asyncio.CancelledError as exc:
await flush_some_stuff() # might raise an exception
raise exc
The clients can cancel the shared task and handle an exception that might arise as a result, it will work the same whether my_task
is wrapped in a SharedTask
or not.
suppress(asyncio.CancelledError)
looks like it will make the consumer uncancellable while the originally cancelled task is cleaning up. (The cancellation occurring at that time will just be swallowed.)
– user4815162342
Nov 12 at 10:04
@user4815162342 It is counter-intuitive, but it doesn't make it uncancellable.suppress
does not prevent the exception from occurring an cancelling the task, it only prevents it from bubbling up.
– spectras
Nov 12 at 11:12
I get that; the part that worries me is that if aCancelledError
doesn't bubble up from that point, the cancelled task will have some other exception as the result. Maybe it's not a problem in practice, but to me it's not obvious that that won't interfere with the cancellation mechanism. If I suppressedCancelledError
, I'd be careful to re-raise it after doing whatever it is I need to do to handle it.
– user4815162342
Nov 12 at 11:56
1
This is by design: if [shared task cancellation as a result of the last client being cancelled] raises an exception, then that last client gets the exception (the same it would have gotten if task was not wrapped inSharedTask
). Should one want another behavior, usesuppress(Exception)
instead (and add some logging mechanism, one definitely does not want exceptions to go unnoticed).
– spectras
Nov 12 at 12:32
1
@user4815162342 yes. Well there is a test forself._task.done()
that prevents the third possibility, but the other two remain. Alas, as one can only raise one exception, a choice had to be made. I guess YMMV as to which is most relevant. It is one of those times exceptions show their limitations as an error handling mechanism.
– spectras
Nov 12 at 13:08
|
show 3 more comments
up vote
1
down vote
With my previous tests and integrating suggestions from @user4815162342 I came up with a re-usable solution:
st = SharedTask(test())
task1 = asyncio.ensure_future(st.wait())
task2 = asyncio.ensure_future(st.wait(timeout=15))
task3 = asyncio.ensure_future(st.wait())
This does the right thing: task2 cancels itself after 15s. Cancelling tasks has no effect on test()
unless they all get cancelled. In that case, the last task to get cancelled will manually cancel test()
and wait for cancellation handling to complete.
If passed a coroutine, it is only scheduled when first task starts waiting.
Lastly, awaiting the shared task after it has completed simply yields its result immediately (seems obvious, but initial version did not).
import asyncio
from contextlib import suppress
class SharedTask:
__slots__ = ('_clients', '_task')
def __init__(self, task):
if not (asyncio.isfuture(task) or asyncio.iscoroutine(task)):
raise TypeError('task must be either a Future or a coroutine object')
self._clients = 0
self._task = task
@property
def started(self):
return asyncio.isfuture(self._task)
async def wait(self, *, timeout=None):
self._task = asyncio.ensure_future(self._task)
self._clients += 1
try:
return await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
except:
self._clients -= 1
if self._clients == 0 and not self._task.done():
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
raise
def cancel(self):
if asyncio.iscoroutine(self._task):
self._task.close()
elif asyncio.isfuture(self._task):
self._task.cancel()
The re-raising of task exception cancellation (mentioned in comments) is intentional. It allows this pattern:
async def my_task():
try:
await do_stuff()
except asyncio.CancelledError as exc:
await flush_some_stuff() # might raise an exception
raise exc
The clients can cancel the shared task and handle an exception that might arise as a result, it will work the same whether my_task
is wrapped in a SharedTask
or not.
suppress(asyncio.CancelledError)
looks like it will make the consumer uncancellable while the originally cancelled task is cleaning up. (The cancellation occurring at that time will just be swallowed.)
– user4815162342
Nov 12 at 10:04
@user4815162342 It is counter-intuitive, but it doesn't make it uncancellable.suppress
does not prevent the exception from occurring an cancelling the task, it only prevents it from bubbling up.
– spectras
Nov 12 at 11:12
I get that; the part that worries me is that if aCancelledError
doesn't bubble up from that point, the cancelled task will have some other exception as the result. Maybe it's not a problem in practice, but to me it's not obvious that that won't interfere with the cancellation mechanism. If I suppressedCancelledError
, I'd be careful to re-raise it after doing whatever it is I need to do to handle it.
– user4815162342
Nov 12 at 11:56
1
This is by design: if [shared task cancellation as a result of the last client being cancelled] raises an exception, then that last client gets the exception (the same it would have gotten if task was not wrapped inSharedTask
). Should one want another behavior, usesuppress(Exception)
instead (and add some logging mechanism, one definitely does not want exceptions to go unnoticed).
– spectras
Nov 12 at 12:32
1
@user4815162342 yes. Well there is a test forself._task.done()
that prevents the third possibility, but the other two remain. Alas, as one can only raise one exception, a choice had to be made. I guess YMMV as to which is most relevant. It is one of those times exceptions show their limitations as an error handling mechanism.
– spectras
Nov 12 at 13:08
|
show 3 more comments
up vote
1
down vote
up vote
1
down vote
With my previous tests and integrating suggestions from @user4815162342 I came up with a re-usable solution:
st = SharedTask(test())
task1 = asyncio.ensure_future(st.wait())
task2 = asyncio.ensure_future(st.wait(timeout=15))
task3 = asyncio.ensure_future(st.wait())
This does the right thing: task2 cancels itself after 15s. Cancelling tasks has no effect on test()
unless they all get cancelled. In that case, the last task to get cancelled will manually cancel test()
and wait for cancellation handling to complete.
If passed a coroutine, it is only scheduled when first task starts waiting.
Lastly, awaiting the shared task after it has completed simply yields its result immediately (seems obvious, but initial version did not).
import asyncio
from contextlib import suppress
class SharedTask:
__slots__ = ('_clients', '_task')
def __init__(self, task):
if not (asyncio.isfuture(task) or asyncio.iscoroutine(task)):
raise TypeError('task must be either a Future or a coroutine object')
self._clients = 0
self._task = task
@property
def started(self):
return asyncio.isfuture(self._task)
async def wait(self, *, timeout=None):
self._task = asyncio.ensure_future(self._task)
self._clients += 1
try:
return await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
except:
self._clients -= 1
if self._clients == 0 and not self._task.done():
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
raise
def cancel(self):
if asyncio.iscoroutine(self._task):
self._task.close()
elif asyncio.isfuture(self._task):
self._task.cancel()
The re-raising of task exception cancellation (mentioned in comments) is intentional. It allows this pattern:
async def my_task():
try:
await do_stuff()
except asyncio.CancelledError as exc:
await flush_some_stuff() # might raise an exception
raise exc
The clients can cancel the shared task and handle an exception that might arise as a result, it will work the same whether my_task
is wrapped in a SharedTask
or not.
With my previous tests and integrating suggestions from @user4815162342 I came up with a re-usable solution:
st = SharedTask(test())
task1 = asyncio.ensure_future(st.wait())
task2 = asyncio.ensure_future(st.wait(timeout=15))
task3 = asyncio.ensure_future(st.wait())
This does the right thing: task2 cancels itself after 15s. Cancelling tasks has no effect on test()
unless they all get cancelled. In that case, the last task to get cancelled will manually cancel test()
and wait for cancellation handling to complete.
If passed a coroutine, it is only scheduled when first task starts waiting.
Lastly, awaiting the shared task after it has completed simply yields its result immediately (seems obvious, but initial version did not).
import asyncio
from contextlib import suppress
class SharedTask:
__slots__ = ('_clients', '_task')
def __init__(self, task):
if not (asyncio.isfuture(task) or asyncio.iscoroutine(task)):
raise TypeError('task must be either a Future or a coroutine object')
self._clients = 0
self._task = task
@property
def started(self):
return asyncio.isfuture(self._task)
async def wait(self, *, timeout=None):
self._task = asyncio.ensure_future(self._task)
self._clients += 1
try:
return await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
except:
self._clients -= 1
if self._clients == 0 and not self._task.done():
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
raise
def cancel(self):
if asyncio.iscoroutine(self._task):
self._task.close()
elif asyncio.isfuture(self._task):
self._task.cancel()
The re-raising of task exception cancellation (mentioned in comments) is intentional. It allows this pattern:
async def my_task():
try:
await do_stuff()
except asyncio.CancelledError as exc:
await flush_some_stuff() # might raise an exception
raise exc
The clients can cancel the shared task and handle an exception that might arise as a result, it will work the same whether my_task
is wrapped in a SharedTask
or not.
edited Nov 12 at 12:41
answered Nov 11 at 22:59
spectras
7,08511434
7,08511434
suppress(asyncio.CancelledError)
looks like it will make the consumer uncancellable while the originally cancelled task is cleaning up. (The cancellation occurring at that time will just be swallowed.)
– user4815162342
Nov 12 at 10:04
@user4815162342 It is counter-intuitive, but it doesn't make it uncancellable.suppress
does not prevent the exception from occurring an cancelling the task, it only prevents it from bubbling up.
– spectras
Nov 12 at 11:12
I get that; the part that worries me is that if aCancelledError
doesn't bubble up from that point, the cancelled task will have some other exception as the result. Maybe it's not a problem in practice, but to me it's not obvious that that won't interfere with the cancellation mechanism. If I suppressedCancelledError
, I'd be careful to re-raise it after doing whatever it is I need to do to handle it.
– user4815162342
Nov 12 at 11:56
1
This is by design: if [shared task cancellation as a result of the last client being cancelled] raises an exception, then that last client gets the exception (the same it would have gotten if task was not wrapped inSharedTask
). Should one want another behavior, usesuppress(Exception)
instead (and add some logging mechanism, one definitely does not want exceptions to go unnoticed).
– spectras
Nov 12 at 12:32
1
@user4815162342 yes. Well there is a test forself._task.done()
that prevents the third possibility, but the other two remain. Alas, as one can only raise one exception, a choice had to be made. I guess YMMV as to which is most relevant. It is one of those times exceptions show their limitations as an error handling mechanism.
– spectras
Nov 12 at 13:08
|
show 3 more comments
suppress(asyncio.CancelledError)
looks like it will make the consumer uncancellable while the originally cancelled task is cleaning up. (The cancellation occurring at that time will just be swallowed.)
– user4815162342
Nov 12 at 10:04
@user4815162342 It is counter-intuitive, but it doesn't make it uncancellable.suppress
does not prevent the exception from occurring an cancelling the task, it only prevents it from bubbling up.
– spectras
Nov 12 at 11:12
I get that; the part that worries me is that if aCancelledError
doesn't bubble up from that point, the cancelled task will have some other exception as the result. Maybe it's not a problem in practice, but to me it's not obvious that that won't interfere with the cancellation mechanism. If I suppressedCancelledError
, I'd be careful to re-raise it after doing whatever it is I need to do to handle it.
– user4815162342
Nov 12 at 11:56
1
This is by design: if [shared task cancellation as a result of the last client being cancelled] raises an exception, then that last client gets the exception (the same it would have gotten if task was not wrapped inSharedTask
). Should one want another behavior, usesuppress(Exception)
instead (and add some logging mechanism, one definitely does not want exceptions to go unnoticed).
– spectras
Nov 12 at 12:32
1
@user4815162342 yes. Well there is a test forself._task.done()
that prevents the third possibility, but the other two remain. Alas, as one can only raise one exception, a choice had to be made. I guess YMMV as to which is most relevant. It is one of those times exceptions show their limitations as an error handling mechanism.
– spectras
Nov 12 at 13:08
suppress(asyncio.CancelledError)
looks like it will make the consumer uncancellable while the originally cancelled task is cleaning up. (The cancellation occurring at that time will just be swallowed.)– user4815162342
Nov 12 at 10:04
suppress(asyncio.CancelledError)
looks like it will make the consumer uncancellable while the originally cancelled task is cleaning up. (The cancellation occurring at that time will just be swallowed.)– user4815162342
Nov 12 at 10:04
@user4815162342 It is counter-intuitive, but it doesn't make it uncancellable.
suppress
does not prevent the exception from occurring an cancelling the task, it only prevents it from bubbling up.– spectras
Nov 12 at 11:12
@user4815162342 It is counter-intuitive, but it doesn't make it uncancellable.
suppress
does not prevent the exception from occurring an cancelling the task, it only prevents it from bubbling up.– spectras
Nov 12 at 11:12
I get that; the part that worries me is that if a
CancelledError
doesn't bubble up from that point, the cancelled task will have some other exception as the result. Maybe it's not a problem in practice, but to me it's not obvious that that won't interfere with the cancellation mechanism. If I suppressed CancelledError
, I'd be careful to re-raise it after doing whatever it is I need to do to handle it.– user4815162342
Nov 12 at 11:56
I get that; the part that worries me is that if a
CancelledError
doesn't bubble up from that point, the cancelled task will have some other exception as the result. Maybe it's not a problem in practice, but to me it's not obvious that that won't interfere with the cancellation mechanism. If I suppressed CancelledError
, I'd be careful to re-raise it after doing whatever it is I need to do to handle it.– user4815162342
Nov 12 at 11:56
1
1
This is by design: if [shared task cancellation as a result of the last client being cancelled] raises an exception, then that last client gets the exception (the same it would have gotten if task was not wrapped in
SharedTask
). Should one want another behavior, use suppress(Exception)
instead (and add some logging mechanism, one definitely does not want exceptions to go unnoticed).– spectras
Nov 12 at 12:32
This is by design: if [shared task cancellation as a result of the last client being cancelled] raises an exception, then that last client gets the exception (the same it would have gotten if task was not wrapped in
SharedTask
). Should one want another behavior, use suppress(Exception)
instead (and add some logging mechanism, one definitely does not want exceptions to go unnoticed).– spectras
Nov 12 at 12:32
1
1
@user4815162342 yes. Well there is a test for
self._task.done()
that prevents the third possibility, but the other two remain. Alas, as one can only raise one exception, a choice had to be made. I guess YMMV as to which is most relevant. It is one of those times exceptions show their limitations as an error handling mechanism.– spectras
Nov 12 at 13:08
@user4815162342 yes. Well there is a test for
self._task.done()
that prevents the third possibility, but the other two remain. Alas, as one can only raise one exception, a choice had to be made. I guess YMMV as to which is most relevant. It is one of those times exceptions show their limitations as an error handling mechanism.– spectras
Nov 12 at 13:08
|
show 3 more comments
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53210696%2fsharing-a-dynamically-started-worker-among-several-consumers%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown