disas: Clean up CPUDebug initialization
[qemu.git] / python / qemu / qtest.py
1 """
2 QEMU qtest library
3
4 qtest offers the QEMUQtestProtocol and QEMUQTestMachine classes, which
5 offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
6 subclass of QEMUMachine, respectively.
7 """
8
9 # Copyright (C) 2015 Red Hat Inc.
10 #
11 # Authors:
12 # Fam Zheng <famz@redhat.com>
13 #
14 # This work is licensed under the terms of the GNU GPL, version 2. See
15 # the COPYING file in the top-level directory.
16 #
17 # Based on qmp.py.
18 #
19
20 import socket
21 import os
22 from typing import Optional, TextIO
23
24 from .machine import QEMUMachine
25
26
27 class QEMUQtestProtocol:
28 """
29 QEMUQtestProtocol implements a connection to a qtest socket.
30
31 :param address: QEMU address, can be either a unix socket path (string)
32 or a tuple in the form ( address, port ) for a TCP
33 connection
34 :param server: server mode, listens on the socket (bool)
35 :raise socket.error: on socket connection errors
36
37 .. note::
38 No conection is estabalished by __init__(), this is done
39 by the connect() or accept() methods.
40 """
41 def __init__(self, address, server=False):
42 self._address = address
43 self._sock = self._get_sock()
44 self._sockfile: Optional[TextIO] = None
45 if server:
46 self._sock.bind(self._address)
47 self._sock.listen(1)
48
49 def _get_sock(self):
50 if isinstance(self._address, tuple):
51 family = socket.AF_INET
52 else:
53 family = socket.AF_UNIX
54 return socket.socket(family, socket.SOCK_STREAM)
55
56 def connect(self):
57 """
58 Connect to the qtest socket.
59
60 @raise socket.error on socket connection errors
61 """
62 self._sock.connect(self._address)
63 self._sockfile = self._sock.makefile(mode='r')
64
65 def accept(self):
66 """
67 Await connection from QEMU.
68
69 @raise socket.error on socket connection errors
70 """
71 self._sock, _ = self._sock.accept()
72 self._sockfile = self._sock.makefile(mode='r')
73
74 def cmd(self, qtest_cmd):
75 """
76 Send a qtest command on the wire.
77
78 @param qtest_cmd: qtest command text to be sent
79 """
80 assert self._sockfile is not None
81 self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
82 resp = self._sockfile.readline()
83 return resp
84
85 def close(self):
86 """Close this socket."""
87 self._sock.close()
88 if self._sockfile:
89 self._sockfile.close()
90 self._sockfile = None
91
92 def settimeout(self, timeout):
93 """Set a timeout, in seconds."""
94 self._sock.settimeout(timeout)
95
96
97 class QEMUQtestMachine(QEMUMachine):
98 """
99 A QEMU VM, with a qtest socket available.
100 """
101
102 def __init__(self, binary, args=None, name=None, test_dir="/var/tmp",
103 socket_scm_helper=None, sock_dir=None):
104 if name is None:
105 name = "qemu-%d" % os.getpid()
106 if sock_dir is None:
107 sock_dir = test_dir
108 super().__init__(binary, args, name=name, test_dir=test_dir,
109 socket_scm_helper=socket_scm_helper,
110 sock_dir=sock_dir)
111 self._qtest = None
112 self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")
113
114 def _base_args(self):
115 args = super()._base_args()
116 args.extend(['-qtest', 'unix:path=' + self._qtest_path,
117 '-accel', 'qtest'])
118 return args
119
120 def _pre_launch(self):
121 super()._pre_launch()
122 self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)
123
124 def _post_launch(self) -> None:
125 assert self._qtest is not None
126 super()._post_launch()
127 self._qtest.accept()
128
129 def _post_shutdown(self):
130 super()._post_shutdown()
131 self._remove_if_exists(self._qtest_path)
132
133 def qtest(self, cmd: str) -> str:
134 """
135 Send a qtest command to the guest.
136
137 :param cmd: qtest command to send
138 :return: qtest server response
139 """
140 if self._qtest is None:
141 raise RuntimeError("qtest socket not available")
142 return self._qtest.cmd(cmd)