qemu.aqmp.protocol module

Generic Asynchronous Message-based Protocol Support

This module provides a generic framework for sending and receiving messages over an asyncio stream. AsyncProtocol is an abstract class that implements the core mechanisms of a simple send/receive protocol, and is designed to be extended.

In this package, it is used as the implementation for the QMPClient class.

class qemu.aqmp.protocol.Runstate(value)

Bases: enum.Enum

Protocol session runstate.

IDLE = 0

Fully quiesced and disconnected.

CONNECTING = 1

In the process of connecting or establishing a session.

RUNNING = 2

Fully connected and active session.

DISCONNECTING = 3

In the process of disconnecting. Runstate may be returned to IDLE by calling disconnect().

exception qemu.aqmp.protocol.ConnectError(error_message: str, exc: Exception)

Bases: qemu.aqmp.error.AQMPError

Raised when the initial connection process has failed.

This Exception always wraps a “root cause” exception that can be interrogated for additional information.

Parameters
  • error_message – Human-readable string describing the error.

  • exc – The root-cause exception.

error_message: str

Human-readable error string

exc: Exception

Wrapped root cause exception

exception qemu.aqmp.protocol.StateError(error_message: str, state: qemu.aqmp.protocol.Runstate, required: qemu.aqmp.protocol.Runstate)

Bases: qemu.aqmp.error.AQMPError

An API command (connect, execute, etc.) was issued at an inappropriate time.

This error is raised when a command like connect() is issued while the connection is not in the Runstate that the command requires.

Parameters
  • error_message – Human-readable string describing the state violation.

  • state – The actual Runstate seen at the time of the violation.

  • required – The Runstate required to process this command.

qemu.aqmp.protocol.require(required_state: qemu.aqmp.protocol.Runstate) → Callable[[qemu.aqmp.protocol.F], qemu.aqmp.protocol.F]

Decorator: protect a method so it can only be run in a certain Runstate.

Parameters

required_state – The Runstate required to invoke this method.

Raises

StateError – When the required Runstate is not met.
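The guard pattern can be sketched with the standard library alone. This is an illustration and not the module's implementation: Runstate and StateError below are local stand-ins that mirror the names documented above, Demo and its start() method are hypothetical, and only synchronous methods are handled.

```python
import enum
import functools


class Runstate(enum.Enum):
    # Local stand-in mirroring the values documented above.
    IDLE = 0
    CONNECTING = 1
    RUNNING = 2
    DISCONNECTING = 3


class StateError(Exception):
    # Local stand-in; the documented class derives from AQMPError.
    def __init__(self, error_message, state, required):
        super().__init__(error_message)
        self.error_message = error_message
        self.state = state
        self.required = required


def require(required_state):
    """Allow the decorated method to run only in `required_state`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if self.runstate != required_state:
                raise StateError(
                    f"{func.__name__}() requires {required_state.name}, "
                    f"but runstate is {self.runstate.name}",
                    self.runstate,
                    required_state,
                )
            return func(self, *args, **kwargs)
        return wrapper
    return decorator


class Demo:
    # Hypothetical class used only to exercise the decorator.
    def __init__(self):
        self.runstate = Runstate.IDLE

    @require(Runstate.IDLE)
    def start(self):
        self.runstate = Runstate.RUNNING
        return "started"
```

Calling start() a second time raises StateError, because the first call moved the object out of IDLE. A guard for coroutine methods would need an async wrapper that awaits the wrapped function.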

class qemu.aqmp.protocol.AsyncProtocol(name: Optional[str] = None)

Bases: Generic[qemu.aqmp.protocol.T]

AsyncProtocol implements a generic async message-based protocol.

This protocol assumes the basic unit of information transfer between client and server is a “message”, the details of which are left up to the implementation. It assumes the sending and receiving of these messages is full-duplex and not necessarily correlated; i.e. it supports asynchronous inbound messages.

It is designed to be extended by a specific protocol which provides the implementations for how to read and send messages. These must be defined in _do_recv() and _do_send(), respectively.

Other callbacks have a default implementation, but are intended to be either extended or overridden:

  • _establish_session:

    The base implementation starts the reader/writer tasks. A protocol implementation can override this call: actions that must happen before the reader/writer tasks start go before the super() call, and actions that must happen afterwards go after it.

  • _on_message:

    Actions to be performed when a message is received.

  • _cb_outbound:

    Logging/Filtering hook for all outbound messages.

  • _cb_inbound:

    Logging/Filtering hook for all inbound messages. This hook runs before _on_message().
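The override pattern for these callbacks might look like the following sketch. ToyBase is a hypothetical stand-in for AsyncProtocol so that the example runs without the package installed; ToyLineProtocol, _deliver() and the recorded event strings are likewise invented for illustration.

```python
import asyncio


class ToyBase:
    """Hypothetical stand-in for AsyncProtocol; not the real class."""

    def __init__(self):
        self.events = []

    async def _establish_session(self):
        # The documented base implementation starts the reader/writer
        # tasks here; this stand-in only records that it was called.
        self.events.append("tasks started")

    def _cb_inbound(self, msg):
        # Default filter: pass the message through unchanged.
        return msg

    async def _on_message(self, msg):
        self.events.append(f"handled {msg!r}")

    async def _deliver(self, raw):
        # Hypothetical helper showing the order: filter first, then handler.
        await self._on_message(self._cb_inbound(raw))


class ToyLineProtocol(ToyBase):
    async def _establish_session(self):
        self.events.append("before tasks")   # e.g. read a greeting
        await super()._establish_session()
        self.events.append("after tasks")    # e.g. send a first command

    def _cb_inbound(self, msg):
        # Filter hook: strip surrounding whitespace before handling.
        return msg.strip()


async def demo():
    proto = ToyLineProtocol()
    await proto._establish_session()
    await proto._deliver("  hello\n")
    return proto.events
```

Running demo() records the pre-session action, the base-class step, the post-session action, and finally the filtered message reaching the handler, in that order.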

Parameters

name – Name used for logging messages, if any. By default, messages will log to ‘qemu.aqmp.protocol’, but each individual connection can be given its own logger by giving it a name; messages will then log to ‘qemu.aqmp.protocol.${name}’.

_limit = 65536

name: Optional[str]

The nickname for this connection, if any.

logger = <Logger qemu.aqmp.protocol (WARNING)>

Logger object for debugging messages from this connection.

_dc_task: Optional[_asyncio.Future]

Disconnect task. The disconnect implementation runs in a task so that asynchronous disconnects (initiated by the reader/writer) are allowed to wait for the reader/writers to exit.

property runstate: qemu.aqmp.protocol.Runstate

The current Runstate of the connection.

async runstate_changed() → qemu.aqmp.protocol.Runstate

Wait for the runstate to change, then return that runstate.
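One way to build a "wait for the next change" primitive like this is with an asyncio.Event that is set on every transition. The sketch below is an assumption about how such a mechanism could work, not a copy of the module's code; StateTracker and set_state() are hypothetical names, and Runstate is a local stand-in.

```python
import asyncio
import enum


class Runstate(enum.Enum):
    # Local stand-in mirroring the documented values.
    IDLE = 0
    CONNECTING = 1
    RUNNING = 2
    DISCONNECTING = 3


class StateTracker:
    """Hypothetical helper; not part of qemu.aqmp."""

    def __init__(self):
        self._runstate = Runstate.IDLE
        self._changed = asyncio.Event()

    def set_state(self, state):
        # Only signal waiters when the state actually changes.
        if state != self._runstate:
            self._runstate = state
            self._changed.set()

    async def runstate_changed(self):
        # Block until the next transition, then re-arm the event.
        await self._changed.wait()
        self._changed.clear()
        return self._runstate


async def demo():
    tracker = StateTracker()
    waiter = asyncio.create_task(tracker.runstate_changed())
    await asyncio.sleep(0)          # let the waiter start waiting
    tracker.set_state(Runstate.CONNECTING)
    return await waiter
```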

accept(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None

Accept a connection and begin processing message queues.

If this call fails, runstate is guaranteed to be set back to IDLE.

Parameters
  • address – Address to listen to; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

Raises
  • StateError – When the Runstate is not IDLE.

  • ConnectError – If a connection could not be accepted.

connect(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None

Connect to the server and begin processing message queues.

If this call fails, runstate is guaranteed to be set back to IDLE.

Parameters
  • address – Address to connect to; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

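Raises
  • StateError – When the Runstate is not IDLE.

  • ConnectError – If a connection cannot be made to the server.

async disconnect() → None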

Disconnect and wait for all tasks to fully stop.

If there was an exception that caused the reader/writers to terminate prematurely, it will be raised here.

Raises

Exception – When the reader or writer terminates unexpectedly.

property _runstate_event: asyncio.locks.Event

_set_state(state: qemu.aqmp.protocol.Runstate) → None

Change the Runstate of the protocol connection.

Signals the runstate_changed event.

async _new_session(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None, accept: bool = False) → None

Establish a new connection and initialize the session.

Connect or accept a new connection, then begin the protocol session machinery. If this call fails, runstate is guaranteed to be set back to IDLE.

Parameters
  • address – Address to connect to/listen on; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

  • accept – Accept a connection instead of connecting when True.

Raises

ConnectError

When a connection or session cannot be established.

This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError or EOFError. If a protocol-level failure occurs while establishing a new session, the wrapped error may also be an AQMPError.
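A caller can inspect the wrapped exception to decide how to react. The sketch below uses a local ConnectError stand-in with the documented error_message and exc attributes; open_session(), describe() and refuse() are hypothetical helpers invented for this example.

```python
class ConnectError(Exception):
    # Local stand-in mirroring the documented attributes.
    def __init__(self, error_message, exc):
        super().__init__(error_message)
        self.error_message = error_message
        self.exc = exc


def open_session(connect):
    """Hypothetical wrapper: call `connect` and wrap low-level failures."""
    try:
        return connect()
    except (OSError, EOFError) as err:
        raise ConnectError("Failed to establish connection", err) from err


def describe(err):
    """Classify a ConnectError by inspecting its root cause."""
    if isinstance(err.exc, ConnectionRefusedError):
        return "server not listening"
    if isinstance(err.exc, EOFError):
        return "peer closed the stream early"
    return "other failure"


def refuse():
    # Simulates a connection attempt that the peer refuses.
    raise ConnectionRefusedError("connection refused")
```

Because ConnectionRefusedError is a subclass of OSError, open_session(refuse) raises ConnectError, and describe() reports "server not listening" for it.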

async _establish_connection(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None, accept: bool = False) → None

Establish a new connection.

Parameters
  • address – Address to connect to/listen on; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

  • accept – Accept a connection instead of connecting when True.

async _do_accept(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None

Acting as the transport server, accept a single connection.

Parameters
  • address – Address to listen on; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

Raises

OSError – For stream-related errors.

async _do_connect(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None

Acting as the transport client, initiate a connection to a server.

Parameters
  • address – Address to connect to; UNIX socket path or TCP address/port.

  • ssl – SSL context to use, if any.

Raises

OSError – For stream-related errors.

async _establish_session() → None

Establish a new session.

Starts the reader/writer tasks; subclasses may perform their own negotiations here. The Runstate will be RUNNING upon successful conclusion.

_schedule_disconnect() → None

Initiate a disconnect; idempotent.

This method is used both in the upper-half as a direct consequence of disconnect(), and in the bottom-half in the case of unhandled exceptions in the reader/writer tasks.

It can be invoked no matter what the runstate is.
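Idempotent scheduling of this kind can be sketched by creating the disconnect task only when none exists yet. Disconnector below is a hypothetical stand-in that borrows the method names documented here; its bodies are placeholders and are not taken from the module.

```python
import asyncio


class Disconnector:
    """Hypothetical stand-in; not the real AsyncProtocol."""

    def __init__(self):
        self._dc_task = None
        self.teardowns = 0

    async def _bh_disconnect(self):
        # Placeholder for the real teardown work.
        self.teardowns += 1
        await asyncio.sleep(0)

    def _schedule_disconnect(self):
        # Create the task only once; repeat calls are no-ops.
        if self._dc_task is None:
            self._dc_task = asyncio.create_task(self._bh_disconnect())

    async def _wait_disconnect(self):
        try:
            await self._dc_task
        finally:
            self._dc_task = None


async def demo():
    conn = Disconnector()
    conn._schedule_disconnect()   # e.g. requested by disconnect()
    conn._schedule_disconnect()   # e.g. requested again by a failing reader
    await conn._wait_disconnect()
    return conn.teardowns
```

Both requests share a single task, so the teardown body runs once.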

async _wait_disconnect() → None

Waits for a previously scheduled disconnect to finish.

This method will gather any bottom-half exceptions and re-raise the one that occurred first, presuming it to be the root cause of any subsequent exceptions. It is intended to be used in the upper half of the call chain.

Raises

Exception – Arbitrary exception re-raised on behalf of the reader/writer.

_cleanup() → None

Fully reset this object to a clean state and return to IDLE.

async _bh_disconnect() → None

Disconnect and cancel all outstanding tasks.

It is designed to be called from its task context, _dc_task. By running in its own task, it is free to wait on any pending actions that may still need to occur in either the reader or writer tasks.

async _bh_flush_writer() → None

async _bh_close_stream(error_pathway: bool = False) → None

async _bh_loop_forever(async_fn: Callable[[], Awaitable[None]], name: str) → None

Run one of the bottom-half methods in a loop forever.

If the bottom half ever raises any exception, schedule a disconnect that will terminate the entire loop.

Parameters
  • async_fn – The bottom-half method to run in a loop.

  • name – The name of this task, used for logging.
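The loop-and-teardown behaviour described here can be approximated as follows. LoopDemo is a hypothetical stand-in: its _schedule_disconnect() only sets a flag, flaky_step() is an invented bottom-half method that fails on its third call, and the name argument is accepted but unused.

```python
import asyncio


class LoopDemo:
    """Hypothetical stand-in; not the real AsyncProtocol."""

    def __init__(self):
        self.disconnect_scheduled = False
        self.calls = 0

    def _schedule_disconnect(self):
        # Stand-in: just record that a disconnect was requested.
        self.disconnect_scheduled = True

    async def _bh_loop_forever(self, async_fn, name):
        # `name` is described as a logging label; it is unused in this sketch.
        try:
            while True:
                await async_fn()
        except asyncio.CancelledError:
            # Cancellation is treated as a normal way to stop the loop.
            raise
        except Exception:
            # Any other failure requests a teardown of the session.
            self._schedule_disconnect()
            raise

    async def flaky_step(self):
        self.calls += 1
        if self.calls == 3:
            raise EOFError("stream ended")
        await asyncio.sleep(0)


async def demo():
    obj = LoopDemo()
    try:
        await obj._bh_loop_forever(obj.flaky_step, "Reader")
    except EOFError:
        pass
    return obj.calls, obj.disconnect_scheduled
```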

async _bh_send_message() → None

Wait for an outgoing message, then send it.

Designed to be run in _bh_loop_forever().

async _bh_recv_message() → None

Wait for an incoming message and call _on_message to route it.

Designed to be run in _bh_loop_forever().

_cb_outbound(msg: qemu.aqmp.protocol.T) → qemu.aqmp.protocol.T

Callback: outbound message hook.

This is intended for subclasses to be able to add arbitrary hooks to filter or manipulate outgoing messages. The base implementation does nothing but log the message without any manipulation of the message.

Parameters

msg – raw outbound message

Returns

final outbound message

_cb_inbound(msg: qemu.aqmp.protocol.T) → qemu.aqmp.protocol.T

Callback: inbound message hook.

This is intended for subclasses to be able to add arbitrary hooks to filter or manipulate incoming messages. The base implementation does nothing but log the message without any manipulation of the message.

This method does not “handle” incoming messages; it is a filter. The actual “endpoint” for incoming messages is _on_message().

Parameters

msg – raw inbound message

Returns

processed inbound message

async _readline() → bytes

Wait for a newline from the incoming reader.

This method is provided as a convenience for upper-layer protocols, as many are line-based.

This method may return a sequence of bytes without a trailing newline if EOF occurs, but some bytes were received. In this case, the next call will raise EOFError. It is assumed that the layer 5 protocol will decide if there is anything meaningful to be done with a partial message.

Raises
  • OSError – For stream-related errors.

  • EOFError – If the reader stream is at EOF and there are no bytes to return.

Returns

bytes, including the newline.
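The partial-line behaviour at EOF matches how asyncio.StreamReader.readline() behaves, so it can be demonstrated with a small sketch. The readline() helper below is an assumption about how such a wrapper might be written, not the module's code; the reader is fed by hand so that no socket is needed.

```python
import asyncio


async def readline(reader):
    """Return one line; raise EOFError at EOF with nothing buffered."""
    data = await reader.readline()
    if not data:
        # StreamReader.readline() returns b"" once EOF is reached
        # and the buffer is empty.
        raise EOFError
    return data


async def demo():
    reader = asyncio.StreamReader()
    reader.feed_data(b"first\npartial")
    reader.feed_eof()
    results = [await readline(reader), await readline(reader)]
    try:
        await readline(reader)
    except EOFError:
        results.append("EOF")
    return results
```

The first call returns a complete line, the second returns the trailing bytes without a newline, and the third raises EOFError.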

async _do_recv() → qemu.aqmp.protocol.T

Abstract: Read from the stream and return a message.

Very low-level; intended to only be called by _recv().

async _recv() → qemu.aqmp.protocol.T

Read an arbitrary protocol message.

Warning

This method is intended primarily for _bh_recv_message() to use in an asynchronous task loop. Using it outside of this loop will “steal” messages from the normal routing mechanism. It is safe to use prior to _establish_session(), but should not be used otherwise.

This method uses _do_recv() to retrieve the raw message, and then transforms it using _cb_inbound().

Returns

A single (filtered, processed) protocol message.
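The relationship between _do_recv(), _cb_inbound() and _recv() can be sketched with a toy receiver. JsonLineReceiver is hypothetical, and its newline-delimited JSON framing is chosen only for illustration; the three method names are the only parts taken from this page.

```python
import asyncio
import json
import logging


class JsonLineReceiver:
    """Hypothetical receiver; the JSON-per-line framing is an assumption."""

    def __init__(self, reader):
        self._reader = reader
        self.logger = logging.getLogger("demo.receiver")

    async def _do_recv(self):
        # Low-level step: read one line and decode it into a message.
        line = await self._reader.readline()
        if not line:
            raise EOFError
        return json.loads(line)

    def _cb_inbound(self, msg):
        # Filter hook: log the message and pass it through unchanged.
        self.logger.debug("<-- %s", msg)
        return msg

    async def _recv(self):
        # Raw read first, then the inbound filter.
        return self._cb_inbound(await self._do_recv())


async def demo():
    reader = asyncio.StreamReader()
    reader.feed_data(b'{"event": "READY"}\n')
    reader.feed_eof()
    receiver = JsonLineReceiver(reader)
    return await receiver._recv()
```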

_do_send(msg: qemu.aqmp.protocol.T) → None

Abstract: Write a message to the stream.

Very low-level; intended to only be called by _send().

async _send(msg: qemu.aqmp.protocol.T) → None

Send an arbitrary protocol message.

This method will transform any outgoing messages according to _cb_outbound().

Warning

Like _recv(), this method is intended to be called by the writer task loop that processes outgoing messages. Calling it directly may circumvent logic implemented by the caller meant to correlate outgoing and incoming messages.

Raises

OSError – For problems with the underlying stream.

async _on_message(msg: qemu.aqmp.protocol.T) → None

Called to handle the receipt of a new message.

Caution

This is executed from within the reader loop, so be advised that waiting on either the reader or writer task will lead to deadlock. Additionally, any unhandled exceptions will directly cause the loop to halt, so logic is best kept to a minimum if at all possible.

Parameters

msg – The incoming message, already logged/filtered.
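One way to follow this advice is to keep the handler to validation plus a non-blocking hand-off. The sketch below is only a suggestion: QueueingHandler, its inbox queue, its rejected list and the "id" check are all hypothetical, and only the _on_message name comes from this page.

```python
import asyncio


class QueueingHandler:
    """Hypothetical handler that never waits on other tasks."""

    def __init__(self):
        self.inbox = asyncio.Queue()   # unbounded, so put_nowait() cannot fail
        self.rejected = []

    @staticmethod
    def _validate(msg):
        # Invented rule for the example: every message needs an "id" key.
        if "id" not in msg:
            raise ValueError("message has no id")

    async def _on_message(self, msg):
        try:
            self._validate(msg)
        except ValueError as err:
            # Contain the error so the reader loop is not halted.
            self.rejected.append(str(err))
            return
        # Hand the message off without awaiting any other task.
        self.inbox.put_nowait(msg)


async def demo():
    handler = QueueingHandler()
    await handler._on_message({"id": 1})
    await handler._on_message({"oops": True})
    return handler.inbox.qsize(), handler.rejected
```

Slower processing can then happen in a separate consumer task that reads from the queue, outside the reader loop.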