qemu.aqmp.events module

AQMP Events and EventListeners

Asynchronous QMP uses EventListener objects to listen for events. An EventListener is a FIFO event queue that can be pre-filtered to listen for only specific events. Each EventListener instance receives its own copy of every event it hears, so events may be consumed without fear of depriving other listeners of events they need to hear.
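The "own copy per listener" idea can be illustrated with a small stand-alone model. This is only a sketch built on asyncio.Queue; ToyListener, offer() and broadcast() are hypothetical names invented for the illustration and are not part of qemu.aqmp.

```python
import asyncio
from typing import Dict, List, Optional, Set

Event = Dict[str, str]


class ToyListener:
    """Hypothetical stand-in for an EventListener: a name-filtered FIFO queue."""

    def __init__(self, names: Optional[Set[str]] = None) -> None:
        self.names = names or set()
        self.queue: "asyncio.Queue[Event]" = asyncio.Queue()

    def offer(self, event: Event) -> None:
        # With no names configured, accept everything; otherwise match by name.
        if not self.names or event["event"] in self.names:
            self.queue.put_nowait(event)


def broadcast(listeners: List[ToyListener], event: Event) -> None:
    # Every listener is offered the event and keeps its own queue entry.
    for listener in listeners:
        listener.offer(event)


async def demo() -> List[str]:
    everything = ToyListener()
    stops_only = ToyListener({"STOP"})
    for name in ("STOP", "RESUME"):
        broadcast([everything, stops_only], {"event": name})

    seen = []
    # Draining one queue does not take anything away from the other.
    for _ in range(2):
        seen.append((await everything.queue.get())["event"])
    seen.append((await stops_only.queue.get())["event"])
    return seen
```

Running asyncio.run(demo()) in this model yields the two events from the unfiltered listener followed by the single STOP held by the filtered one.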

EventListener Tutorial

In all of the following examples, we assume that an instantiated QMPClient named qmp exists and is already connected.

listener() context blocks with one name

The most basic usage is to construct a listener with the listener() context manager:

with qmp.listener('STOP') as listener:
    await qmp.execute('stop')
    await listener.get()

The listener is active only for the duration of the ‘with’ block. This instance listens only for ‘STOP’ events.

listener() context blocks with two or more names

Multiple events can be selected by providing any Iterable[str]:

with qmp.listener(('STOP', 'RESUME')) as listener:
    await qmp.execute('stop')
    event = await listener.get()
    assert event['event'] == 'STOP'

    await qmp.execute('cont')
    event = await listener.get()
    assert event['event'] == 'RESUME'

listener() context blocks with no names

By omitting names entirely, you can listen to ALL events.

with qmp.listener() as listener:
    await qmp.execute('stop')
    event = await listener.get()
    assert event['event'] == 'STOP'

This isn’t a very good use case for this feature: In a non-trivial running system, we may not know what event will arrive next. Grabbing the top of a FIFO queue returning multiple kinds of events may be prone to error.

Using async iterators to retrieve events

If you’d like to simply watch what events happen to arrive, you can use the listener as an async iterator:

with qmp.listener() as listener:
    async for event in listener:
        print(f"Event arrived: {event['event']}")

This is analogous to the following code:

with qmp.listener() as listener:
    while True:
        event = await listener.get()
        print(f"Event arrived: {event['event']}")

This event stream will never end, so these blocks will never terminate.
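To make the equivalence concrete, here is a minimal sketch of how an async iterator can be layered on top of get(). ToyStream is a hypothetical queue-backed class written for this illustration; it is not the real EventListener and makes no claim about how the library implements iteration.

```python
import asyncio
from typing import Dict, List

Event = Dict[str, str]


class ToyStream:
    """Hypothetical queue-backed stream used only for illustration."""

    def __init__(self) -> None:
        self._queue: "asyncio.Queue[Event]" = asyncio.Queue()

    def push(self, event: Event) -> None:
        self._queue.put_nowait(event)

    async def get(self) -> Event:
        return await self._queue.get()

    def __aiter__(self) -> "ToyStream":
        return self

    async def __anext__(self) -> Event:
        # StopAsyncIteration is never raised, so `async for` cannot end
        # on its own; it simply waits for the next event.
        return await self.get()


async def first_n(stream: ToyStream, count: int) -> List[str]:
    names: List[str] = []
    async for event in stream:
        names.append(event["event"])
        if len(names) == count:
            break  # an explicit break is the only way out of the loop
    return names


async def demo() -> List[str]:
    stream = ToyStream()
    stream.push({"event": "STOP"})
    stream.push({"event": "RESUME"})
    return await first_n(stream, 2)
```

Without the break in first_n(), the loop would wait forever for a third event, which is the behavior described above.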

Using asyncio.Task to concurrently retrieve events

Since a listener’s event stream will never terminate, that form is unlikely to be useful in a short script. For longer-running clients, we can create event handlers by using asyncio.Task to create concurrent coroutines:

async def print_events(listener):
    try:
        async for event in listener:
            print(f"Event arrived: {event['event']}")
    except asyncio.CancelledError:
        return

with qmp.listener() as listener:
    task = asyncio.Task(print_events(listener))
    await qmp.execute('stop')
    await qmp.execute('cont')
    task.cancel()
    await task

However, there is no guarantee that these events will be received by the time we leave this context block. Once the context block is exited, the listener will cease to hear any new events, and becomes inert.

Be mindful of the timing: the above example will probably print both the STOP and RESUME events, but this is not guaranteed. The next section outlines how to use listeners outside of a context block.
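One way to remove the timing dependence while staying inside a context block is to wait explicitly for the event you need, with a timeout so the script cannot hang. The helper below is a sketch: wait_for_event() and ScriptedListener are hypothetical names, and the helper assumes only that the listener offers the awaitable get() method described in this document.

```python
import asyncio
from typing import Any, Dict, List

Event = Dict[str, Any]


async def wait_for_event(listener: Any, name: str, timeout: float) -> Event:
    """Pull events via listener.get() until `name` arrives or `timeout` elapses."""

    async def _pull() -> Event:
        while True:
            event = await listener.get()
            if event["event"] == name:
                return event
            # Events with other names are consumed here and dropped.

    # Raises asyncio.TimeoutError if the event does not arrive in time.
    return await asyncio.wait_for(_pull(), timeout)


class ScriptedListener:
    """Hypothetical fake that replays a fixed list of events, then blocks."""

    def __init__(self, events: List[Event]) -> None:
        self._events = list(events)

    async def get(self) -> Event:
        if self._events:
            return self._events.pop(0)
        await asyncio.sleep(3600)  # simulate "no more events for now"
        raise RuntimeError("unreachable in this sketch")
```

Inside a listener() block, a call such as await wait_for_event(listener, 'RESUME', 5.0) placed before the block ends would ensure the event was actually received, at the cost of discarding any other events pulled along the way.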

Using register_listener() and remove_listener()

To create a listener with a longer lifetime, beyond the scope of a single block, create a listener and then call register_listener():

class MyClient:
    def __init__(self, qmp):
        self.qmp = qmp
        self.listener = EventListener()

    async def print_events(self):
        try:
            async for event in self.listener:
                print(f"Event arrived: {event['event']}")
        except asyncio.CancelledError:
            return

    async def run(self):
        self.task = asyncio.Task(self.print_events())
        self.qmp.register_listener(self.listener)
        await self.qmp.execute('stop')
        await self.qmp.execute('cont')

    async def stop(self):
        self.task.cancel()
        await self.task
        self.qmp.remove_listener(self.listener)

The listener can be deactivated by using remove_listener(). When it is removed, any possible pending events are cleared and it can be re-registered at a later time.

Using the built-in all events listener

The QMPClient object creates its own default listener named events that can be used for the same purpose without having to create your own:

async def print_events(listener):
    try:
        async for event in listener:
            print(f"Event arrived: {event['event']}")
    except asyncio.CancelledError:
        return

task = asyncio.Task(print_events(qmp.events))

await qmp.execute('stop')
await qmp.execute('cont')

task.cancel()
await task

Using both .get() and async iterators

The async iterator and get() methods pull events from the same FIFO queue. If you mix the usage of both, be aware: Events are emitted precisely once per listener.

If multiple contexts try to pull events from the same listener instance, events are still emitted only precisely once.

This restriction can be lifted by creating additional listeners.
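The "once per listener" behavior is what any shared queue does. The sketch below uses a plain asyncio.Queue as a stand-in for a single listener's queue (it is not the real EventListener) and shows two consumers splitting the events between them rather than each seeing all of them.

```python
import asyncio
from typing import Dict, List, Tuple

Event = Dict[str, str]


async def consumer(queue: "asyncio.Queue[Event]", taken: List[str], count: int) -> None:
    # Each get() removes the event from the shared queue for good.
    for _ in range(count):
        event = await queue.get()
        taken.append(event["event"])


async def demo() -> Tuple[List[str], List[str]]:
    queue: "asyncio.Queue[Event]" = asyncio.Queue()
    for name in ("STOP", "RESUME", "SHUTDOWN", "RESET"):
        queue.put_nowait({"event": name})

    first: List[str] = []
    second: List[str] = []
    # Two consumers share one queue: together they see each event once.
    await asyncio.gather(consumer(queue, first, 2), consumer(queue, second, 2))
    return first, second
```

Which consumer ends up with which events depends on scheduling, so code that needs every event in two places should use two listeners instead.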

Creating multiple listeners

Additional EventListener objects can be created at will. Each one receives its own copy of events, with separate FIFO event queues.

my_listener = EventListener()
qmp.register_listener(my_listener)

await qmp.execute('stop')
copy1 = await my_listener.get()
copy2 = await qmp.events.get()

assert copy1 == copy2

In this example, we await an event from both a user-created EventListener and the built-in events listener. Both receive the same event.

Clearing listeners

EventListener objects can be cleared, discarding all events still pending in the queue:

await qmp.execute('stop')
qmp.events.clear()
await qmp.execute('cont')
event = await qmp.events.get()
assert event['event'] == 'RESUME'

EventListener objects are FIFO queues. If events are not consumed, they will remain in the queue until they are witnessed or discarded via clear(). FIFO queues will be drained automatically upon leaving a context block, or when calling remove_listener().

Accessing listener history

EventListener objects record their history. Even after being cleared, you can obtain a record of all events seen so far:

await qmp.execute('stop')
await qmp.execute('cont')
qmp.events.clear()

assert len(qmp.events.history) == 2
assert qmp.events.history[0]['event'] == 'STOP'
assert qmp.events.history[1]['event'] == 'RESUME'

The history is updated immediately and does not require the event to be witnessed first.
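The relationship between the pending queue and the history can be modeled in a few lines. ToyRecorder is a hypothetical, synchronous model written for this illustration; the real put() is a coroutine and also applies filtering, which is left out here.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple

Event = Dict[str, str]


class ToyRecorder:
    """Hypothetical model of a listener's pending queue plus its history."""

    def __init__(self) -> None:
        self._pending: Deque[Event] = deque()
        self._history: List[Event] = []

    def put(self, event: Event) -> None:
        # The history is appended on arrival, before anyone consumes the event.
        self._history.append(event)
        self._pending.append(event)

    def clear(self) -> None:
        # Only the pending queue is emptied; the history is left alone.
        self._pending.clear()

    @property
    def pending(self) -> int:
        return len(self._pending)

    @property
    def history(self) -> Tuple[Event, ...]:
        # Returning a tuple hands callers a read-only snapshot.
        return tuple(self._history)
```

In this model, putting two events and then calling clear() leaves pending at zero while history still holds both events in arrival order.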

Using event filters

EventListener objects can be given complex filtering criteria if names are not sufficient:

def job1_filter(event) -> bool:
    event_data = event.get('data', {})
    event_job_id = event_data.get('id')
    return event_job_id == "job1"

with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
    await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
    async for event in listener:
        if event['data']['status'] == 'concluded':
            break

These filters might be most useful when parameterized. EventListener objects expect a function that takes only a single argument (the raw event, as a Message) and returns a bool; True if the event should be accepted into the stream. You can create a function that adapts this signature to accept configuration parameters:

def job_filter(job_id: str) -> EventFilter:
    def filter(event: Message) -> bool:
        return event['data']['id'] == job_id
    return filter

with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
    await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
    async for event in listener:
        if event['data']['status'] == 'concluded':
            break
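As an alternative to a closure, the same adaptation can be sketched with functools.partial from the standard library. The names event_is_for_job and job2_filter are hypothetical, and plain dicts stand in for Message objects here, on the assumption that a Message supports the mapping-style access used in the examples above.

```python
from functools import partial
from typing import Any, Dict


def event_is_for_job(job_id: str, event: Dict[str, Any]) -> bool:
    # Using .get() tolerates events that carry no 'data' or no 'id' field.
    return event.get("data", {}).get("id") == job_id


# Binding the first argument leaves a one-argument callable that returns
# a bool, which is the shape an event filter is expected to have.
job2_filter = partial(event_is_for_job, "job2")
```

The resulting job2_filter could then be passed wherever job_filter('job2') is used above; the closure and partial forms differ only in style.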

Activating an existing listener with listen()

Listeners with complex, long configurations can also be created manually and activated temporarily by using listen() instead of listener():

listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
                          'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
                          'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))

with qmp.listen(listener):
    await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
    async for event in listener:
        print(event)
        if event['event'] == 'BLOCK_JOB_COMPLETED':
            break

Any events that are not witnessed by the time the block is left will be cleared from the queue; entering the block is an implicit register_listener() and leaving the block is an implicit remove_listener().
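The implicit register-on-entry and remove-on-exit behavior can be pictured as a context manager built around try/finally. The sketch below is a toy model only: ToyRegistry, ToyListener and ToyListenerError are hypothetical names, and nothing here is taken from the library's source.

```python
from contextlib import contextmanager
from typing import Dict, Iterator, List


class ToyListenerError(Exception):
    """Hypothetical error for double registration or unknown listeners."""


class ToyListener:
    def __init__(self) -> None:
        self.pending: List[Dict[str, str]] = []

    def clear(self) -> None:
        self.pending.clear()


class ToyRegistry:
    def __init__(self) -> None:
        self.active: List[ToyListener] = []

    def register(self, listener: ToyListener) -> None:
        if listener in self.active:
            raise ToyListenerError("listener is already registered")
        self.active.append(listener)

    def remove(self, listener: ToyListener) -> None:
        if listener not in self.active:
            raise ToyListenerError("listener is not registered")
        self.active.remove(listener)
        listener.clear()  # unwitnessed events are dropped on removal

    @contextmanager
    def listen(self, *listeners: ToyListener) -> Iterator[None]:
        for listener in listeners:
            self.register(listener)
        try:
            yield
        finally:
            # Runs on normal exit and when the block raises.
            for listener in listeners:
                self.remove(listener)
```

Because removal sits in the finally clause, a listener in this model is deactivated and emptied even if the body of the with block raises.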

Activating multiple existing listeners with listen()

While listener() is only capable of creating a single listener, listen() is capable of activating multiple listeners simultaneously:

def job_filter(job_id: str) -> EventFilter:
    def filter(event: Message) -> bool:
        return event['data']['id'] == job_id
    return filter

jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))

with qmp.listen(jobA, jobB):
    await qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
    await qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})

    async for event in jobA:
        if event['data']['status'] == 'concluded':
            break
    async for event in jobB:
        if event['data']['status'] == 'concluded':
            break

Extending the EventListener class

In the case that a more specialized EventListener is desired to provide either more functionality or more compact syntax for specialized cases, it can be extended.

One of the key methods to extend or override is accept(). The default implementation checks an incoming message for:

  1. A qualifying name, if any names were specified at initialization time

  2. That event_filter() returns True.

This can be modified however you see fit to change the criteria for inclusion in the stream.
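The two checks listed above can be written out as a stand-alone function to make the order of evaluation explicit. This is a sketch of the described behavior, not the library's implementation; toy_accept is a hypothetical name and plain dicts stand in for Message objects.

```python
from typing import Any, Callable, Dict, Optional, Set

Event = Dict[str, Any]


def toy_accept(
    event: Event,
    names: Set[str],
    event_filter: Optional[Callable[[Event], bool]],
) -> bool:
    # 1. Name check, skipped entirely when no names were configured.
    if names and event["event"] not in names:
        return False
    # 2. Secondary filter, consulted only for events that passed step 1.
    if event_filter is not None:
        return bool(event_filter(event))
    return True
```

An override of accept() in a subclass would typically call the parent check first and then add its own conditions, as the JobListener example in this section does.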

For convenience, a JobListener class could be created that simply bakes in configuration so it does not need to be repeated:

class JobListener(EventListener):
    def __init__(self, job_id: str):
        super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
                          'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
                          'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
        self.job_id = job_id

    def accept(self, event) -> bool:
        if not super().accept(event):
            return False
        if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
            return event['data']['id'] == self.job_id
        return event['data']['device'] == self.job_id

From here on out, you can conjure up a custom-purpose listener that listens only for job-related events for a specific job-id easily:

listener = JobListener('job4')
with qmp.listen(listener):
    await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
    async for event in listener:
        print(event)
        if event['event'] == 'BLOCK_JOB_COMPLETED':
            break

Experimental Interfaces & Design Issues

I am not sure I will keep these interfaces as they are; I may remove them or modify them heavily.

qmp.listen()’s type signature

listen() does not return anything, because it was assumed the caller already had a handle to the listener. However, for qmp.listen(EventListener()) forms, the caller will not have saved a handle to the listener.

Because this function can accept many listeners, I found it hard to accurately type in a way where it could be used in both “one” or “many” forms conveniently and in a statically type-safe manner.

Ultimately, I removed the return altogether, but perhaps with more time I can work out a way to re-add it.

API Reference

class qemu.aqmp.events.EventListener(names: Optional[Union[str, Iterable[str]]] = None, event_filter: Optional[Callable[[qemu.aqmp.message.Message], bool]] = None)

Bases: object

Selectively listens for events with runtime configurable filtering.

This class is designed to be directly usable for the most common cases, but it can be extended to provide more rigorous control.

Parameters
  • names – One or more names of events to listen for. When not provided, listen for ALL events.

  • event_filter – An optional event filtering function. When names are also provided, this acts as a secondary filter.

When names and event_filter are both provided, the names are checked first, and the filter function is called second. The event filter function can therefore assume that the event is of a known type and format.

accept(event: qemu.aqmp.message.Message) -> bool

Determine if this listener accepts this event.

This method determines which events will appear in the stream. The default implementation simply checks the event against the list of names and the event_filter to decide if this EventListener accepts a given event. It can be overridden/extended to provide custom listener behavior.

User code is not expected to need to invoke this method.

Parameters

event – The event under consideration.

Returns

True, if this listener accepts this event.

clear() -> None

Clear this listener of all pending events.

Called when an EventListener is being unregistered, this clears the pending FIFO queue synchronously. It can also be used to manually clear any pending events, if desired.

Warning

Take care when discarding events. Cleared events will be silently tossed on the floor. All events that were ever accepted by this listener are visible in history.

event_filter: Optional[Callable[[qemu.aqmp.message.Message], bool]]

Optional, secondary event filter.

async get() -> qemu.aqmp.message.Message

Wait for the very next event in this stream.

If one is already available, return that one.

property history: Tuple[qemu.aqmp.message.Message, ...]

A read-only history of all events seen so far.

This represents every event, including those not yet witnessed via get() or async for. It persists between clear() calls and is immutable.

names: Set[str]

Primary event filter, based on one or more event names.

async put(event: qemu.aqmp.message.Message) -> None

Conditionally put a new event into the FIFO queue.

This method is not designed to be invoked from user code, and it should not need to be overridden. It is a public interface so that QMPClient has an interface by which it can inform registered listeners of new events.

The event will be put into the queue if accept() returns True.

Parameters

event – The new event to put into the FIFO queue.

class qemu.aqmp.events.Events

Bases: object

Events is a mix-in class that adds event functionality to the QMP class.

It’s designed specifically as a mix-in for QMPClient, and it relies upon the class it is being mixed into having a ‘logger’ property.

events: qemu.aqmp.events.EventListener

Default, all-events EventListener.

listen(*listeners: qemu.aqmp.events.EventListener) -> Iterator[None]

Context manager: Temporarily listen with an EventListener.

Accepts one or more EventListener objects and registers them, activating them for the duration of the context block.

EventListener objects will have any pending events in their FIFO queue cleared upon exiting the context block, when they are deactivated.

Parameters

*listeners – One or more EventListeners to activate.

Raises

ListenerError – If the given listener(s) are already active.

listener(names: Optional[Union[str, Iterable[str]]] = (), event_filter: Optional[Callable[[qemu.aqmp.message.Message], bool]] = None) -> Iterator[qemu.aqmp.events.EventListener]

Context manager: Temporarily listen with a new EventListener.

Creates an EventListener object and registers it, activating it for the duration of the context block.

Parameters
  • names – One or more names of events to listen for. When not provided, listen for ALL events.

  • event_filter – An optional event filtering function. When names are also provided, this acts as a secondary filter.

Returns

The newly created and active EventListener.

register_listener(listener: qemu.aqmp.events.EventListener) -> None

Register and activate an EventListener.

Parameters

listener – The listener to activate.

Raises

ListenerError – If the given listener is already registered.

remove_listener(listener: qemu.aqmp.events.EventListener) -> None

Unregister and deactivate an EventListener.

The removed listener will have its pending events cleared via clear(). The listener can be re-registered later when desired.

Parameters

listener – The listener to deactivate.

Raises

ListenerError – If the given listener is not registered.

exception qemu.aqmp.events.ListenerError

Bases: qemu.aqmp.error.AQMPError

Generic error class for EventListener-related problems.