Merge remote-tracking branch 'remotes/armbru/tags/pull-qapi-2017-03-16' into staging
[qemu.git] / block / rbd.c
1 /*
2 * QEMU Block driver for RADOS (Ceph)
3 *
4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5 * Josh Durgin <josh.durgin@dreamhost.com>
6 *
7 * This work is licensed under the terms of the GNU GPL, version 2. See
8 * the COPYING file in the top-level directory.
9 *
10 * Contributions after 2012-01-13 are licensed under the terms of the
11 * GNU GPL, version 2 or (at your option) any later version.
12 */
13
14 #include "qemu/osdep.h"
15
16 #include "qapi/error.h"
17 #include "qemu/error-report.h"
18 #include "block/block_int.h"
19 #include "crypto/secret.h"
20 #include "qemu/cutils.h"
21 #include "qapi/qmp/qstring.h"
22
23 #include <rbd/librbd.h>
24
25 /*
26 * When specifying the image filename use:
27 *
28 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
29 *
30 * poolname must be the name of an existing rados pool.
31 *
32 * devicename is the name of the rbd image.
33 *
34 * Each option given is used to configure rados, and may be any valid
35 * Ceph option, "id", or "conf".
36 *
37 * The "id" option indicates what user we should authenticate as to
38 * the Ceph cluster. If it is excluded we will use the Ceph default
39 * (normally 'admin').
40 *
41 * The "conf" option specifies a Ceph configuration file to read. If
42 * it is not specified, we will read from the default Ceph locations
43 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration
44 * file, specify conf=/dev/null.
45 *
46 * Configuration values containing :, @, or = can be escaped with a
47 * leading "\".
48 */
49
/*
 * Discard support is selected from the librbd version at compile time:
 * rbd_aio_discard added in 0.1.2
 */
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
#define LIBRBD_SUPPORTS_DISCARD
#else
#undef LIBRBD_SUPPORTS_DISCARD
#endif

/*
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined anywhere in the part of
 * the file visible here, and OBJ_MAX_SIZE is not referenced either; confirm
 * whether this macro is still needed.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/*
 * Length limits (including the terminating NUL) that are handed to
 * qemu_rbd_next_tok() as max_len when splitting a filename or a
 * key/value string.  RBD_MAX_CONF_SIZE is not referenced in the code
 * visible here.
 */
#define RBD_MAX_CONF_NAME_SIZE 128
#define RBD_MAX_CONF_VAL_SIZE 512
#define RBD_MAX_CONF_SIZE 1024
#define RBD_MAX_POOL_NAME_SIZE 128
#define RBD_MAX_SNAP_NAME_SIZE 128
/* Initial capacity of the array passed to rbd_snap_list() */
#define RBD_MAX_SNAPS 100

/*
 * The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h.
 * LIBRBD_USE_IOVEC turns it into a 0/1 value so it can be tested in
 * ordinary C conditions as well as in the preprocessor.
 */
#ifdef LIBRBD_SUPPORTS_IOVEC
#define LIBRBD_USE_IOVEC 1
#else
#define LIBRBD_USE_IOVEC 0
#endif
72
/* Kind of asynchronous request submitted through rbd_start_aio(). */
typedef enum {
    RBD_AIO_READ,
    RBD_AIO_WRITE,
    RBD_AIO_DISCARD,
    RBD_AIO_FLUSH
} RBDAIOCmd;
79
/* Per-request state kept on the QEMU side of an RBD asynchronous operation. */
typedef struct RBDAIOCB {
    BlockAIOCB common;          /* carries the completion callback and opaque */
    int64_t ret;                /* result; positive values are reported as 0 */
    QEMUIOVector *qiov;         /* caller's vector; NULL for flush and discard */
    char *bounce;               /* linear buffer used when !LIBRBD_USE_IOVEC */
    RBDAIOCmd cmd;
    int error;                  /* set to 1 once a negative result was seen */
    struct BDRVRBDState *s;
} RBDAIOCB;
89
/*
 * Context handed to librbd as the completion argument; it is filled in by
 * rbd_start_aio() and freed in qemu_rbd_complete_aio().
 */
