Merge tag 'pull-request-2022-05-18' of https://gitlab.com/thuth/qemu into staging
[qemu.git] / tests / migration / guestperf / engine.py
1 #
2 # Migration test main engine
3 #
4 # Copyright (c) 2016 Red Hat, Inc.
5 #
6 # This library is free software; you can redistribute it and/or
7 # modify it under the terms of the GNU Lesser General Public
8 # License as published by the Free Software Foundation; either
9 # version 2.1 of the License, or (at your option) any later version.
10 #
11 # This library is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with this library; if not, see <http://www.gnu.org/licenses/>.
18 #
19
20
21 import os
22 import re
23 import sys
24 import time
25
26 from guestperf.progress import Progress, ProgressStats
27 from guestperf.report import Report
28 from guestperf.timings import TimingRecord, Timings
29
30 sys.path.append(os.path.join(os.path.dirname(__file__),
31 '..', '..', '..', 'python'))
32 from qemu.machine import QEMUMachine
33
34
class Engine(object):
    """Run one guest migration between two QEMU processes and collect stats.

    The engine launches a source and a destination QEMU, lets the guest
    workload run, configures and starts the migration over QMP, polls
    ``query-migrate`` until the migration finishes, and packages the
    collected progress and CPU timing samples into a ``Report``.
    """

    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
                 sleep=15, verbose=False, debug=False):
        """Store the test configuration.

        :param binary: path to the QEMU binary
        :param dst_host: hostname of the target host ("localhost" runs the
                         destination QEMU locally, anything else via ssh)
        :param kernel: path to the kernel image
        :param initrd: path to the stress initrd
        :param transport: 'unix', 'tcp' or 'rdma'
        :param sleep: seconds used for the workload run loops before and
                      after the migration
        :param verbose: print progress messages
        :param debug: print extra diagnostics; also forces verbose on
        """
        self._binary = binary  # Path to QEMU binary
        self._dst_host = dst_host  # Hostname of target host
        self._kernel = kernel  # Path to kernel image
        self._initrd = initrd  # Path to stress initrd
        self._transport = transport  # 'unix' or 'tcp' or 'rdma'
        self._sleep = sleep
        self._verbose = verbose
        self._debug = debug

        if debug:
            self._verbose = debug

    @staticmethod
    def _cpu_ticks_from_stat(stat):
        """Return utime + stime (in clock ticks) from a /proc stat line.

        The command name (second field) is wrapped in parentheses and may
        itself contain spaces, so the line is split after the last ')'
        rather than naively on spaces.  In the remainder the process state
        is item 0, which puts utime (field 14) at index 11 and stime
        (field 15) at index 12.
        """
        fields = stat.rsplit(")", 1)[-1].split()
        utime = int(fields[11])
        stime = int(fields[12])
        return utime + stime

    def _vcpu_timing(self, pid, tid_list):
        """Sample the CPU time consumed so far by each listed thread.

        :param pid: process ID owning the threads
        :param tid_list: thread IDs to sample
        :returns: list with one TimingRecord(tid, now, cpu time in ms)
                  per thread, all sharing the same timestamp
        """
        records = []
        now = time.time()

        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        for tid in tid_list:
            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
            with open(statfile, "r") as fh:
                ticks = self._cpu_ticks_from_stat(fh.readline())
            records.append(TimingRecord(tid, now,
                                        1000 * ticks / jiffies_per_sec))
        return records

    def _cpu_timing(self, pid):
        """Sample the CPU time consumed so far by a whole process.

        :param pid: process ID to sample
        :returns: TimingRecord(pid, now, cpu time in ms)
        """
        now = time.time()

        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        statfile = "/proc/%d/stat" % pid
        with open(statfile, "r") as fh:
            ticks = self._cpu_ticks_from_stat(fh.readline())
        return TimingRecord(pid, now, 1000 * ticks / jiffies_per_sec)

    def _sample_cpu(self, pid, threads, qemu_time, vcpu_time):
        """Append one process sample and one sample per thread in place."""
        qemu_time.append(self._cpu_timing(pid))
        vcpu_time.extend(self._vcpu_timing(pid, threads))

    def _migrate_progress(self, vm):
        """Query the migration status of ``vm`` and wrap it in a Progress.

        Every value missing from the ``query-migrate`` reply falls back to
        0, except the status which falls back to "active".
        """
        info = vm.command("query-migrate")

        # The reply may not carry RAM statistics; treat them as all-zero.
        ram = info.get("ram", {})

        return Progress(
            info.get("status", "active"),
            ProgressStats(
                ram.get("transferred", 0),
                ram.get("remaining", 0),
                ram.get("total", 0),
                ram.get("duplicate", 0),
                ram.get("skipped", 0),
                ram.get("normal", 0),
                ram.get("normal-bytes", 0),
                ram.get("dirty-pages-rate", 0),
                ram.get("mbps", 0),
                ram.get("dirty-sync-count", 0)
            ),
            time.time(),
            info.get("total-time", 0),
            info.get("downtime", 0),
            info.get("expected-downtime", 0),
            info.get("setup-time", 0),
            info.get("cpu-throttle-percentage", 0),
        )

    @staticmethod
    def _enable_capability(vm, capability):
        """Switch on a single migration capability on ``vm``."""
        vm.command("migrate-set-capabilities",
                   capabilities=[
                       {"capability": capability,
                        "state": True}
                   ])

    def _configure_migration(self, hardware, scenario, src, dst):
        """Apply the scenario's capabilities and parameters over QMP.

        Commands are issued in a fixed order; each optional feature is
        only touched when the scenario enables it.
        """
        if scenario._auto_converge:
            self._enable_capability(src, "auto-converge")
            src.command("migrate-set-parameters",
                        cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            self._enable_capability(src, "postcopy-ram")
            self._enable_capability(dst, "postcopy-ram")

        src.command("migrate-set-parameters",
                    max_bandwidth=scenario._bandwidth * 1024 * 1024)

        src.command("migrate-set-parameters",
                    downtime_limit=scenario._downtime)

        if scenario._compression_mt:
            self._enable_capability(src, "compress")
            src.command("migrate-set-parameters",
                        compress_threads=scenario._compression_mt_threads)
            self._enable_capability(dst, "compress")
            dst.command("migrate-set-parameters",
                        decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            self._enable_capability(src, "xbzrle")
            self._enable_capability(dst, "xbzrle")
            # The cache is sized as a percentage of guest RAM (hardware._mem
            # is multiplied up from GB to bytes).  True division yields a
            # float on Python 3, so convert back to an integer byte count
            # before handing it to QMP.
            src.command("migrate-set-parameters",
                        xbzrle_cache_size=int(
                            hardware._mem *
                            1024 * 1024 * 1024 / 100 *
                            scenario._compression_xbzrle_cache))

        if scenario._multifd:
            self._enable_capability(src, "multifd")
            src.command("migrate-set-parameters",
                        multifd_channels=scenario._multifd_channels)
            self._enable_capability(dst, "multifd")
            dst.command("migrate-set-parameters",
                        multifd_channels=scenario._multifd_channels)

    def _migrate(self, hardware, scenario, src, dst, connect_uri):
        """Run the migration from ``src`` to ``dst`` and record its progress.

        :param hardware: hardware description (memory size is read here)
        :param scenario: scenario description (features, limits, timeouts)
        :param src: launched source VM
        :param dst: launched destination VM
        :param connect_uri: URI handed to the QMP ``migrate`` command
        :returns: ``[progress_history, src_qemu_time, src_vcpu_time]``
        """
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        vcpus = src.command("query-cpus-fast")
        src_threads = [vcpu["thread-id"] for vcpu in vcpus]

        # XXX how to get dst timings on remote host ?

        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
        # NOTE(review): this loop (and the final one below) runs
        # self._sleep - 1 times, not self._sleep times; kept as is.
        sleep_secs = self._sleep
        while sleep_secs > 1:
            self._sample_cpu(src_pid, src_threads,
                             src_qemu_time, src_vcpu_time)
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")
        self._configure_migration(hardware, scenario, src, dst)

        src.command("migrate", uri=connect_uri)

        post_copy = False
        paused = False

        progress_history = []

        start = time.time()
        loop = 0
        while True:
            loop = loop + 1
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            # Polling every 50ms, so sample CPU usage once a second.
            if (loop % 20) == 0:
                self._sample_cpu(src_pid, src_threads,
                                 src_qemu_time, src_vcpu_time)

            # Keep one progress record per completed pass over RAM.
            if (not progress_history or
                (progress_history[-1]._ram._iterations <
                 progress._ram._iterations)):
                progress_history.append(progress)

            if progress._status in ("completed", "failed", "cancelled"):
                if progress._status == "completed" and paused:
                    dst.command("cont")
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
                    sleep_secs = self._sleep
                    while sleep_secs > 1:
                        time.sleep(1)
                        self._sample_cpu(src_pid, src_threads,
                                         src_qemu_time, src_vcpu_time)
                        sleep_secs -= 1

                return [progress_history, src_qemu_time, src_vcpu_time]

            if self._verbose and (loop % 20) == 0:
                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            # After requesting a cancel keep polling until the status
            # reported by QEMU becomes terminal.
            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM" % scenario._max_iters)
                src.command("migrate_cancel")
                continue

            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds" % scenario._max_time)
                src.command("migrate_cancel")
                continue

            if (scenario._post_copy and
                progress._ram._iterations >= scenario._post_copy_iters and
                not post_copy):
                if self._verbose:
                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
                src.command("migrate-start-postcopy")
                post_copy = True

            if (scenario._pause and
                progress._ram._iterations >= scenario._pause_iters and
                not paused):
                if self._verbose:
                    print("Pausing VM after %d iterations" % scenario._pause_iters)
                src.command("stop")
                paused = True

    def _get_common_args(self, hardware, tunnelled=False):
        """Build the QEMU argument list shared by source and destination.

        :param hardware: hardware description (memory, CPUs, page options)
        :param tunnelled: wrap the kernel command line in single quotes,
                          used when the arguments are passed through ssh
        :returns: list of QEMU command line arguments
        """
        args = [
            "noapic",
            "edd=off",
            "printk.time=1",
            "noreplace-smp",
            "cgroup_disable=memory",
            "pci=noearly",
            "console=ttyS0",
        ]
        if self._debug:
            args.append("debug")
        else:
            args.append("quiet")

        args.append("ramsize=%s" % hardware._mem)

        cmdline = " ".join(args)
        if tunnelled:
            cmdline = "'" + cmdline + "'"

        argv = [
            "-accel", "kvm",
            "-cpu", "host",
            "-kernel", self._kernel,
            "-initrd", self._initrd,
            "-append", cmdline,
            "-chardev", "stdio,id=cdev0",
            "-device", "isa-serial,chardev=cdev0",
            # hardware._mem is scaled by 1024 and given 512 extra on top
            "-m", str((hardware._mem * 1024) + 512),
            "-smp", str(hardware._cpus),
        ]

        if self._debug:
            argv.extend(["-device", "sga"])

        if hardware._prealloc_pages:
            argv += ["-mem-path", "/dev/shm",
                     "-mem-prealloc"]
        if hardware._locked_pages:
            argv += ["-overcommit", "mem-lock=on"]
        if hardware._huge_pages:
            # No arguments are generated for huge pages.
            pass

        return argv

    def _get_src_args(self, hardware):
        """Return the QEMU argument list for the source VM."""
        return self._get_common_args(hardware)

    def _get_dst_args(self, hardware, uri):
        """Return the QEMU argument list for the destination VM.

        The destination listens for the incoming migration on ``uri``.
        """
        tunnelled = self._dst_host != "localhost"
        argv = self._get_common_args(hardware, tunnelled)
        return argv + ["-incoming", uri]

    @staticmethod
    def _get_common_wrapper(cpu_bind, mem_bind):
        """Return a ``numactl`` command prefix for the requested bindings.

        :param cpu_bind: list of CPU ID strings to bind to (may be empty)
        :param mem_bind: list of memory node ID strings (may be empty)
        :returns: wrapper argument list, empty when nothing is bound
        """
        wrapper = []
        if len(cpu_bind) > 0 or len(mem_bind) > 0:
            wrapper.append("numactl")
            if cpu_bind:
                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
            if mem_bind:
                wrapper.append("--membind=%s" % ",".join(mem_bind))

        return wrapper

    def _get_src_wrapper(self, hardware):
        """Return the command wrapper for the source QEMU."""
        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)

    def _get_dst_wrapper(self, hardware):
        """Return the command wrapper for the destination QEMU.

        For a non-local destination the command is prefixed with ssh,
        which also forwards remote port 9001 back to local port 9001
        (the address used for the destination monitor in ``run``).
        """
        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
        if self._dst_host != "localhost":
            return ["ssh",
                    "-R", "9001:localhost:9001",
                    self._dst_host] + wrapper
        else:
            return wrapper

    def _get_timings(self, vm):
        """Extract timing records from the log captured for ``vm``.

        Lines of the form ``<name> (<id>): INFO: <T>ms copied <N> GB in
        <D>ms`` become ``TimingRecord(id, T / 1000.0, D)``; all other
        lines are ignored.

        :returns: list of TimingRecord, empty when there is no log
        """
        log = vm.get_log()
        if not log:
            return []
        if self._debug:
            print(log)

        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
        matcher = re.compile(regex)
        records = []
        for line in log.split("\n"):
            match = matcher.match(line)
            if match:
                records.append(TimingRecord(int(match.group(1)),
                                            int(match.group(2)) / 1000.0,
                                            int(match.group(3))))
        return records

    @staticmethod
    def _remove_stale(path):
        """Best-effort removal of a leftover socket file."""
        try:
            os.remove(path)
        except OSError:
            # Usually the file simply does not exist; nothing to clean up.
            pass

    def run(self, hardware, scenario, result_dir=os.getcwd()):
        """Launch both VMs, run the migration scenario and build a Report.

        :param hardware: hardware description for the guests
        :param scenario: migration scenario to execute
        :param result_dir: accepted for interface compatibility; not used
                           by this method
        :returns: Report with the progress history and timing samples
        :raises Exception: if the 'unix' transport is requested for a
                           non-local destination, the transport is
                           unknown, or launching/migrating fails
        """
        if self._transport == "tcp":
            uri = "tcp:%s:9000" % self._dst_host
        elif self._transport == "rdma":
            uri = "rdma:%s:9000" % self._dst_host
        elif self._transport == "unix":
            if self._dst_host != "localhost":
                raise Exception("Running use unix migration transport for non-local host")
            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
        else:
            raise ValueError("Unsupported migration transport '%s'" %
                             self._transport)

        if self._dst_host != "localhost":
            # Reached through the port forwarded by the ssh wrapper.
            dstmonaddr = ("localhost", 9001)
        else:
            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()

        # Clear out socket files left behind by an earlier run.
        if uri[0:5] == "unix:":
            self._remove_stale(uri[5:])
        self._remove_stale(srcmonaddr)
        if self._dst_host == "localhost":
            self._remove_stale(dstmonaddr)

        src = QEMUMachine(self._binary,
                          args=self._get_src_args(hardware),
                          wrapper=self._get_src_wrapper(hardware),
                          name="qemu-src-%d" % os.getpid(),
                          monitor_address=srcmonaddr)

        dst = QEMUMachine(self._binary,
                          args=self._get_dst_args(hardware, uri),
                          wrapper=self._get_dst_wrapper(hardware),
                          name="qemu-dst-%d" % os.getpid(),
                          monitor_address=dstmonaddr)

        try:
            src.launch()
            dst.launch()

            ret = self._migrate(hardware, scenario, src, dst, uri)
            progress_history = ret[0]
            qemu_timings = ret[1]
            vcpu_timings = ret[2]
            if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
                os.remove(uri[5:])

            if os.path.exists(srcmonaddr):
                os.remove(srcmonaddr)

            if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
                os.remove(dstmonaddr)

            if self._verbose:
                print("Finished migration")

            src.shutdown()
            dst.shutdown()

            return Report(hardware, scenario, progress_history,
                          Timings(self._get_timings(src) + self._get_timings(dst)),
                          Timings(qemu_timings),
                          Timings(vcpu_timings),
                          self._binary, self._dst_host, self._kernel,
                          self._initrd, self._transport, self._sleep)
        except Exception as e:
            if self._debug:
                print("Failed: %s" % str(e))
            # Tear both VMs down independently so that a failure to stop
            # one does not leave the other running.
            try:
                src.shutdown()
            except Exception:
                pass
            try:
                dst.shutdown()
            except Exception:
                pass

            if self._debug:
                print(src.get_log())
                print(dst.get_log())
            raise
463