Merge remote-tracking branch 'remotes/dgilbert/tags/pull-migration-20200925a' into...
[qemu.git] / io / channel.c
1 /*
2 * QEMU I/O channels
3 *
4 * Copyright (c) 2015 Red Hat, Inc.
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18 *
19 */
20
21 #include "qemu/osdep.h"
22 #include "io/channel.h"
23 #include "qapi/error.h"
24 #include "qemu/main-loop.h"
25 #include "qemu/module.h"
26 #include "qemu/iov.h"
27
28 bool qio_channel_has_feature(QIOChannel *ioc,
29 QIOChannelFeature feature)
30 {
31 return ioc->features & (1 << feature);
32 }
33
34
35 void qio_channel_set_feature(QIOChannel *ioc,
36 QIOChannelFeature feature)
37 {
38 ioc->features |= (1 << feature);
39 }
40
41
42 void qio_channel_set_name(QIOChannel *ioc,
43 const char *name)
44 {
45 g_free(ioc->name);
46 ioc->name = g_strdup(name);
47 }
48
49
50 ssize_t qio_channel_readv_full(QIOChannel *ioc,
51 const struct iovec *iov,
52 size_t niov,
53 int **fds,
54 size_t *nfds,
55 Error **errp)
56 {
57 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
58
59 if ((fds || nfds) &&
60 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
61 error_setg_errno(errp, EINVAL,
62 "Channel does not support file descriptor passing");
63 return -1;
64 }
65
66 return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
67 }
68
69
70 ssize_t qio_channel_writev_full(QIOChannel *ioc,
71 const struct iovec *iov,
72 size_t niov,
73 int *fds,
74 size_t nfds,
75 Error **errp)
76 {
77 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
78
79 if ((fds || nfds) &&
80 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
81 error_setg_errno(errp, EINVAL,
82 "Channel does not support file descriptor passing");
83 return -1;
84 }
85
86 return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
87 }
88
89
/*
 * Read until the full I/O vector is populated, or EOF.
 *
 * Returns 1 if all bytes were read, 0 if EOF was hit before any
 * byte was read, and -1 on error (including EOF after a partial
 * read, which is reported as an error).
 *
 * When the channel would block, waits for readability: yields the
 * current coroutine if running in one, otherwise spins a nested
 * main loop via qio_channel_wait().
 */
int qio_channel_readv_all_eof(QIOChannel *ioc,
                              const struct iovec *iov,
                              size_t niov,
                              Error **errp)
{
    int ret = -1;
    /* Work on a mutable copy of @iov so the head can be advanced
     * as data arrives; local_iov_head keeps the pointer for g_free()
     * after iov_discard_front() has moved local_iov forward. */
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;
    bool partial = false;   /* true once at least one byte has been read */

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while (nlocal_iov > 0) {
        ssize_t len;
        len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp);
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            /* Would block: wait for G_IO_IN and retry */
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_IN);
            } else {
                qio_channel_wait(ioc, G_IO_IN);
            }
            continue;
        } else if (len < 0) {
            goto cleanup;
        } else if (len == 0) {
            /* EOF: only an error if we already consumed some bytes */
            if (partial) {
                error_setg(errp,
                           "Unexpected end-of-file before all bytes were read");
            } else {
                ret = 0;
            }
            goto cleanup;
        }

        partial = true;
        /* Drop the bytes just read from the front of the vector */
        iov_discard_front(&local_iov, &nlocal_iov, len);
    }

    ret = 1;

 cleanup:
    g_free(local_iov_head);
    return ret;
}
137
138 int qio_channel_readv_all(QIOChannel *ioc,
139 const struct iovec *iov,
140 size_t niov,
141 Error **errp)
142 {
143 int ret = qio_channel_readv_all_eof(ioc, iov, niov, errp);
144
145 if (ret == 0) {
146 ret = -1;
147 error_setg(errp,
148 "Unexpected end-of-file before all bytes were read");
149 } else if (ret == 1) {
150 ret = 0;
151 }
152 return ret;
153 }
154
/*
 * Write the entire I/O vector, retrying on short writes.
 *
 * Returns 0 once every byte has been written, -1 on error.
 *
 * When the channel would block, waits for writability: yields the
 * current coroutine if running in one, otherwise spins a nested
 * main loop via qio_channel_wait().
 */