typedef struct RADOSCB {
    RBDAIOCB *acb;              /* owning QEMU request */
    struct BDRVRBDState *s;
    int64_t size;               /* request size in bytes */
    char *buf;                  /* alias of acb->bounce when !LIBRBD_USE_IOVEC */
    int64_t ret;                /* value of rbd_aio_get_return_value() */
} RADOSCB;
97
/* Driver state of one open RBD image (bs->opaque). */
typedef struct BDRVRBDState {
    rados_t cluster;            /* handle from rados_create() */
    rados_ioctx_t io_ctx;       /* pool context from rados_ioctx_create() */
    rbd_image_t image;          /* handle from rbd_open() */
    char name[RBD_MAX_IMAGE_NAME_SIZE]; /* image name copied in qemu_rbd_open() */
    char *snap;                 /* g_strdup()ed snapshot name, NULL if none */
} BDRVRBDState;
105
106 static char *qemu_rbd_next_tok(int max_len,
107 char *src, char delim,
108 const char *name,
109 char **p, Error **errp)
110 {
111 int l;
112 char *end;
113
114 *p = NULL;
115
116 if (delim != '\0') {
117 for (end = src; *end; ++end) {
118 if (*end == delim) {
119 break;
120 }
121 if (*end == '\\' && end[1] != '\0') {
122 end++;
123 }
124 }
125 if (*end == delim) {
126 *p = end + 1;
127 *end = '\0';
128 }
129 }
130 l = strlen(src);
131 if (l >= max_len) {
132 error_setg(errp, "%s too long", name);
133 return NULL;
134 } else if (l == 0) {
135 error_setg(errp, "%s too short", name);
136 return NULL;
137 }
138
139 return src;
140 }
141
/*
 * Remove backslash escapes from @src in place: every backslash that is
 * followed by another character is dropped and that character is kept
 * literally.  A backslash at the very end of the string is preserved.
 */
static void qemu_rbd_unescape(char *src)
{
    const char *rd = src;
    char *wr = src;

    while (*rd != '\0') {
        if (rd[0] == '\\' && rd[1] != '\0') {
            rd++;
        }
        *wr++ = *rd++;
    }
    *wr = '\0';
}
154
155 static void qemu_rbd_parse_filename(const char *filename, QDict *options,
156 Error **errp)
157 {
158 const char *start;
159 char *p, *buf, *keypairs;
160 char *found_str;
161 size_t max_keypair_size;
162 Error *local_err = NULL;
163
164 if (!strstart(filename, "rbd:", &start)) {
165 error_setg(errp, "File name must start with 'rbd:'");
166 return;
167 }
168
169 max_keypair_size = strlen(start) + 1;
170 buf = g_strdup(start);
171 keypairs = g_malloc0(max_keypair_size);
172 p = buf;
173
174 found_str = qemu_rbd_next_tok(RBD_MAX_POOL_NAME_SIZE, p,
175 '/', "pool name", &p, &local_err);
176 if (local_err) {
177 goto done;
178 }
179 if (!p) {
180 error_setg(errp, "Pool name is required");
181 goto done;
182 }
183 qemu_rbd_unescape(found_str);
184 qdict_put(options, "pool", qstring_from_str(found_str));
185
186 if (strchr(p, '@')) {
187 found_str = qemu_rbd_next_tok(RBD_MAX_IMAGE_NAME_SIZE, p,
188 '@', "object name", &p, &local_err);
189 if (local_err) {
190 goto done;
191 }
192 qemu_rbd_unescape(found_str);
193 qdict_put(options, "image", qstring_from_str(found_str));
194
195 found_str = qemu_rbd_next_tok(RBD_MAX_SNAP_NAME_SIZE, p,
196 ':', "snap name", &p, &local_err);
197 if (local_err) {
198 goto done;
199 }
200 qemu_rbd_unescape(found_str);
201 qdict_put(options, "snapshot", qstring_from_str(found_str));
202 } else {
203 found_str = qemu_rbd_next_tok(RBD_MAX_IMAGE_NAME_SIZE, p,
204 ':', "object name", &p, &local_err);
205 if (local_err) {
206 goto done;
207 }
208 qemu_rbd_unescape(found_str);
209 qdict_put(options, "image", qstring_from_str(found_str));
210 }
211 if (!p) {
212 goto done;
213 }
214
215 found_str = qemu_rbd_next_tok(RBD_MAX_CONF_NAME_SIZE, p,
216 '\0', "configuration", &p, &local_err);
217 if (local_err) {
218 goto done;
219 }
220
221 p = found_str;
222
223 /* The following are essentially all key/value pairs, and we treat
224 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. */
225 while (p) {
226 char *name, *value;
227 name = qemu_rbd_next_tok(RBD_MAX_CONF_NAME_SIZE, p,
228 '=', "conf option name", &p, &local_err);
229 if (local_err) {
230 break;
231 }
232
233 if (!p) {
234 error_setg(errp, "conf option %s has no value", name);
235 break;
236 }
237
238 qemu_rbd_unescape(name);
239
240 value = qemu_rbd_next_tok(RBD_MAX_CONF_VAL_SIZE, p,
241 ':', "conf option value", &p, &local_err);
242 if (local_err) {
243 break;
244 }
245 qemu_rbd_unescape(value);
246
247 if (!strcmp(name, "conf")) {
248 qdict_put(options, "conf", qstring_from_str(value));
249 } else if (!strcmp(name, "id")) {
250 qdict_put(options, "user" , qstring_from_str(value));
251 } else {
252 /* FIXME: This is pretty ugly, and not the right way to do this.
253 * These should be contained in a structure, and then
254 * passed explicitly as individual key/value pairs to
255 * rados. Consider this legacy code that needs to be
256 * updated. */
257 char *tmp = g_malloc0(max_keypair_size);
258 /* only use a delimiter if it is not the first keypair found */
259 /* These are sets of unknown key/value pairs we'll pass along
260 * to ceph */
261 if (keypairs[0]) {
262 snprintf(tmp, max_keypair_size, ":%s=%s", name, value);
263 pstrcat(keypairs, max_keypair_size, tmp);
264 } else {
265 snprintf(keypairs, max_keypair_size, "%s=%s", name, value);
266 }
267 g_free(tmp);
268 }
269 }
270
271 if (keypairs[0]) {
272 qdict_put(options, "keyvalue-pairs", qstring_from_str(keypairs));
273 }
274
275
276 done:
277 if (local_err) {
278 error_propagate(errp, local_err);
279 }
280 g_free(buf);
281 g_free(keypairs);
282 return;
283 }
284
285
286 static int qemu_rbd_set_auth(rados_t cluster, const char *secretid,
287 Error **errp)
288 {
289 if (secretid == 0) {
290 return 0;
291 }
292
293 gchar *secret = qcrypto_secret_lookup_as_base64(secretid,
294 errp);
295 if (!secret) {
296 return -1;
297 }
298
299 rados_conf_set(cluster, "key", secret);
300 g_free(secret);
301
302 return 0;
303 }
304
305 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs,
306 Error **errp)
307 {
308 char *p, *buf;
309 char *name;
310 char *value;
311 Error *local_err = NULL;
312 int ret = 0;
313
314 buf = g_strdup(keypairs);
315 p = buf;
316
317 while (p) {
318 name = qemu_rbd_next_tok(RBD_MAX_CONF_NAME_SIZE, p,
319 '=', "conf option name", &p, &local_err);
320 if (local_err) {
321 break;
322 }
323
324 if (!p) {
325 error_setg(errp, "conf option %s has no value", name);
326 ret = -EINVAL;
327 break;
328 }
329
330 value = qemu_rbd_next_tok(RBD_MAX_CONF_VAL_SIZE, p,
331 ':', "conf option value", &p, &local_err);
332 if (local_err) {
333 break;
334 }
335
336 ret = rados_conf_set(cluster, name, value);
337 if (ret < 0) {
338 error_setg_errno(errp, -ret, "invalid conf option %s", name);
339 ret = -EINVAL;
340 break;
341 }
342 }
343
344 if (local_err) {
345 error_propagate(errp, local_err);
346 ret = -EINVAL;
347 }
348 g_free(buf);
349 return ret;
350 }
351
352 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
353 {
354 if (LIBRBD_USE_IOVEC) {
355 RBDAIOCB *acb = rcb->acb;
356 iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
357 acb->qiov->size - offs);
358 } else {
359 memset(rcb->buf + offs, 0, rcb->size - offs);
360 }
361 }
362
/*
 * Options understood when opening or creating an image.  The same list is
 * used by qemu_rbd_open(), qemu_rbd_create() and, for each array element,
 * by qemu_rbd_array_opts().
 */
