aqmp - Asynchronous QMP
QEMU Monitor Protocol (QMP) development library & tooling.
This package provides a fairly low-level class for communicating asynchronously with QMP protocol servers, as implemented by QEMU, the QEMU Guest Agent, and the QEMU Storage Daemon.
QMPClient provides the main functionality of this package. All errors raised by this library derive from AQMPError; see aqmp.error for additional detail. See aqmp.events for an in-depth tutorial on managing QMP events.
- class qemu.aqmp.QMPClient(name: Optional[str] = None)[source]
Bases: qemu.aqmp.protocol.AsyncProtocol[qemu.aqmp.message.Message], qemu.aqmp.events.Events
Implements a QMP client connection.
QMP can be used to establish a connection as either the transport client or server, though this class always acts as the QMP client.
- Parameters
name – Optional nickname for the connection, used for logging.
Basic script-style usage looks like this:
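    from qemu.aqmp import QMPClient

    # Run inside a coroutine; 'await' is not valid at module top level.
    qmp = QMPClient('my_virtual_machine_name')
    await qmp.connect(('127.0.0.1', 1234))
    ...
    res = await qmp.execute('query-block')
    ...
    await qmp.disconnect()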
Basic async client-style usage looks like this:
    import asyncio
    from qemu.aqmp import QMPClient

    class Client:
        def __init__(self, name: str):
            self.qmp = QMPClient(name)

        async def watch_events(self):
            # Print every event until the task is cancelled.
            try:
                async for event in self.qmp.events:
                    print(f"Event: {event['event']}")
            except asyncio.CancelledError:
                return

        async def run(self, address='/tmp/qemu.socket'):
            await self.qmp.connect(address)
            asyncio.create_task(self.watch_events())
            # Block until the connection leaves its current runstate.
            await self.qmp.runstate_changed()
            await self.qmp.disconnect()
See aqmp.events for more detail on event handling patterns.
- logger: logging.Logger = <Logger qemu.aqmp.qmp_client (WARNING)>
Logger object used for debugging messages.
- await_greeting: bool
Whether or not to await a greeting after establishing a connection.
- negotiate: bool
Whether or not to perform capabilities negotiation upon connection. Implies await_greeting.
- execute_msg(msg: qemu.aqmp.message.Message) → object[source]
Execute a QMP command and return its value.
- Parameters
msg – The QMP Message to execute.
- Returns
The command execution return value from the server. The type of object returned depends on the command that was issued, though most in QEMU return a dict.
- Raises
ValueError – If the QMP Message has neither the ‘execute’ nor the ‘exec-oob’ field set.
ExecuteError – When the server returns an error response.
ExecInterruptedError – If the connection was terminated early.
- classmethod make_execute_msg(cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) → qemu.aqmp.message.Message[source]
Create an executable message to be sent by execute_msg later.
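As a rough illustration of what such a message looks like on the wire, the sketch below builds the equivalent plain dictionaries. `build_execute_msg` is a hypothetical stand-in that only mirrors the documented signature; it is not the library's implementation, and the command names are merely examples of QMP commands.

```python
import json
from typing import Dict, Mapping, Optional


def build_execute_msg(
    cmd: str,
    arguments: Optional[Mapping[str, object]] = None,
    oob: bool = False,
) -> Dict[str, object]:
    """Hypothetical helper mirroring the make_execute_msg signature."""
    # QMP distinguishes in-band and out-of-band execution by the key name.
    key = 'exec-oob' if oob else 'execute'
    msg: Dict[str, object] = {key: cmd}
    # In this sketch, "arguments" is only included when non-empty.
    if arguments:
        msg['arguments'] = dict(arguments)
    return msg


plain = build_execute_msg('query-status')
with_args = build_execute_msg('device_del', {'id': 'virtio-net0'})
urgent = build_execute_msg('migrate-pause', oob=True)

for message in (plain, with_args, urgent):
    print(json.dumps(message))
```

With the real class, the object returned by make_execute_msg would then be handed to execute_msg.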
- async execute(cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) → object[source]
Execute a QMP command and return its value.
- Parameters
cmd – QMP command name.
arguments – Arguments (if any). Must be JSON-serializable.
oob – If True, execute “out of band”.
- Returns
The command execution return value from the server. The type of object returned depends on the command that was issued, though most in QEMU return a dict.
- Raises
ExecuteError – When the server returns an error response.
ExecInterruptedError – If the connection was terminated early.
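The following sketch shows one way calling code might sequence commands through execute(), awaiting each reply before sending the next. `FakeClient` is a hypothetical stand-in with the same execute() signature so that the example runs without a QEMU instance; a connected QMPClient would take its place in practice.

```python
import asyncio
from typing import Dict, List, Mapping, Optional


class FakeClient:
    """Hypothetical stand-in exposing the documented execute() signature."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, object]] = []

    async def execute(self, cmd: str,
                      arguments: Optional[Mapping[str, object]] = None,
                      oob: bool = False) -> object:
        # Record the request instead of sending it anywhere.
        self.sent.append({'cmd': cmd, 'arguments': arguments, 'oob': oob})
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return {}  # this stand-in always answers with an empty dict


async def stop_then_resume(qmp) -> List[object]:
    """Issue two commands in order, awaiting each reply before the next."""
    results = []
    results.append(await qmp.execute('stop'))
    results.append(await qmp.execute('cont'))
    return results


client = FakeClient()
replies = asyncio.run(stop_then_resume(client))
print(replies)
```

Because the helper only relies on an awaitable execute() method, the same function can be exercised in tests with the stand-in and in production with a real client.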
- accept(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None
Accept a connection and begin processing message queues.
If this call fails, runstate is guaranteed to be set back to IDLE.
- Parameters
address – Address to listen on; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
- Raises
StateError – When the Runstate is not IDLE.
ConnectError – If a connection could not be accepted.
- connect(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None
Connect to the server and begin processing message queues.
If this call fails, runstate is guaranteed to be set back to IDLE.
- Parameters
address – Address to connect to; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
- Raises
StateError – When the Runstate is not IDLE.
ConnectError – If a connection cannot be made to the server.
- async disconnect() → None
Disconnect and wait for all tasks to fully stop.
If there was an exception that caused the reader/writers to terminate prematurely, it will be raised here.
- Raises
Exception – When the reader or writer terminates unexpectedly.
- listen(*listeners: qemu.aqmp.events.EventListener) → Iterator[None]
Context manager: Temporarily listen with an EventListener.
Accepts one or more EventListener objects and registers them, activating them for the duration of the context block.
EventListener objects will have any pending events in their FIFO queue cleared upon exiting the context block, when they are deactivated.
- Parameters
*listeners – One or more EventListeners to activate.
- Raises
ListenerError – If the given listener(s) are already active.
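The activate-then-clear contract described for listen() can be pictured with a small toy model. `ToyClient` and `ToyListener` are hypothetical names invented for this sketch, and a plain RuntimeError stands in for ListenerError; none of this is the library's code.

```python
from contextlib import contextmanager
from typing import Iterator, List


class ToyListener:
    """Hypothetical listener holding only a pending-event list."""

    def __init__(self) -> None:
        self.pending: List[dict] = []

    def clear(self) -> None:
        self.pending.clear()


class ToyClient:
    """Hypothetical registry that mimics the documented listen() contract."""

    def __init__(self) -> None:
        self.active: List[ToyListener] = []

    @contextmanager
    def listen(self, *listeners: ToyListener) -> Iterator[None]:
        # Refuse listeners that are already active.
        if any(item is lst for lst in listeners for item in self.active):
            raise RuntimeError('listener already active')
        self.active.extend(listeners)
        try:
            yield
        finally:
            # Deactivate and drop pending events, even if the block raised.
            for lst in listeners:
                self.active.remove(lst)
                lst.clear()


client = ToyClient()
a, b = ToyListener(), ToyListener()
with client.listen(a, b):
    inside = len(client.active)
    a.pending.append({'event': 'STOP'})
after = (len(client.active), len(a.pending))
print(inside, after)
```

The `finally` clause is what makes the cleanup unconditional: the listeners are deactivated and emptied whether the block finishes normally or raises.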
- listener(names: Optional[Union[str, Iterable[str]]] = (), event_filter: Optional[Callable[[qemu.aqmp.message.Message], bool]] = None) → Iterator[qemu.aqmp.events.EventListener]
Context manager: Temporarily listen with a new EventListener.
Creates an EventListener object and registers it, activating it for the duration of the context block.
- Parameters
names – One or more names of events to listen for. When not provided, listen for ALL events.
event_filter – An optional event filtering function. When names are also provided, this acts as a secondary filter.
- Returns
The newly created and active EventListener.
- register_listener(listener: qemu.aqmp.events.EventListener) → None
Register and activate an EventListener.
- Parameters
listener – The listener to activate.
- Raises
ListenerError – If the given listener is already registered.
- remove_listener(listener: qemu.aqmp.events.EventListener) → None
Unregister and deactivate an EventListener.
The removed listener will have its pending events cleared via clear(). The listener can be re-registered later when desired.
- Parameters
listener – The listener to deactivate.
- Raises
ListenerError – If the given listener is not registered.
- property runstate: qemu.aqmp.protocol.Runstate
The current Runstate of the connection.
- async runstate_changed() → qemu.aqmp.protocol.Runstate
Wait for the runstate to change, then return that runstate.
- name: Optional[str]
The nickname for this connection, if any.
- events: EventListener
Default, all-events EventListener.
- class qemu.aqmp.Message(value: Union[bytes, Mapping[str, object]] = b'{}', *, eager: bool = True)[source]
Bases: MutableMapping[str, object]
Represents a single QMP protocol message.
QMP uses JSON objects as its basic communicative unit, so this Python object is a MutableMapping. It may be instantiated from either another mapping (like a dict), or from raw bytes that still need to be deserialized.
Once instantiated, it may be treated like any other MutableMapping:
    >>> msg = Message(b'{"hello": "world"}')
    >>> assert msg['hello'] == 'world'
    >>> msg['id'] = 'foobar'
    >>> print(msg)
    {
      "hello": "world",
      "id": "foobar"
    }
It can be converted to bytes:
    >>> msg = Message({"hello": "world"})
    >>> print(bytes(msg))
    b'{"hello":"world"}'
Or back into a garden-variety dict:
    >>> dict(msg)
    {'hello': 'world'}
- Parameters
value – Initial value, if any.
eager – When True, attempt to serialize or deserialize the initial value immediately, so that conversion exceptions are raised during the call to __init__().
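For readers who want to see the bytes-to-mapping round trip without the class, the sketch below reproduces it with the standard json module alone. The indentation and separator settings are assumptions chosen to resemble the doctest output for Message; they are not taken from the library's source.

```python
import json

raw = b'{"hello": "world"}'

# Deserialize: raw bytes from the wire become an ordinary mapping.
obj = json.loads(raw.decode('utf-8'))
obj['id'] = 'foobar'

# Indented form, comparable to the print(msg) doctest output.
pretty = json.dumps(obj, indent=2)

# Compact form, comparable to the bytes(msg) doctest output.
compact = json.dumps(obj, separators=(',', ':')).encode('utf-8')

print(pretty)
print(compact)
```

Decoding the compact bytes again yields an equal dictionary, which is the property that lets a message be built from either representation.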
- class qemu.aqmp.EventListener(names: Optional[Union[str, Iterable[str]]] = None, event_filter: Optional[Callable[[qemu.aqmp.message.Message], bool]] = None)[source]
Bases: object
Selectively listens for events with runtime configurable filtering.
This class is designed to be directly usable for the most common cases, but it can be extended to provide more rigorous control.
- Parameters
names – One or more names of events to listen for. When not provided, listen for ALL events.
event_filter – An optional event filtering function. When names are also provided, this acts as a secondary filter.
When names and event_filter are both provided, the names will be filtered first, and then the filter function will be called second. The event filter function can assume that the format of the event is a known format.
- names: Set[str]
Primary event filter, based on one or more event names.
- event_filter: Optional[Callable[[qemu.aqmp.message.Message], bool]]
Optional, secondary event filter.
- property history: Tuple[qemu.aqmp.message.Message, ...]
A read-only history of all events seen so far.
This represents every event, including those not yet witnessed via get() or async for. It persists between clear() calls and is immutable.
- accept(event: qemu.aqmp.message.Message) → bool[source]
Determine if this listener accepts this event.
This method determines which events will appear in the stream. The default implementation simply checks the event against the list of names and the event_filter to decide if this EventListener accepts a given event. It can be overridden/extended to provide custom listener behavior.
User code is not expected to need to invoke this method.
- Parameters
event – The event under consideration.
- Returns
True, if this listener accepts this event.
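The two-stage check (names first, then the optional filter function) can be restated as a small free function. `accepts` is a hypothetical helper written for this sketch rather than the method's actual body, and the event payloads are illustrative.

```python
from typing import Callable, Iterable, Mapping, Optional

Event = Mapping[str, object]


def accepts(event: Event,
            names: Iterable[str] = (),
            event_filter: Optional[Callable[[Event], bool]] = None) -> bool:
    """Hypothetical re-statement of the documented filtering order."""
    wanted = set(names)
    # Stage 1: name filter. An empty set means "listen for ALL events".
    if wanted and event.get('event') not in wanted:
        return False
    # Stage 2: optional secondary filter, reached only by name matches.
    if event_filter is not None:
        return bool(event_filter(event))
    return True


def concluded(event: Event) -> bool:
    # Safe to index 'data' here only because names are checked first.
    return event['data']['status'] == 'concluded'


job_event = {'event': 'JOB_STATUS_CHANGE',
             'data': {'id': 'job0', 'status': 'concluded'}}
stop_event = {'event': 'STOP'}

print(accepts(job_event, ['JOB_STATUS_CHANGE'], concluded))
print(accepts(stop_event, ['JOB_STATUS_CHANGE'], concluded))
print(accepts(stop_event))
```

The second call never reaches `concluded`, which is why a filter function may assume a known event format when names are supplied: `stop_event` has no 'data' key and would otherwise raise KeyError.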
- async put(event: qemu.aqmp.message.Message) → None[source]
Conditionally put a new event into the FIFO queue.
This method is not designed to be invoked from user code, and it should not need to be overridden. It is a public interface so that QMPClient has an interface by which it can inform registered listeners of new events.
The event will be put into the queue if accept() returns True.
- Parameters
event – The new event to put into the FIFO queue.
- async get() → qemu.aqmp.message.Message[source]
Wait for the very next event in this stream.
If one is already available, return that one.
- clear() → None[source]
Clear this listener of all pending events.
Called when an EventListener is being unregistered, this clears the pending FIFO queue synchronously. It can also be used to manually clear any pending events, if desired.
Warning
Take care when discarding events. Cleared events will be silently tossed on the floor. All events that were ever accepted by this listener are visible in history.
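The difference between the pending queue and the history can be seen in a toy model. `ToyListener` below is a hypothetical miniature built on asyncio.Queue purely for illustration; it omits filtering and is not the library's class.

```python
import asyncio
from typing import Dict, List, Tuple

Event = Dict[str, object]


class ToyListener:
    """Hypothetical miniature of the queue-plus-history idea."""

    def __init__(self) -> None:
        self._queue: 'asyncio.Queue[Event]' = asyncio.Queue()
        self._history: List[Event] = []

    @property
    def history(self) -> Tuple[Event, ...]:
        # A tuple copy, so callers cannot mutate the record.
        return tuple(self._history)

    async def put(self, event: Event) -> None:
        self._history.append(event)
        await self._queue.put(event)

    async def get(self) -> Event:
        return await self._queue.get()

    def clear(self) -> None:
        # Drain the queue synchronously; history is left untouched.
        while not self._queue.empty():
            self._queue.get_nowait()

    def pending(self) -> int:
        return self._queue.qsize()


async def demo():
    listener = ToyListener()
    await listener.put({'event': 'STOP'})
    await listener.put({'event': 'RESUME'})
    first = await listener.get()   # consumes STOP
    listener.clear()               # discards RESUME
    return first, listener.pending(), listener.history


first, pending, history = asyncio.run(demo())
print(first, pending, history)
```

After clear(), nothing is left to retrieve with get(), yet both events remain visible in the history tuple.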
- class qemu.aqmp.Runstate(value)[source]
Bases: enum.Enum
Protocol session runstate.
- IDLE = 0
Fully quiesced and disconnected.
- CONNECTING = 1
In the process of connecting or establishing a session.
- RUNNING = 2
Fully connected and active session.
- DISCONNECTING = 3
In the process of disconnecting. Runstate may be returned to IDLE by calling disconnect().
- exception qemu.aqmp.AQMPError[source]
Bases: Exception
Abstract error class for all errors originating from this package.
- exception qemu.aqmp.StateError(error_message: str, state: qemu.aqmp.protocol.Runstate, required: qemu.aqmp.protocol.Runstate)[source]
Bases: qemu.aqmp.error.AQMPError
An API command (connect, execute, etc) was issued at an inappropriate time.
This error is raised when a command like connect() is issued while the connection is in the wrong Runstate, for example when connect() is called and the runstate is not IDLE.
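A minimal sketch of such a state check follows. `ToyStateError` and `check_state` are hypothetical names used only here, and the enum is a local copy of the documented values; the library's own guard logic may well differ.

```python
import enum


class Runstate(enum.Enum):
    """Local copy of the documented enum values, for illustration only."""
    IDLE = 0
    CONNECTING = 1
    RUNNING = 2
    DISCONNECTING = 3


class ToyStateError(Exception):
    """Hypothetical stand-in carrying message, actual and required state."""

    def __init__(self, error_message: str,
                 state: Runstate, required: Runstate) -> None:
        super().__init__(error_message)
        self.error_message = error_message
        self.state = state
        self.required = required


def check_state(current: Runstate, required: Runstate) -> None:
    """Raise unless the connection is in the state the command needs."""
    if current is not required:
        raise ToyStateError(
            f"runstate is {current.name}, but {required.name} is required",
            current, required)


check_state(Runstate.IDLE, Runstate.IDLE)  # e.g. connecting while IDLE

caught = None
try:
    check_state(Runstate.RUNNING, Runstate.IDLE)  # e.g. connecting twice
except ToyStateError as err:
    caught = err

print(caught.error_message)
```

Carrying both the actual and the required state on the exception lets a caller decide whether to wait, disconnect first, or give up.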
- exception qemu.aqmp.ConnectError(error_message: str, exc: Exception)[source]
Bases: qemu.aqmp.error.AQMPError
Raised when the initial connection process has failed.
This Exception always wraps a “root cause” exception that can be interrogated for additional information.
- Parameters
error_message – Human-readable string describing the error.
exc – The root-cause exception.
- error_message: str
Human-readable error string
- exc: Exception
Wrapped root cause exception
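The wrap-and-interrogate pattern can be sketched as follows. `ToyConnectError`, `wrap_connect` and `refused` are hypothetical names for this illustration, and the failure is simulated rather than produced by a real socket.

```python
from typing import Callable


class ToyConnectError(Exception):
    """Hypothetical stand-in with the documented error_message/exc fields."""

    def __init__(self, error_message: str, exc: Exception) -> None:
        super().__init__(error_message)
        self.error_message = error_message
        self.exc = exc


def wrap_connect(connect: Callable[[], object]) -> object:
    """Run a connection attempt, wrapping any failure with its root cause."""
    try:
        return connect()
    except Exception as err:
        raise ToyConnectError('Failed to establish connection', err) from err


def refused() -> object:
    # Simulated failure; a real attempt would raise this from the socket layer.
    raise ConnectionRefusedError(111, 'Connection refused')


caught = None
try:
    wrap_connect(refused)
except ToyConnectError as err:
    caught = err

# The caller can branch on the type of the wrapped root cause.
if isinstance(caught.exc, ConnectionRefusedError):
    print(f"{caught.error_message}: server is not listening")
```

A caller that only cares about "connection failed" catches the wrapper, while one that wants to retry on refusal but abort on, say, a permissions problem can inspect `exc`.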
- exception qemu.aqmp.ExecuteError(error_response: qemu.aqmp.models.ErrorResponse, sent: qemu.aqmp.message.Message, received: qemu.aqmp.message.Message)[source]
Bases: qemu.aqmp.error.AQMPError
Exception raised by QMPClient.execute() on RPC failure.
- Parameters
error_response – The RPC error response object.
sent – The sent RPC message that caused the failure.
received – The raw RPC error reply received.
- sent: qemu.aqmp.message.Message
The sent
Message
that caused the failure
- received: qemu.aqmp.message.Message
The received
Message
that indicated failure
- error: qemu.aqmp.models.ErrorResponse
The parsed error response
- error_class: str
The QMP error class
- exception qemu.aqmp.ExecInterruptedError[source]
Bases: qemu.aqmp.error.AQMPError
Exception raised by execute() (et al) when an RPC is interrupted.
This error is raised when an execute() statement could not be completed. This can occur because the connection itself was terminated before a reply was received.
The true cause of the interruption will be available via disconnect().
Submodules
- qemu.aqmp.error module
- qemu.aqmp.events module
- EventListener Tutorial
- listener() context blocks with one name
- listener() context blocks with two or more names
- listener() context blocks with no names
- Using async iterators to retrieve events
- Using asyncio.Task to concurrently retrieve events
- Using register_listener() and remove_listener()
- Using the built-in all events listener
- Using both .get() and async iterators
- Creating multiple listeners
- Clearing listeners
- Accessing listener history
- Using event filters
- Activating an existing listener with listen()
- Activating multiple existing listeners with listen()
- Extending the EventListener class
- Experimental Interfaces & Design Issues
- API Reference
- qemu.aqmp.message module
- qemu.aqmp.models module
- qemu.aqmp.protocol module
- qemu.aqmp.qmp_client module
- qemu.aqmp.util module