int qio_channel_writev_all(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
                           Error **errp)
{
    int ret = -1;
    /* Mutable copy of @iov; local_iov_head keeps the allocation start
     * for g_free() after iov_discard_front() advances local_iov. */
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while (nlocal_iov > 0) {
        ssize_t len;
        len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp);
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            /* Would block: wait for G_IO_OUT and retry */
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_OUT);
            } else {
                qio_channel_wait(ioc, G_IO_OUT);
            }
            continue;
        }
        if (len < 0) {
            goto cleanup;
        }

        /* Drop the bytes just written from the front of the vector */
        iov_discard_front(&local_iov, &nlocal_iov, len);
    }

    ret = 0;
 cleanup:
    g_free(local_iov_head);
    return ret;
}
192
/*
 * Read into @iov without file descriptor passing.
 * Convenience wrapper around qio_channel_readv_full() with
 * fds/nfds set to NULL.
 */
ssize_t qio_channel_readv(QIOChannel *ioc,
                          const struct iovec *iov,
                          size_t niov,
                          Error **errp)
{
    return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
}
200
201
/*
 * Write @iov without file descriptor passing.
 * Convenience wrapper around qio_channel_writev_full() with
 * fds set to NULL and nfds to 0.
 */
ssize_t qio_channel_writev(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
                           Error **errp)
{
    return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp);
}
209
210
/*
 * Read up to @buflen bytes into the flat buffer @buf.
 * Wraps the buffer in a single-element iovec and delegates to
 * qio_channel_readv_full() with no fd passing.
 */
ssize_t qio_channel_read(QIOChannel *ioc,
                         char *buf,
                         size_t buflen,
                         Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
}
219
220
/*
 * Write up to @buflen bytes from the flat buffer @buf.
 * Wraps the buffer in a single-element iovec and delegates to
 * qio_channel_writev_full() with no fd passing. The const cast is
 * needed because iov_base is non-const; the data is not modified.
 */
ssize_t qio_channel_write(QIOChannel *ioc,
                          const char *buf,
                          size_t buflen,
                          Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp);
}
229
230
/*
 * Read exactly @buflen bytes into @buf, tolerating immediate EOF.
 * Flat-buffer wrapper for qio_channel_readv_all_eof(): returns 1 on
 * a complete read, 0 on EOF before any byte, -1 on error.
 */
int qio_channel_read_all_eof(QIOChannel *ioc,
                             char *buf,
                             size_t buflen,
                             Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
}
239
240
/*
 * Read exactly @buflen bytes into @buf.
 * Flat-buffer wrapper for qio_channel_readv_all(): returns 0 on
 * success, -1 on error (including any premature EOF).
 */
int qio_channel_read_all(QIOChannel *ioc,
                         char *buf,
                         size_t buflen,
                         Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all(ioc, &iov, 1, errp);
}
249
250
/*
 * Write exactly @buflen bytes from @buf.
 * Flat-buffer wrapper for qio_channel_writev_all(): returns 0 on
 * success, -1 on error. The const cast is needed because iov_base
 * is non-const; the data is not modified.
 */
int qio_channel_write_all(QIOChannel *ioc,
                          const char *buf,
                          size_t buflen,
                          Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_all(ioc, &iov, 1, errp);
}
259
260
/*
 * Switch the channel between blocking and non-blocking mode.
 * Delegates to the subclass io_set_blocking callback; returns its
 * result (0 on success, -1 on error).
 */
int qio_channel_set_blocking(QIOChannel *ioc,
                             bool enabled,
                             Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_set_blocking(ioc, enabled, errp);
}
268
269
/*
 * Close the channel, releasing its underlying transport.
 * Delegates to the subclass io_close callback; returns its
 * result (0 on success, -1 on error).
 */
int qio_channel_close(QIOChannel *ioc,
                      Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_close(ioc, errp);
}
276
277
278 GSource *qio_channel_create_watch(QIOChannel *ioc,
279 GIOCondition condition)
280 {
281 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
282 GSource *ret = klass->io_create_watch(ioc, condition);
283
284 if (ioc->name) {
285 g_source_set_name(ret, ioc->name);
286 }
287
288 return ret;
289 }
290
291
/*
 * Register read/write readiness handlers for @ioc with AioContext
 * @ctx. Passing NULL for a handler removes it. Delegates to the
 * subclass io_set_aio_fd_handler callback.
 */