static QemuOptsList runtime_opts = {
    .name = "rbd",
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
    .desc = {
        {
            .name = "filename",
            .type = QEMU_OPT_STRING,
            .help = "Specification of the rbd image",
        },
        {
            .name = "password-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing the password",
        },
        {
            .name = "conf",
            .type = QEMU_OPT_STRING,
            .help = "Rados config file location",
        },
        {
            .name = "pool",
            .type = QEMU_OPT_STRING,
            .help = "Rados pool name",
        },
        {
            .name = "image",
            .type = QEMU_OPT_STRING,
            .help = "Image name in the pool",
        },
        {
            .name = "snapshot",
            .type = QEMU_OPT_STRING,
            .help = "Ceph snapshot name",
        },
        {
            /* maps to 'id' in rados_create() */
            .name = "user",
            .type = QEMU_OPT_STRING,
            .help = "Rados id name",
        },
        {
            .name = "keyvalue-pairs",
            .type = QEMU_OPT_STRING,
            .help = "Legacy rados key/value option parameters",
        },
        {
            /* read per "server.N." entry by qemu_rbd_array_opts() */
            .name = "host",
            .type = QEMU_OPT_STRING,
        },
        {
            /* read per "server.N." entry by qemu_rbd_array_opts() */
            .name = "port",
            .type = QEMU_OPT_STRING,
        },
        {
            /* read per "auth-supported.N." entry by qemu_rbd_array_opts() */
            .name = "auth",
            .type = QEMU_OPT_STRING,
            .help = "Supported authentication method, either cephx or none",
        },
        { /* end of list */ }
    },
};
424
425 static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
426 {
427 Error *local_err = NULL;
428 int64_t bytes = 0;
429 int64_t objsize;
430 int obj_order = 0;
431 const char *pool, *name, *conf, *clientname, *keypairs;
432 const char *secretid;
433 rados_t cluster;
434 rados_ioctx_t io_ctx;
435 QDict *options = NULL;
436 QemuOpts *rbd_opts = NULL;
437 int ret = 0;
438
439 secretid = qemu_opt_get(opts, "password-secret");
440
441 /* Read out options */
442 bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
443 BDRV_SECTOR_SIZE);
444 objsize = qemu_opt_get_size_del(opts, BLOCK_OPT_CLUSTER_SIZE, 0);
445 if (objsize) {
446 if ((objsize - 1) & objsize) { /* not a power of 2? */
447 error_setg(errp, "obj size needs to be power of 2");
448 ret = -EINVAL;
449 goto exit;
450 }
451 if (objsize < 4096) {
452 error_setg(errp, "obj size too small");
453 ret = -EINVAL;
454 goto exit;
455 }
456 obj_order = ctz32(objsize);
457 }
458
459 options = qdict_new();
460 qemu_rbd_parse_filename(filename, options, &local_err);
461 if (local_err) {
462 ret = -EINVAL;
463 error_propagate(errp, local_err);
464 goto exit;
465 }
466
467 rbd_opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
468 qemu_opts_absorb_qdict(rbd_opts, options, &local_err);
469 if (local_err) {
470 error_propagate(errp, local_err);
471 ret = -EINVAL;
472 goto exit;
473 }
474
475 pool = qemu_opt_get(rbd_opts, "pool");
476 conf = qemu_opt_get(rbd_opts, "conf");
477 clientname = qemu_opt_get(rbd_opts, "user");
478 name = qemu_opt_get(rbd_opts, "image");
479 keypairs = qemu_opt_get(rbd_opts, "keyvalue-pairs");
480
481 ret = rados_create(&cluster, clientname);
482 if (ret < 0) {
483 error_setg_errno(errp, -ret, "error initializing");
484 goto exit;
485 }
486
487 /* try default location when conf=NULL, but ignore failure */
488 ret = rados_conf_read_file(cluster, conf);
489 if (conf && ret < 0) {
490 error_setg_errno(errp, -ret, "error reading conf file %s", conf);
491 ret = -EIO;
492 goto shutdown;
493 }
494
495 ret = qemu_rbd_set_keypairs(cluster, keypairs, errp);
496 if (ret < 0) {
497 ret = -EIO;
498 goto shutdown;
499 }
500
501 if (qemu_rbd_set_auth(cluster, secretid, errp) < 0) {
502 ret = -EIO;
503 goto shutdown;
504 }
505
506 ret = rados_connect(cluster);
507 if (ret < 0) {
508 error_setg_errno(errp, -ret, "error connecting");
509 goto shutdown;
510 }
511
512 ret = rados_ioctx_create(cluster, pool, &io_ctx);
513 if (ret < 0) {
514 error_setg_errno(errp, -ret, "error opening pool %s", pool);
515 goto shutdown;
516 }
517
518 ret = rbd_create(io_ctx, name, bytes, &obj_order);
519 if (ret < 0) {
520 error_setg_errno(errp, -ret, "error rbd create");
521 }
522
523 rados_ioctx_destroy(io_ctx);
524
525 shutdown:
526 rados_shutdown(cluster);
527
528 exit:
529 QDECREF(options);
530 qemu_opts_del(rbd_opts);
531 return ret;
532 }
533
534 /*
535 * This aio completion is being called from rbd_finish_bh() and runs in qemu
536 * BH context.
537 */
538 static void qemu_rbd_complete_aio(RADOSCB *rcb)
539 {
540 RBDAIOCB *acb = rcb->acb;
541 int64_t r;
542
543 r = rcb->ret;
544
545 if (acb->cmd != RBD_AIO_READ) {
546 if (r < 0) {
547 acb->ret = r;
548 acb->error = 1;
549 } else if (!acb->error) {
550 acb->ret = rcb->size;
551 }
552 } else {
553 if (r < 0) {
554 qemu_rbd_memset(rcb, 0);
555 acb->ret = r;
556 acb->error = 1;
557 } else if (r < rcb->size) {
558 qemu_rbd_memset(rcb, r);
559 if (!acb->error) {
560 acb->ret = rcb->size;
561 }
562 } else if (!acb->error) {
563 acb->ret = r;
564 }
565 }
566
567 g_free(rcb);
568
569 if (!LIBRBD_USE_IOVEC) {
570 if (acb->cmd == RBD_AIO_READ) {
571 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
572 }
573 qemu_vfree(acb->bounce);
574 }
575
576 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
577
578 qemu_aio_unref(acb);
579 }
580
581 #define RBD_MON_HOST 0
582 #define RBD_AUTH_SUPPORTED 1
583
584 static char *qemu_rbd_array_opts(QDict *options, const char *prefix, int type,
585 Error **errp)
586 {
587 int num_entries;
588 QemuOpts *opts = NULL;
589 QDict *sub_options;
590 const char *host;
591 const char *port;
592 char *str;
593 char *rados_str = NULL;
594 Error *local_err = NULL;
595 int i;
596
597 assert(type == RBD_MON_HOST || type == RBD_AUTH_SUPPORTED);
598
599 num_entries = qdict_array_entries(options, prefix);
600
601 if (num_entries < 0) {
602 error_setg(errp, "Parse error on RBD QDict array");
603 return NULL;
604 }
605
606 for (i = 0; i < num_entries; i++) {
607 char *strbuf = NULL;
608 const char *value;
609 char *rados_str_tmp;
610
611 str = g_strdup_printf("%s%d.", prefix, i);
612 qdict_extract_subqdict(options, &sub_options, str);
613 g_free(str);
614
615 opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
616 qemu_opts_absorb_qdict(opts, sub_options, &local_err);
617 QDECREF(sub_options);
618 if (local_err) {
619 error_propagate(errp, local_err);
620 g_free(rados_str);
621 rados_str = NULL;
622 goto exit;
623 }
624
625 if (type == RBD_MON_HOST) {
626 host = qemu_opt_get(opts, "host");
627 port = qemu_opt_get(opts, "port");
628
629 value = host;
630 if (port) {
631 /* check for ipv6 */
632 if (strchr(host, ':')) {
633 strbuf = g_strdup_printf("[%s]:%s", host, port);
634 } else {
635 strbuf = g_strdup_printf("%s:%s", host, port);
636 }
637 value = strbuf;
638 } else if (strchr(host, ':')) {
639 strbuf = g_strdup_printf("[%s]", host);
640 value = strbuf;
641 }
642 } else {
643 value = qemu_opt_get(opts, "auth");
644 }
645
646
647 /* each iteration in the for loop will build upon the string, and if
648 * rados_str is NULL then it is our first pass */
649 if (rados_str) {
650 /* separate options with ';', as that is what rados_conf_set()
651 * requires */
652 rados_str_tmp = rados_str;
653 rados_str = g_strdup_printf("%s;%s", rados_str_tmp, value);
654 g_free(rados_str_tmp);
655 } else {
656 rados_str = g_strdup(value);
657 }
658
659 g_free(strbuf);
660 qemu_opts_del(opts);
661 opts = NULL;
662 }
663
664 exit:
665 qemu_opts_del(opts);
666 return rados_str;
667 }
668
669 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
670 Error **errp)
671 {
672 BDRVRBDState *s = bs->opaque;
673 const char *pool, *snap, *conf, *clientname, *name, *keypairs;
674 const char *secretid;
675 QemuOpts *opts;
676 Error *local_err = NULL;
677 char *mon_host = NULL;
678 char *auth_supported = NULL;
679 int r;
680
681 opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
682 qemu_opts_absorb_qdict(opts, options, &local_err);
683 if (local_err) {
684 error_propagate(errp, local_err);
685 qemu_opts_del(opts);
686 return -EINVAL;
687 }
688
689 auth_supported = qemu_rbd_array_opts(options, "auth-supported.",
690 RBD_AUTH_SUPPORTED, &local_err);
691 if (local_err) {
692 error_propagate(errp, local_err);
693 r = -EINVAL;
694 goto failed_opts;
695 }
696
697 mon_host = qemu_rbd_array_opts(options, "server.",
698 RBD_MON_HOST, &local_err);
699 if (local_err) {
700 error_propagate(errp, local_err);
701 r = -EINVAL;
702 goto failed_opts;
703 }
704
705 secretid = qemu_opt_get(opts, "password-secret");
706
707 pool = qemu_opt_get(opts, "pool");
708 conf = qemu_opt_get(opts, "conf");
709 snap = qemu_opt_get(opts, "snapshot");
710 clientname = qemu_opt_get(opts, "user");
711 name = qemu_opt_get(opts, "image");
712 keypairs = qemu_opt_get(opts, "keyvalue-pairs");
713
714 r = rados_create(&s->cluster, clientname);
715 if (r < 0) {
716 error_setg_errno(errp, -r, "error initializing");
717 goto failed_opts;
718 }
719
720 s->snap = g_strdup(snap);
721 if (name) {
722 pstrcpy(s->name, RBD_MAX_IMAGE_NAME_SIZE, name);
723 }
724
725 /* try default location when conf=NULL, but ignore failure */
726 r = rados_conf_read_file(s->cluster, conf);
727 if (conf && r < 0) {
728 error_setg_errno(errp, -r, "error reading conf file %s", conf);
729 goto failed_shutdown;
730 }
731
732 r = qemu_rbd_set_keypairs(s->cluster, keypairs, errp);
733 if (r < 0) {
734 goto failed_shutdown;
735 }
736
737 if (mon_host) {
738 r = rados_conf_set(s->cluster, "mon_host", mon_host);
739 if (r < 0) {
740 goto failed_shutdown;
741 }
742 }
743
744 if (auth_supported) {
745 r = rados_conf_set(s->cluster, "auth_supported", auth_supported);
746 if (r < 0) {
747 goto failed_shutdown;
748 }
749 }
750
751 if (qemu_rbd_set_auth(s->cluster, secretid, errp) < 0) {
752 r = -EIO;
753 goto failed_shutdown;
754 }
755
756 /*
757 * Fallback to more conservative semantics if setting cache
758 * options fails. Ignore errors from setting rbd_cache because the
759 * only possible error is that the option does not exist, and
760 * librbd defaults to no caching. If write through caching cannot
761 * be set up, fall back to no caching.
762 */
763 if (flags & BDRV_O_NOCACHE) {
764 rados_conf_set(s->cluster, "rbd_cache", "false");
765 } else {
766 rados_conf_set(s->cluster, "rbd_cache", "true");
767 }
768
769 r = rados_connect(s->cluster);
770 if (r < 0) {
771 error_setg_errno(errp, -r, "error connecting");
772 goto failed_shutdown;
773 }
774
775 r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
776 if (r < 0) {
777 error_setg_errno(errp, -r, "error opening pool %s", pool);
778 goto failed_shutdown;
779 }
780
781 r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
782 if (r < 0) {
783 error_setg_errno(errp, -r, "error reading header from %s", s->name);
784 goto failed_open;
785 }
786
787 bs->read_only = (s->snap != NULL);
788
789 qemu_opts_del(opts);
790 return 0;
791
792 failed_open:
793 rados_ioctx_destroy(s->io_ctx);
794 failed_shutdown:
795 rados_shutdown(s->cluster);
796 g_free(s->snap);
797 failed_opts:
798 qemu_opts_del(opts);
799 g_free(mon_host);
800 g_free(auth_supported);
801 return r;
802 }
803
804 static void qemu_rbd_close(BlockDriverState *bs)
805 {
806 BDRVRBDState *s = bs->opaque;
807
808 rbd_close(s->image);
809 rados_ioctx_destroy(s->io_ctx);
810 g_free(s->snap);
811 rados_shutdown(s->cluster);
812 }
813
/* Passed to qemu_aio_get() in rbd_start_aio() to allocate RBDAIOCB objects. */
static const AIOCBInfo rbd_aiocb_info = {
    .aiocb_size = sizeof(RBDAIOCB),
};
817
818 static void rbd_finish_bh(void *opaque)
819 {
820 RADOSCB *rcb = opaque;
821 qemu_rbd_complete_aio(rcb);
822 }
823
824 /*
825 * This is the callback function for rbd_aio_read and _write
826 *
827 * Note: this function is being called from a non qemu thread so
828 * we need to be careful about what we do here. Generally we only
829 * schedule a BH, and do the rest of the io completion handling
830 * from rbd_finish_bh() which runs in a qemu context.
831 */
832 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
833 {
834 RBDAIOCB *acb = rcb->acb;
835
836 rcb->ret = rbd_aio_get_return_value(c);
837 rbd_aio_release(c);
838
839 aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs),
840 rbd_finish_bh, rcb);
841 }
842
843 static int rbd_aio_discard_wrapper(rbd_image_t image,
844 uint64_t off,
845 uint64_t len,
846 rbd_completion_t comp)
847 {
848 #ifdef LIBRBD_SUPPORTS_DISCARD
849 return rbd_aio_discard(image, off, len, comp);
850 #else
851 return -ENOTSUP;
852 #endif
853 }
854
855 static int rbd_aio_flush_wrapper(rbd_image_t image,
856 rbd_completion_t comp)
857 {
858 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
859 return rbd_aio_flush(image, comp);
860 #else
861 return -ENOTSUP;
862 #endif
863 }
864
865 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
866 int64_t off,
867 QEMUIOVector *qiov,
868 int64_t size,
869 BlockCompletionFunc *cb,
870 void *opaque,
871 RBDAIOCmd cmd)
872 {
873 RBDAIOCB *acb;
874 RADOSCB *rcb = NULL;
875 rbd_completion_t c;
876 int r;
877
878 BDRVRBDState *s = bs->opaque;
879
880 acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
881 acb->cmd = cmd;
882 acb->qiov = qiov;
883 assert(!qiov || qiov->size == size);
884
885 rcb = g_new(RADOSCB, 1);
886
887 if (!LIBRBD_USE_IOVEC) {
888 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
889 acb->bounce = NULL;
890 } else {
891 acb->bounce = qemu_try_blockalign(bs, qiov->size);
892 if (acb->bounce == NULL) {
893 goto failed;
894 }
895 }
896 if (cmd == RBD_AIO_WRITE) {
897 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
898 }
899 rcb->buf = acb->bounce;
900 }
901
902 acb->ret = 0;
903 acb->error = 0;
904 acb->s = s;
905
906 rcb->acb = acb;
907 rcb->s = acb->s;
908 rcb->size = size;
909 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
910 if (r < 0) {
911 goto failed;
912 }
913
914 switch (cmd) {
915 case RBD_AIO_WRITE:
916 #ifdef LIBRBD_SUPPORTS_IOVEC
917 r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
918 #else
919 r = rbd_aio_write(s->image, off, size, rcb->buf, c);
920 #endif
921 break;
922 case RBD_AIO_READ:
923 #ifdef LIBRBD_SUPPORTS_IOVEC
924 r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
925 #else
926 r = rbd_aio_read(s->image, off, size, rcb->buf, c);
927 #endif
928 break;
929 case RBD_AIO_DISCARD:
930 r = rbd_aio_discard_wrapper(s->image, off, size, c);
931 break;
932 case RBD_AIO_FLUSH:
933 r = rbd_aio_flush_wrapper(s->image, c);
934 break;
935 default:
936 r = -EINVAL;
937 }
938
939 if (r < 0) {
940 goto failed_completion;
941 }
942 return &acb->common;
943
944 failed_completion:
945 rbd_aio_release(c);
946 failed:
947 g_free(rcb);
948 if (!LIBRBD_USE_IOVEC) {
949 qemu_vfree(acb->bounce);
950 }
951
952 qemu_aio_unref(acb);
953 return NULL;
954 }
955
956 static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
957 int64_t sector_num,
958 QEMUIOVector *qiov,
959 int nb_sectors,
960 BlockCompletionFunc *cb,
961 void *opaque)
962 {
963 return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
964 (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
965 RBD_AIO_READ);
966 }
967
968 static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
969 int64_t sector_num,
970 QEMUIOVector *qiov,
971 int nb_sectors,
972 BlockCompletionFunc *cb,
973 void *opaque)
974 {
975 return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
976 (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
977 RBD_AIO_WRITE);
978 }
979
980 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
981 static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
982 BlockCompletionFunc *cb,
983 void *opaque)
984 {
985 return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
986 }
987
988 #else
989
990 static int qemu_rbd_co_flush(BlockDriverState *bs)
991 {
992 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
993 /* rbd_flush added in 0.1.1 */
994 BDRVRBDState *s = bs->opaque;
995 return rbd_flush(s->image);
996 #else
997 return 0;
998 #endif
999 }
1000 #endif
1001
1002 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
1003 {
1004 BDRVRBDState *s = bs->opaque;
1005 rbd_image_info_t info;
1006 int r;
1007
1008 r = rbd_stat(s->image, &info, sizeof(info));
1009 if (r < 0) {
1010 return r;
1011 }
1012
1013 bdi->cluster_size = info.obj_size;
1014 return 0;
1015 }
1016
1017 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
1018 {
1019 BDRVRBDState *s = bs->opaque;
1020 rbd_image_info_t info;
1021 int r;
1022
1023 r = rbd_stat(s->image, &info, sizeof(info));
1024 if (r < 0) {
1025 return r;
1026 }
1027
1028 return info.size;
1029 }
1030
1031 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
1032 {
1033 BDRVRBDState *s = bs->opaque;
1034 int r;
1035
1036 r = rbd_resize(s->image, offset);
1037 if (r < 0) {
1038 return r;
1039 }
1040
1041 return 0;
1042 }
1043
1044 static int qemu_rbd_snap_create(BlockDriverState *bs,
1045 QEMUSnapshotInfo *sn_info)
1046 {
1047 BDRVRBDState *s = bs->opaque;
1048 int r;
1049
1050 if (sn_info->name[0] == '\0') {
1051 return -EINVAL; /* we need a name for rbd snapshots */
1052 }
1053
1054 /*
1055 * rbd snapshots are using the name as the user controlled unique identifier
1056 * we can't use the rbd snapid for that purpose, as it can't be set
1057 */
1058 if (sn_info->id_str[0] != '\0' &&
1059 strcmp(sn_info->id_str, sn_info->name) != 0) {
1060 return -EINVAL;
1061 }
1062
1063 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
1064 return -ERANGE;
1065 }
1066
1067 r = rbd_snap_create(s->image, sn_info->name);
1068 if (r < 0) {
1069 error_report("failed to create snap: %s", strerror(-r));
1070 return r;
1071 }
1072
1073 return 0;
1074 }
1075
1076 static int qemu_rbd_snap_remove(BlockDriverState *bs,
1077 const char *snapshot_id,
1078 const char *snapshot_name,
1079 Error **errp)
1080 {
1081 BDRVRBDState *s = bs->opaque;
1082 int r;
1083
1084 if (!snapshot_name) {
1085 error_setg(errp, "rbd need a valid snapshot name");
1086 return -EINVAL;
1087 }
1088
1089 /* If snapshot_id is specified, it must be equal to name, see
1090 qemu_rbd_snap_list() */
1091 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
1092 error_setg(errp,
1093 "rbd do not support snapshot id, it should be NULL or "
1094 "equal to snapshot name");
1095 return -EINVAL;
1096 }
1097
1098 r = rbd_snap_remove(s->image, snapshot_name);
1099 if (r < 0) {
1100 error_setg_errno(errp, -r, "Failed to remove the snapshot");
1101 }
1102 return r;
1103 }
1104
1105 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
1106 const char *snapshot_name)
1107 {
1108 BDRVRBDState *s = bs->opaque;
1109
1110 return rbd_snap_rollback(s->image, snapshot_name);
1111 }
1112
1113 static int qemu_rbd_snap_list(BlockDriverState *bs,
1114 QEMUSnapshotInfo **psn_tab)
1115 {
1116 BDRVRBDState *s = bs->opaque;
1117 QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
1118 int i, snap_count;
1119 rbd_snap_info_t *snaps;
1120 int max_snaps = RBD_MAX_SNAPS;
1121
1122 do {
1123 snaps = g_new(rbd_snap_info_t, max_snaps);
1124 snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
1125 if (snap_count <= 0) {
1126 g_free(snaps);
1127 }
1128 } while (snap_count == -ERANGE);
1129
1130 if (snap_count <= 0) {
1131 goto done;
1132 }
1133
1134 sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
1135
1136 for (i = 0; i < snap_count; i++) {
1137 const char *snap_name = snaps[i].name;
1138
1139 sn_info = sn_tab + i;
1140 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1141 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1142
1143 sn_info->vm_state_size = snaps[i].size;
1144 sn_info->date_sec = 0;
1145 sn_info->date_nsec = 0;
1146 sn_info->vm_clock_nsec = 0;
1147 }
1148 rbd_snap_list_end(snaps);
1149 g_free(snaps);
1150
1151 done:
1152 *psn_tab = sn_tab;
1153 return snap_count;
1154 }
1155
#ifdef LIBRBD_SUPPORTS_DISCARD
/*
 * Asynchronous discard of @count bytes at byte @offset, submitted through
 * rbd_start_aio() as RBD_AIO_DISCARD without a data vector.
 */
