python/qemu/console_socket.py: fix typing of settimeout
[qemu.git] / python / qemu / console_socket.py
1 """
2 QEMU Console Socket Module:
3
4 This Python module implements a ConsoleSocket object,
5 which can drain a socket and optionally dump the bytes to a file.
6 """
7 # Copyright 2020 Linaro
8 #
9 # Authors:
10 # Robert Foley <robert.foley@linaro.org>
11 #
12 # This code is licensed under the GPL version 2 or later. See
13 # the COPYING file in the top-level directory.
14 #
15
16 from collections import deque
17 import socket
18 import threading
19 import time
20 from typing import Optional
21
22
class ConsoleSocket(socket.socket):
    """
    ConsoleSocket represents a socket attached to a char device.

    Optionally (if drain==True), drains the socket and places the bytes
    into an in memory buffer for later processing.

    Optionally a file path can be passed in and we will also
    dump the characters to this file for debugging purposes.
    """
    def __init__(self, address, file=None, drain=False):
        """Create an AF_UNIX stream socket and connect it to ``address``.

        :param address: Address handed to ``connect()``.
        :param file: Optional path of a log file; when given, every
                     character drained from the socket is also written
                     to it.
        :param drain: When true, start a background thread that reads
                      the socket into an in-memory buffer which
                      ``recv()`` then serves from.
        """
        # Longest time, in seconds, that recv() waits for buffered data
        # in drained mode.  Kept as a float to match settimeout().
        self._recv_timeout_sec = 300.0
        # Polling interval, in seconds, used by both the drain thread
        # and recv().
        self._sleep_time = 0.5
        # Characters drained from the socket, one latin1 char per byte.
        self._buffer = deque()
        socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
        self.connect(address)
        self._logfile = None
        if file:
            # The drained bytes are decoded as latin1 (see
            # _drain_socket), so writing them back with the same codec
            # reproduces the received bytes exactly and can never fail
            # to encode, whatever the locale's default encoding is.
            self._logfile = open(file, "w", encoding="latin1")
        self._open = True
        self._drain_thread = self._thread_start() if drain else None

    def _drain_fn(self) -> None:
        """Drains the socket and runs while the socket is open."""
        while self._open:
            try:
                received = self._drain_socket()
            except socket.timeout:
                # The socket is expected to timeout since we set a
                # short timeout to allow the thread to exit when
                # self._open is set to False.
                time.sleep(self._sleep_time)
            else:
                if not received:
                    # An empty read means the peer closed the
                    # connection.  Every further recv() would return
                    # immediately with no data, so stop instead of
                    # spinning until close() is called.
                    break

    def _thread_start(self) -> threading.Thread:
        """Kick off a thread to drain the socket."""
        # Configure socket to not block and timeout.
        # This allows our drain thread to not block
        # on receive and exit smoothly.
        # The base-class methods are called directly because the
        # overrides below become no-ops once draining is active.
        socket.socket.setblocking(self, False)
        socket.socket.settimeout(self, 1)
        drain_thread = threading.Thread(target=self._drain_fn)
        drain_thread.daemon = True
        drain_thread.start()
        return drain_thread

    def close(self) -> None:
        """Close the base object and wait for the thread to terminate"""
        if not self._open:
            # Already closed; nothing left to release.
            return
        # Clearing the flag first tells the drain thread to exit.
        self._open = False
        try:
            if self._drain_thread is not None:
                thread, self._drain_thread = self._drain_thread, None
                thread.join()
            socket.socket.close(self)
        finally:
            # Release the log file even if joining the thread or
            # closing the socket raised.
            if self._logfile:
                self._logfile.close()
                self._logfile = None

    def _drain_socket(self) -> bool:
        """process arriving characters into in memory _buffer

        :return: True if data was read, False if the read came back
                 empty (the peer closed the connection).
        :raise socket.timeout: If nothing arrived within the socket's
                               timeout.
        """
        data = socket.socket.recv(self, 1)
        if not data:
            return False
        # latin1 is needed since there are some chars
        # we are receiving that cannot be encoded to utf-8
        # such as 0xe2, 0x80, 0xA6.
        string = data.decode("latin1")
        if self._logfile:
            self._logfile.write(string)
            self._logfile.flush()
        # Extending a deque with a str appends it one char at a time.
        self._buffer.extend(string)
        return True

    def recv(self, bufsize: int = 1, flags: int = 0) -> bytes:
        """Return chars from in memory buffer.
        Maintains the same API as socket.socket.recv.

        When not draining, the call is passed straight through to the
        socket.  When draining, it waits (polling every
        ``_sleep_time`` seconds) until ``bufsize`` characters are
        buffered and returns exactly that many.

        :param bufsize: Number of bytes to return.
        :param flags: Must be 0 in drained mode.
        :raise socket.timeout: In drained mode, if the data did not
                               arrive within ``_recv_timeout_sec``.
        """
        if self._drain_thread is None:
            # Not buffering the socket, pass thru to socket.
            return socket.socket.recv(self, bufsize, flags)
        assert not flags, "Cannot pass flags to recv() in drained mode"
        start_time = time.time()
        while len(self._buffer) < bufsize:
            time.sleep(self._sleep_time)
            elapsed_sec = time.time() - start_time
            if elapsed_sec > self._recv_timeout_sec:
                raise socket.timeout
        chars = ''.join(self._buffer.popleft() for _ in range(bufsize))
        # We choose to use latin1 to remain consistent with
        # _drain_socket() and give back the same data as the user would
        # receive if they were reading directly from the
        # socket w/o our intervention.
        return chars.encode("latin1")

    def setblocking(self, value) -> None:
        """When not draining we pass thru to the socket,
        since when draining we control socket blocking.
        """
        if self._drain_thread is None:
            socket.socket.setblocking(self, value)

    def settimeout(self, value: Optional[float]) -> None:
        """When not draining we pass thru to the socket,
        since when draining we control the timeout.

        A value other than None is also recorded as the timeout that
        recv() applies in drained mode.
        """
        if value is not None:
            self._recv_timeout_sec = value
        if self._drain_thread is None:
            socket.socket.settimeout(self, value)