void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
                                    AioContext *ctx,
                                    IOHandler *io_read,
                                    IOHandler *io_write,
                                    void *opaque)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
}
302
303 guint qio_channel_add_watch_full(QIOChannel *ioc,
304 GIOCondition condition,
305 QIOChannelFunc func,
306 gpointer user_data,
307 GDestroyNotify notify,
308 GMainContext *context)
309 {
310 GSource *source;
311 guint id;
312
313 source = qio_channel_create_watch(ioc, condition);
314
315 g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
316
317 id = g_source_attach(source, context);
318 g_source_unref(source);
319
320 return id;
321 }
322
/*
 * Convenience wrapper for qio_channel_add_watch_full() that attaches
 * the watch to the default GMainContext (context == NULL).
 */
guint qio_channel_add_watch(QIOChannel *ioc,
                            GIOCondition condition,
                            QIOChannelFunc func,
                            gpointer user_data,
                            GDestroyNotify notify)
{
    return qio_channel_add_watch_full(ioc, condition, func,
                                      user_data, notify, NULL);
}
332
/*
 * Like qio_channel_add_watch_full(), but returns the attached GSource
 * (with an extra reference owned by the caller) instead of its id,
 * so the caller can later g_source_destroy()/g_source_unref() it.
 *
 * NOTE(review): the source is looked up by id after attaching; if the
 * callback could already have run and removed the source by then,
 * g_main_context_find_source_by_id() would return NULL and
 * g_source_ref(NULL) would be invalid — presumably callers attach to
 * contexts not yet running. TODO confirm.
 */
GSource *qio_channel_add_watch_source(QIOChannel *ioc,
                                      GIOCondition condition,
                                      QIOChannelFunc func,
                                      gpointer user_data,
                                      GDestroyNotify notify,
                                      GMainContext *context)
{
    GSource *source;
    guint id;

    id = qio_channel_add_watch_full(ioc, condition, func,
                                    user_data, notify, context);
    source = g_main_context_find_source_by_id(context, id);
    g_source_ref(source);
    return source;
}
349
350
351 int qio_channel_shutdown(QIOChannel *ioc,
352 QIOChannelShutdown how,
353 Error **errp)
354 {
355 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
356
357 if (!klass->io_shutdown) {
358 error_setg(errp, "Data path shutdown not supported");
359 return -1;
360 }
361
362 return klass->io_shutdown(ioc, how, errp);
363 }
364
365
366 void qio_channel_set_delay(QIOChannel *ioc,
367 bool enabled)
368 {
369 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
370
371 if (klass->io_set_delay) {
372 klass->io_set_delay(ioc, enabled);
373 }
374 }
375
376
377 void qio_channel_set_cork(QIOChannel *ioc,
378 bool enabled)
379 {
380 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
381
382 if (klass->io_set_cork) {
383 klass->io_set_cork(ioc, enabled);
384 }
385 }
386
387
388 off_t qio_channel_io_seek(QIOChannel *ioc,
389 off_t offset,
390 int whence,
391 Error **errp)
392 {
393 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
394
395 if (!klass->io_seek) {
396 error_setg(errp, "Channel does not support random access");
397 return -1;
398 }
399
400 return klass->io_seek(ioc, offset, whence, errp);
401 }
402
403
/*
 * fd readiness handler: wake the coroutine parked in
 * qio_channel_yield(G_IO_IN) once the channel is readable.
 */
static void qio_channel_restart_read(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = ioc->read_coroutine;

    /* Assert that aio_co_wake() reenters the coroutine directly */
    assert(qemu_get_current_aio_context() ==
           qemu_coroutine_get_aio_context(co));
    aio_co_wake(co);
}
414
/*
 * fd readiness handler: wake the coroutine parked in
 * qio_channel_yield(G_IO_OUT) once the channel is writable.
 */
static void qio_channel_restart_write(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = ioc->write_coroutine;

    /* Assert that aio_co_wake() reenters the coroutine directly */
    assert(qemu_get_current_aio_context() ==
           qemu_coroutine_get_aio_context(co));
    aio_co_wake(co);
}
425
426 static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
427 {
428 IOHandler *rd_handler = NULL, *wr_handler = NULL;
429 AioContext *ctx;
430
431 if (ioc->read_coroutine) {
432 rd_handler = qio_channel_restart_read;
433 }
434 if (ioc->write_coroutine) {
435 wr_handler = qio_channel_restart_write;
436 }
437
438 ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
439 qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
440 }
441
/*
 * Bind @ioc to AioContext @ctx for subsequent coroutine-based I/O.
 * Must not be called while a coroutine is parked on the channel
 * (the asserts enforce this); no handlers are installed until a
 * coroutine actually yields.
 */