static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
                                         int64_t offset,
                                         int count,
                                         BlockCompletionFunc *cb,
                                         void *opaque)
{
    QEMUIOVector *no_data = NULL;

    return rbd_start_aio(bs, offset, no_data, count, cb, opaque,
                         RBD_AIO_DISCARD);
}
#endif
1167
#ifdef LIBRBD_SUPPORTS_INVALIDATE
/*
 * Ask librbd to invalidate its cache for this image; a failure is
 * reported through @errp.
 */
static void qemu_rbd_invalidate_cache(BlockDriverState *bs,
                                      Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    int r;

    r = rbd_invalidate_cache(s->image);
    if (r >= 0) {
        return;
    }
    error_setg_errno(errp, -r, "Failed to invalidate the cache");
}
#endif
1179
/*
 * Options accepted at image creation time; installed as
 * bdrv_rbd.create_opts and read by qemu_rbd_create().
 */
static QemuOptsList qemu_rbd_create_opts = {
    .name = "rbd-create-opts",
    .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
    .desc = {
        {
            /* rounded up to a multiple of BDRV_SECTOR_SIZE on creation */
            .name = BLOCK_OPT_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "Virtual disk size"
        },
        {
            /* must be a power of two and at least 4096 when given */
            .name = BLOCK_OPT_CLUSTER_SIZE,
            .type = QEMU_OPT_SIZE,
            .help = "RBD object size"
        },
        {
            .name = "password-secret",
            .type = QEMU_OPT_STRING,
            .help = "ID of secret providing the password",
        },
        { /* end of list */ }
    }
};
1202
/*
 * Block driver description for the "rbd" protocol; registered by
 * bdrv_rbd_init().
 */
