aqmp - Asynchronous QMP

QEMU Monitor Protocol (QMP) development library & tooling.

This package provides a fairly low-level class for communicating asynchronously with QMP protocol servers, as implemented by QEMU, the QEMU Guest Agent, and the QEMU Storage Daemon.

QMPClient provides the main functionality of this package. All errors raised by this library derive from AQMPError; see aqmp.error for additional detail. See aqmp.events for an in-depth tutorial on managing QMP events.

class qemu.aqmp.QMPClient(name: Optional[str] = None)

Bases: qemu.aqmp.protocol.AsyncProtocol[qemu.aqmp.message.Message], qemu.aqmp.events.Events

Implements a QMP client connection.

QMP can be used to establish a connection as either the transport client or server, though this class always acts as the QMP client.

Parameters

name – Optional nickname for the connection, used for logging.

Basic script-style usage looks like this:

qmp = QMPClient('my_virtual_machine_name')
await qmp.connect(('127.0.0.1', 1234))
...
res = await qmp.execute('query-block')
...
await qmp.disconnect()
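
The script-style flow above omits cleanup when a command fails. A minimal sketch of one way to guarantee the disconnect, assuming only the connect(), execute() and disconnect() coroutines documented on this page. The helper name run_command and the FakeClient stand-in are hypothetical; the stand-in exists only so the snippet runs without a QEMU instance, and a real QMPClient could be passed in its place:

```python
import asyncio


async def run_command(qmp, address, cmd, arguments=None):
    """Connect, run one command, and always disconnect (hypothetical helper).

    `qmp` is expected to behave like QMPClient: it must provide the
    connect(), execute() and disconnect() coroutines described here.
    """
    await qmp.connect(address)
    try:
        return await qmp.execute(cmd, arguments)
    finally:
        # disconnect() is also where a reader/writer failure would surface.
        await qmp.disconnect()


class FakeClient:
    """Stand-in for QMPClient so the sketch runs without QEMU (assumption)."""

    def __init__(self):
        self.calls = []

    async def connect(self, address):
        self.calls.append(("connect", address))

    async def execute(self, cmd, arguments=None):
        self.calls.append(("execute", cmd))
        return {"status": "running"}  # canned reply, not real QEMU output

    async def disconnect(self):
        self.calls.append(("disconnect",))


fake = FakeClient()
result = asyncio.run(run_command(fake, "/tmp/qemu.socket", "query-status"))
```

The try/finally starts only after connect() succeeds, because a failed connect() already returns the runstate to IDLE.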

Basic async client-style usage looks like this:

import asyncio

from qemu.aqmp import QMPClient


class Client:
    def __init__(self, name: str):
        self.qmp = QMPClient(name)

    async def watch_events(self):
        try:
            # Print the name of every event as it arrives.
            async for event in self.qmp.events:
                print(f"Event: {event['event']}")
        except asyncio.CancelledError:
            return

    async def run(self, address='/tmp/qemu.socket'):
        await self.qmp.connect(address)
        asyncio.create_task(self.watch_events())
        # Block until the connection's runstate changes, then clean up.
        await self.qmp.runstate_changed()
        await self.qmp.disconnect()

See aqmp.events for more detail on event handling patterns.

logger: logging.Logger = <Logger qemu.aqmp.qmp_client (WARNING)>

Logger object used for debugging messages.

await_greeting: bool

Whether or not to await a greeting after establishing a connection.

negotiate: bool

Whether or not to perform capabilities negotiation upon connection. Implies await_greeting.

async execute_msg(msg: qemu.aqmp.message.Message) → object

Execute a QMP command and return its value.

Parameters

msg – The QMP Message to execute.

Returns

The command execution return value from the server. The type of object returned depends on the command that was issued, though most QEMU commands return a dict.

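Raises
  • ValueError – If the QMP Message does not have either the 'execute' or 'exec-oob' field set.

  • ExecuteError – When the server returns an error response.

  • ExecInterruptedError – If the connection was terminated before a reply was received.

classmethod make_execute_msg(cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) → qemu.aqmp.message.Message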

Create an executable message to be sent by execute_msg later.

Parameters
  • cmd – QMP command name.

  • arguments – Arguments (if any). Must be JSON-serializable.

  • oob – If True, execute “out of band”.

Returns

An executable QMP Message.
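
For orientation, the wire-level shape of such a command can be sketched with plain dictionaries. This is a standalone illustration of the QMP command object format, not the library's code; build_execute_msg is a hypothetical name, and device_del with an id argument is used only as a sample command:

```python
import json
from typing import Mapping, Optional


def build_execute_msg(cmd: str,
                      arguments: Optional[Mapping[str, object]] = None,
                      oob: bool = False) -> dict:
    """Hypothetical stand-in showing the shape of a QMP command object."""
    # QMP selects out-of-band execution by using the 'exec-oob' key
    # in place of 'execute'.
    msg = {"exec-oob" if oob else "execute": cmd}
    if arguments is not None:
        msg["arguments"] = dict(arguments)
    return msg


msg = build_execute_msg("device_del", {"id": "net0"})
# Serializing is where non-JSON-serializable arguments would be rejected.
wire = json.dumps(msg)
```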

async execute(cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) → object

Execute a QMP command and return its value.

Parameters
  • cmd – QMP command name.

  • arguments – Arguments (if any). Must be JSON-serializable.

  • oob – If True, execute “out of band”.

Returns

The command execution return value from the server. The type of object returned depends on the command that was issued, though most QEMU commands return a dict.

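Raises
  • ExecuteError – When the server returns an error response.

  • ExecInterruptedError – If the connection was terminated before a reply was received.

async accept(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None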

Accept a connection and begin processing message queues.

If this call fails, runstate is guaranteed to be set back to IDLE.

Parameters
  • address – Address to listen to; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

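Raises
  • StateError – When the Runstate is not IDLE.

  • ConnectError – If a connection could not be accepted.

async connect(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None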

Connect to the server and begin processing message queues.

If this call fails, runstate is guaranteed to be set back to IDLE.

Parameters
  • address – Address to connect to; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

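Raises
  • StateError – When the Runstate is not IDLE.

  • ConnectError – If a connection cannot be made to the server.

async disconnect() → None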

Disconnect and wait for all tasks to fully stop.

If an exception caused the reader or writer to terminate prematurely, it will be raised here.

Raises

Exception – When the reader or writer terminates unexpectedly.

listen(*listeners: qemu.aqmp.events.EventListener) → Iterator[None]

Context manager: Temporarily listen with an EventListener.

Accepts one or more EventListener objects and registers them, activating them for the duration of the context block.

EventListener objects will have any pending events in their FIFO queue cleared upon exiting the context block, when they are deactivated.

Parameters

*listeners – One or more EventListeners to activate.

Raises

ListenerError – If the given listener(s) are already active.
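
The register-on-entry, clear-on-exit behaviour described above can be modelled with contextlib. This is a minimal sketch under stated assumptions, not the library's implementation: Registry and PendingQueue are hypothetical stand-ins, and RuntimeError takes the place of ListenerError:

```python
from contextlib import contextmanager


class PendingQueue:
    """Hypothetical listener stand-in that only tracks pending events."""

    def __init__(self):
        self.pending = []

    def clear(self):
        self.pending.clear()


class Registry:
    """Hypothetical stand-in for the client's listener bookkeeping."""

    def __init__(self):
        self.active = []

    def register(self, listener):
        if any(item is listener for item in self.active):
            raise RuntimeError("listener is already registered")
        self.active.append(listener)

    def remove(self, listener):
        self.active = [item for item in self.active if item is not listener]
        listener.clear()  # pending events are discarded on removal

    @contextmanager
    def listen(self, *listeners):
        added = []
        try:
            for listener in listeners:
                self.register(listener)
                added.append(listener)
            yield
        finally:
            # Only undo the registrations this block actually made.
            for listener in added:
                self.remove(listener)


registry = Registry()
queue = PendingQueue()
with registry.listen(queue):
    queue.pending.append({"event": "STOP"})
    active_inside = len(registry.active)
```

After the with block exits, the stand-in listener is no longer active and its pending list is empty, matching the description of the context manager.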

listener(names: Optional[Union[str, Iterable[str]]] = (), event_filter: Optional[Callable[[qemu.aqmp.message.Message], bool]] = None) → Iterator[qemu.aqmp.events.EventListener]

Context manager: Temporarily listen with a new EventListener.

Creates an EventListener object and registers it, activating it for the duration of the context block.

Parameters
  • names – One or more names of events to listen for. When not provided, listen for ALL events.

  • event_filter – An optional event filtering function. When names are also provided, this acts as a secondary filter.

Returns

The newly created and active EventListener.

register_listener(listener: qemu.aqmp.events.EventListener) → None

Register and activate an EventListener.

Parameters

listener – The listener to activate.

Raises

ListenerError – If the given listener is already registered.

remove_listener(listener: qemu.aqmp.events.EventListener) → None

Unregister and deactivate an EventListener.

The removed listener will have its pending events cleared via clear(). The listener can be re-registered later when desired.

Parameters

listener – The listener to deactivate.

Raises

ListenerError – If the given listener is not registered.

property runstate: qemu.aqmp.protocol.Runstate

The current Runstate of the connection.

async runstate_changed() → qemu.aqmp.protocol.Runstate

Wait for the runstate to change, then return that runstate.

name: Optional[str]

The nickname for this connection, if any.

events: EventListener

Default, all-events EventListener.

class qemu.aqmp.Message(value: Union[bytes, Mapping[str, object]] = b'{}', *, eager: bool = True)

Bases: MutableMapping[str, object]

Represents a single QMP protocol message.

QMP uses JSON objects as its basic communicative unit; so this Python object is a MutableMapping. It may be instantiated from either another mapping (like a dict), or from raw bytes that still need to be deserialized.

Once instantiated, it may be treated like any other MutableMapping:

>>> msg = Message(b'{"hello": "world"}')
>>> assert msg['hello'] == 'world'
>>> msg['id'] = 'foobar'
>>> print(msg)
{
  "hello": "world",
  "id": "foobar"
}

It can be converted to bytes:

>>> msg = Message({"hello": "world"})
>>> print(bytes(msg))
b'{"hello":"world"}'

Or back into a garden-variety dict:

>>> dict(msg)
{'hello': 'world'}

Parameters
  • value – Initial value, if any.

  • eager – When True, attempt to serialize or deserialize the initial value immediately, so that conversion exceptions are raised during the call to __init__().
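
To make the effect of eager concrete, here is a minimal standalone sketch of a mapping that can be built from bytes or from another mapping and that defers conversion when asked. LazyJson is a hypothetical stand-in written for this illustration; it is not the Message class and makes no claim about how Message is implemented:

```python
import json
from collections.abc import MutableMapping


class LazyJson(MutableMapping):
    """Hypothetical stand-in: a JSON object built from bytes or a mapping."""

    def __init__(self, value=b"{}", *, eager=True):
        if isinstance(value, bytes):
            self._raw, self._obj = value, None
        else:
            self._raw, self._obj = None, dict(value)
        if eager:
            # Convert in both directions now so bad input fails in __init__().
            bytes(self)

    def _load(self):
        # Deserialize the raw bytes on first use only.
        if self._obj is None:
            obj = json.loads(self._raw)
            if not isinstance(obj, dict):
                raise ValueError("top-level JSON value must be an object")
            self._obj = obj
        return self._obj

    def __getitem__(self, key):
        return self._load()[key]

    def __setitem__(self, key, value):
        self._load()[key] = value

    def __delitem__(self, key):
        del self._load()[key]

    def __iter__(self):
        return iter(self._load())

    def __len__(self):
        return len(self._load())

    def __bytes__(self):
        return json.dumps(self._load()).encode("utf-8")


msg = LazyJson(b'{"hello": "world"}')
msg["id"] = "foobar"

# With eager=False, malformed input is accepted until first access.
lazy = LazyJson(b"not json", eager=False)
```

In this sketch, constructing LazyJson(b"not json") with the default eager=True raises immediately, while the lazy instance raises only when it is first indexed.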

class qemu.aqmp.EventListener(names: Optional[Union[str, Iterable[str]]] = None, event_filter: Optional[Callable[[qemu.aqmp.message.Message], bool]] = None)

Bases: object

Selectively listens for events with runtime configurable filtering.

This class is designed to be directly usable for the most common cases, but it can be extended to provide more rigorous control.

Parameters
  • names – One or more names of events to listen for. When not provided, listen for ALL events.

  • event_filter – An optional event filtering function. When names are also provided, this acts as a secondary filter.

When names and event_filter are both provided, events are filtered by name first and the filter function is called second. The filter function can therefore assume that any event it receives has one of the listed names, and thus a known format.
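
The names-then-filter ordering can be restated as a small standalone function. This is a sketch of the rule as described above, not the library's accept() implementation; accepts and is_guest_reset are hypothetical names, and the sample events carry only the fields the example needs:

```python
from typing import Callable, Iterable, Mapping, Optional, Union

Event = Mapping[str, object]


def accepts(event: Event,
            names: Optional[Union[str, Iterable[str]]] = None,
            event_filter: Optional[Callable[[Event], bool]] = None) -> bool:
    """Hypothetical restatement of the two-stage filtering rule."""
    if isinstance(names, str):
        wanted = {names}
    else:
        wanted = set(names or ())
    # Stage 1: an empty name set means "all events".
    if wanted and event["event"] not in wanted:
        return False
    # Stage 2: the filter only ever sees events that passed stage 1.
    if event_filter is not None:
        return event_filter(event)
    return True


def is_guest_reset(event: Event) -> bool:
    # Indexing 'data' is safe here only because stage 1 matched the name.
    return bool(event["data"]["guest"])


reset = {"event": "RESET", "data": {"guest": True}}
stop = {"event": "STOP"}
```

Here accepts(stop, "RESET", is_guest_reset) is rejected at stage 1, so is_guest_reset never sees an event without a data field.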

names: Set[str]

Primary event filter, based on one or more event names.

event_filter: Optional[Callable[[qemu.aqmp.message.Message], bool]]

Optional, secondary event filter.

property history: Tuple[qemu.aqmp.message.Message, ...]

A read-only history of all events seen so far.

This represents every event, including those not yet witnessed via get() or async for. It persists between clear() calls and is immutable.

accept(event: qemu.aqmp.message.Message) → bool

Determine if this listener accepts this event.

This method determines which events will appear in the stream. The default implementation simply checks the event against the list of names and the event_filter to decide if this EventListener accepts a given event. It can be overridden/extended to provide custom listener behavior.

User code is not expected to need to invoke this method.

Parameters

event – The event under consideration.

Returns

True, if this listener accepts this event.

async put(event: qemu.aqmp.message.Message) → None

Conditionally put a new event into the FIFO queue.

This method is not designed to be invoked from user code, and it should not need to be overridden. It is a public interface so that QMPClient has an interface by which it can inform registered listeners of new events.

The event will be put into the queue if accept() returns True.

Parameters

event – The new event to put into the FIFO queue.

async get() → qemu.aqmp.message.Message

Wait for the very next event in this stream.

If one is already available, return that one.

clear() → None

Clear this listener of all pending events.

Called when an EventListener is being unregistered, this clears the pending FIFO queue synchronously. It can also be used to manually clear any pending events, if desired.

Warning

Take care when discarding events. Cleared events will be silently tossed on the floor. All events that were ever accepted by this listener remain visible in history.
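
The relationship between the pending queue, clear() and history can be modelled with asyncio.Queue. This is a minimal sketch under the assumption that history simply records every accepted event; MiniListener is a hypothetical stand-in, not the EventListener class:

```python
import asyncio


class MiniListener:
    """Hypothetical stand-in modelling the queue and history behaviour."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._history = []

    @property
    def history(self):
        # Immutable snapshot of every event that was ever queued.
        return tuple(self._history)

    async def put(self, event):
        self._history.append(event)
        await self._queue.put(event)

    async def get(self):
        # Returns at once if an event is pending, otherwise waits.
        return await self._queue.get()

    def clear(self):
        # Drain pending events synchronously; history is left alone.
        while not self._queue.empty():
            self._queue.get_nowait()

    def pending(self):
        return self._queue.qsize()


async def demo():
    listener = MiniListener()
    for name in ("STOP", "RESUME", "SHUTDOWN"):
        await listener.put({"event": name})
    first = await listener.get()
    listener.clear()
    return first, listener.pending(), listener.history


first, pending, history = asyncio.run(demo())
```

In the stand-in, the two events discarded by clear() are gone from the queue but still appear in history.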

class qemu.aqmp.Runstate(value)

Bases: enum.Enum

Protocol session runstate.

IDLE = 0

Fully quiesced and disconnected.

CONNECTING = 1

In the process of connecting or establishing a session.

RUNNING = 2

Fully connected and active session.

DISCONNECTING = 3

In the process of disconnecting. Runstate may be returned to IDLE by calling disconnect().

exception qemu.aqmp.AQMPError

Bases: Exception

Abstract error class for all errors originating from this package.

exception qemu.aqmp.StateError(error_message: str, state: qemu.aqmp.protocol.Runstate, required: qemu.aqmp.protocol.Runstate)

Bases: qemu.aqmp.error.AQMPError

An API command (connect, execute, etc.) was issued at an inappropriate time.

This error is raised when a command such as connect() is issued while the connection is not in the Runstate that the command requires.

Parameters
  • error_message – Human-readable string describing the state violation.

  • state – The actual Runstate seen at the time of the violation.

  • required – The Runstate required to process this command.
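
The state check behind this error can be sketched as a guard function. This is a standalone illustration, not the library's code: the Runstate enum is redefined with the values documented above so the snippet runs on its own, and StateViolation and require_state are hypothetical names:

```python
import enum


class Runstate(enum.Enum):
    # Redefined here with the documented values so the sketch is standalone.
    IDLE = 0
    CONNECTING = 1
    RUNNING = 2
    DISCONNECTING = 3


class StateViolation(Exception):
    """Hypothetical analogue of StateError."""

    def __init__(self, error_message, state, required):
        super().__init__(error_message)
        self.error_message = error_message
        self.state = state        # the Runstate actually seen
        self.required = required  # the Runstate the command needs


def require_state(current, required):
    """Raise unless the connection is in the required Runstate."""
    if current is not required:
        raise StateViolation(
            f"state is {current.name}, but {required.name} is required",
            current, required)


# Connecting is only valid from IDLE, so this check passes silently.
require_state(Runstate.IDLE, Runstate.IDLE)
```

A caller can inspect state and required on the caught exception to decide whether to wait, disconnect, or retry.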

exception qemu.aqmp.ConnectError(error_message: str, exc: Exception)

Bases: qemu.aqmp.error.AQMPError

Raised when the initial connection process has failed.

This Exception always wraps a “root cause” exception that can be interrogated for additional information.

Parameters
  • error_message – Human-readable string describing the error.

  • exc – The root-cause exception.

error_message: str

Human-readable error string

exc: Exception

Wrapped root cause exception

exception qemu.aqmp.ExecuteError(error_response: qemu.aqmp.models.ErrorResponse, sent: qemu.aqmp.message.Message, received: qemu.aqmp.message.Message)

Bases: qemu.aqmp.error.AQMPError

Exception raised by QMPClient.execute() on RPC failure.

Parameters
  • error_response – The RPC error response object.

  • sent – The sent RPC message that caused the failure.

  • received – The raw RPC error reply received.

sent: qemu.aqmp.message.Message

The sent Message that caused the failure

received: qemu.aqmp.message.Message

The received Message that indicated failure

error: qemu.aqmp.models.ErrorResponse

The parsed error response

error_class: str

The QMP error class
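
For context, a QMP server reports command failure with an "error" object carrying "class" and "desc" members, and success with a "return" member. The sketch below shows how a reply might be turned into either a value or an exception. CommandFailed and unwrap_reply are hypothetical names and do not reproduce the library's parsing code:

```python
class CommandFailed(Exception):
    """Hypothetical analogue of ExecuteError."""

    def __init__(self, sent, received):
        error = received["error"]
        super().__init__(error["desc"])
        self.sent = sent              # the command that was issued
        self.received = received      # the raw error reply
        self.error_class = error["class"]


def unwrap_reply(sent, received):
    """Return the reply's value, or raise if the server reported an error."""
    if "error" in received:
        raise CommandFailed(sent, received)
    return received["return"]


sent = {"execute": "no-such-command"}
reply = {"error": {"class": "CommandNotFound",
                   "desc": "The command no-such-command has not been found"}}
try:
    unwrap_reply(sent, reply)
except CommandFailed as err:
    failure = err
```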

exception qemu.aqmp.ExecInterruptedError

Bases: qemu.aqmp.error.AQMPError

Exception raised by execute() (et al) when an RPC is interrupted.

This error is raised when an execute() statement could not be completed. This can occur because the connection itself was terminated before a reply was received.

The true cause of the interruption will be available via disconnect().