void qio_channel_attach_aio_context(QIOChannel *ioc,
                                    AioContext *ctx)
{
    assert(!ioc->read_coroutine);
    assert(!ioc->write_coroutine);
    ioc->ctx = ctx;
}
449
/*
 * Detach @ioc from its AioContext.
 * Clears both coroutine pointers first so the subsequent handler
 * sync removes the fd handlers from the still-set context; only
 * then is ioc->ctx reset — the order matters.
 */
void qio_channel_detach_aio_context(QIOChannel *ioc)
{
    ioc->read_coroutine = NULL;
    ioc->write_coroutine = NULL;
    qio_channel_set_aio_fd_handlers(ioc);
    ioc->ctx = NULL;
}
457
/*
 * Park the current coroutine until @condition (exactly G_IO_IN or
 * G_IO_OUT — anything else aborts) is satisfied on @ioc.
 *
 * Records the coroutine in the matching read/write slot, installs
 * the fd handlers, and yields; the handler wakes us via
 * aio_co_wake(). Must be called from coroutine context.
 */
void coroutine_fn qio_channel_yield(QIOChannel *ioc,
                                    GIOCondition condition)
{
    assert(qemu_in_coroutine());
    if (condition == G_IO_IN) {
        /* Only one reader may be parked at a time */
        assert(!ioc->read_coroutine);
        ioc->read_coroutine = qemu_coroutine_self();
    } else if (condition == G_IO_OUT) {
        /* Only one writer may be parked at a time */
        assert(!ioc->write_coroutine);
        ioc->write_coroutine = qemu_coroutine_self();
    } else {
        abort();
    }
    qio_channel_set_aio_fd_handlers(ioc);
    qemu_coroutine_yield();

    /* Allow interrupting the operation by reentering the coroutine other than
     * through the aio_fd_handlers. */
    if (condition == G_IO_IN && ioc->read_coroutine) {
        /* Still set: we were woken externally; clear slot and handlers */
        ioc->read_coroutine = NULL;
        qio_channel_set_aio_fd_handlers(ioc);
    } else if (condition == G_IO_OUT && ioc->write_coroutine) {
        ioc->write_coroutine = NULL;
        qio_channel_set_aio_fd_handlers(ioc);
    }
}
484
485
486 static gboolean qio_channel_wait_complete(QIOChannel *ioc,
487 GIOCondition condition,
488 gpointer opaque)
489 {
490 GMainLoop *loop = opaque;
491
492 g_main_loop_quit(loop);
493 return FALSE;
494 }
495
496
/*
 * Block (outside coroutine context) until @condition is satisfied
 * on @ioc, by running a private nested GMainLoop with a one-shot
 * watch that quits the loop on readiness.
 *
 * A fresh GMainContext is used so only this watch can dispatch;
 * all three GLib objects are released before returning.
 */
void qio_channel_wait(QIOChannel *ioc,
                      GIOCondition condition)
{
    GMainContext *ctxt = g_main_context_new();
    GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
    GSource *source;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source,
                          (GSourceFunc)qio_channel_wait_complete,
                          loop,
                          NULL);

    g_source_attach(source, ctxt);

    /* Returns when qio_channel_wait_complete() quits the loop */
    g_main_loop_run(loop);

    g_source_unref(source);
    g_main_loop_unref(loop);
    g_main_context_unref(ctxt);
}
519
520
/*
 * QOM instance finalizer: release per-channel resources — the
 * debug name, and on Windows the event handle used for I/O
 * notification.
 */
static void qio_channel_finalize(Object *obj)
{
    QIOChannel *ioc = QIO_CHANNEL(obj);

    g_free(ioc->name);

#ifdef _WIN32
    if (ioc->event) {
        CloseHandle(ioc->event);
    }
#endif
}
533
/*
 * QOM registration data for the abstract QIOChannel base type.
 * Abstract: only concrete subclasses (socket, file, tls, ...) are
 * instantiated; they inherit this finalizer.
 */
static const TypeInfo qio_channel_info = {
    .parent = TYPE_OBJECT,
    .name = TYPE_QIO_CHANNEL,
    .instance_size = sizeof(QIOChannel),
    .instance_finalize = qio_channel_finalize,
    .abstract = true,
    .class_size = sizeof(QIOChannelClass),
};
542
543
/* Register the QIOChannel type with the QOM type system. */
static void qio_channel_register_types(void)
{
    type_register_static(&qio_channel_info);
}


/* Run registration during QEMU's type-init phase at startup */
type_init(qio_channel_register_types);