static BlockDriver bdrv_rbd = {
    .format_name            = "rbd",
    .instance_size          = sizeof(BDRVRBDState),
    .bdrv_parse_filename    = qemu_rbd_parse_filename,
    .bdrv_file_open         = qemu_rbd_open,
    .bdrv_close             = qemu_rbd_close,
    .bdrv_create            = qemu_rbd_create,
    .bdrv_has_zero_init     = bdrv_has_zero_init_1,
    .bdrv_get_info          = qemu_rbd_getinfo,
    .create_opts            = &qemu_rbd_create_opts,
    .bdrv_getlength         = qemu_rbd_getlength,
    .bdrv_truncate          = qemu_rbd_truncate,
    .protocol_name          = "rbd",

    .bdrv_aio_readv         = qemu_rbd_aio_readv,
    .bdrv_aio_writev        = qemu_rbd_aio_writev,

    /* asynchronous flush when librbd offers it, synchronous otherwise */
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
    .bdrv_aio_flush         = qemu_rbd_aio_flush,
#else
    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
#endif

    /* discard is only wired up when librbd supports it */
#ifdef LIBRBD_SUPPORTS_DISCARD
    .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
#endif

    .bdrv_snapshot_create   = qemu_rbd_snap_create,
    .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
    .bdrv_snapshot_list     = qemu_rbd_snap_list,
    .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
#ifdef LIBRBD_SUPPORTS_INVALIDATE
    .bdrv_invalidate_cache  = qemu_rbd_invalidate_cache,
#endif
};
1238
/* Register the rbd driver; hooked in through block_init() below. */
static void bdrv_rbd_init(void)
{
    bdrv_register(&bdrv_rbd);
}

block_init(bdrv_rbd_init);