Commit ebdfc3c8 authored by Anthony Liguori's avatar Anthony Liguori
Browse files

Merge remote-tracking branch 'bonzini/nbd-for-anthony' into staging

* bonzini/nbd-for-anthony: (26 commits)
  nbd: add myself as maintainer
  qemu-nbd: throttle requests
  qemu-nbd: asynchronous operation
  qemu-nbd: add client pointer to NBDRequest
  qemu-nbd: move client handling to nbd.c
  qemu-nbd: use common main loop
  link the main loop and its dependencies into the tools
  qemu-nbd: introduce NBDRequest
  qemu-nbd: introduce NBDExport
  qemu-nbd: introduce nbd_do_receive_request
  qemu-nbd: more robust handling of invalid requests
  qemu-nbd: introduce nbd_do_send_reply
  qemu-nbd: simplify nbd_trip
  move corking functions to osdep.c
  qemu-nbd: remove data_size argument to nbd_trip
  qemu-nbd: remove offset argument to nbd_trip
  Update ioctl order in nbd_init() to detect EBUSY
  nbd: add support for NBD_CMD_TRIM
  nbd: add support for NBD_CMD_FLUSH
  nbd: add support for NBD_CMD_FLAG_FUA
  ...
parents a0fa8208 44f76b28
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -473,6 +473,13 @@ M: Mark McLoughlin <markmc@redhat.com>
S: Maintained
F: net/

Network Block Device (NBD)
M: Paolo Bonzini <pbonzini@redhat.com>
S: Odd Fixes
F: block/nbd.c
F: nbd.*
F: qemu-nbd.c

SLIRP
M: Jan Kiszka <jan.kiszka@siemens.com>
S: Maintained
+3 −2
Original line number Diff line number Diff line
@@ -147,8 +147,9 @@ endif
qemu-img.o: qemu-img-cmds.h
qemu-img.o qemu-tool.o qemu-nbd.o qemu-io.o cmd.o qemu-ga.o: $(GENERATED_HEADERS)

tools-obj-y = qemu-tool.o $(oslib-obj-y) $(trace-obj-y) \
	qemu-timer-common.o cutils.o
tools-obj-y = $(oslib-obj-y) $(trace-obj-y) qemu-tool.o qemu-timer.o \
	qemu-timer-common.o main-loop.o notify.o iohandler.o cutils.o async.o
tools-obj-$(CONFIG_POSIX) += compatfd.o

qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)
qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)
+1 −1
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@ oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o

#######################################################################
# coroutines
coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o
coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
ifeq ($(CONFIG_UCONTEXT_COROUTINE),y)
coroutine-obj-$(CONFIG_POSIX) += coroutine-ucontext.o
else
+259 −60
Original line number Diff line number Diff line
@@ -46,14 +46,25 @@
#define logout(fmt, ...) ((void)0)
#endif

#define MAX_NBD_REQUESTS	16
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))

typedef struct BDRVNBDState {
    CoMutex lock;
    int sock;
    uint32_t nbdflags;
    off_t size;
    size_t blocksize;
    char *export_name; /* An NBD server may export several devices */

    CoMutex send_mutex;
    CoMutex free_sema;
    Coroutine *send_coroutine;
    int in_flight;

    Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
    struct nbd_reply reply;

    /* If it begins with  '/', this is a UNIX domain socket. Otherwise,
     * it's a string of the form <hostname|ip4|\[ip6\]>:port
     */
@@ -106,6 +117,130 @@ out:
    return err;
}

/*
 * Reserve a request slot for the calling coroutine and derive the wire
 * handle for @request from the slot index.  Blocks (on free_sema) while
 * MAX_NBD_REQUESTS requests are already in flight.
 */
static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request)
{
    int slot;

    /* Poor man's semaphore: free_sema stays locked while the pipeline is
     * full and is unlocked by nbd_coroutine_end() when a reply retires. */
    if (s->in_flight >= MAX_NBD_REQUESTS - 1) {
        qemu_co_mutex_lock(&s->free_sema);
        assert(s->in_flight < MAX_NBD_REQUESTS);
    }
    s->in_flight++;

    /* Claim the first free reply slot for this coroutine. */
    for (slot = 0; slot < MAX_NBD_REQUESTS; slot++) {
        if (!s->recv_coroutine[slot]) {
            s->recv_coroutine[slot] = qemu_coroutine_self();
            break;
        }
    }
    assert(slot < MAX_NBD_REQUESTS);

    /* Encode the slot index into the handle sent on the wire. */
    request->handle = INDEX_TO_HANDLE(s, slot);
}

/* aio flush callback: reports whether any NBD request is still in flight. */
static int nbd_have_request(void *opaque)
{
    BDRVNBDState *s = opaque;
    return (s->in_flight != 0);
}

/*
 * Read handler for the NBD socket.  Fetches one reply header (if none is
 * pending) and wakes the coroutine owning the matching request slot.  If
 * the header cannot be read, every waiting coroutine is woken so it can
 * observe the failure (each will see a handle mismatch and report EIO).
 */
static void nbd_reply_ready(void *opaque)
{
    BDRVNBDState *s = opaque;
    int i;

    if (s->reply.handle == 0) {
        /* No reply already in flight.  Fetch a header.  */
        if (nbd_receive_reply(s->sock, &s->reply) < 0) {
            s->reply.handle = 0;
            goto fail;
        }
    }

    /* There's no need for a mutex on the receive side, because the
     * handler acts as a synchronization point and ensures that only
     * one coroutine is called until the reply finishes.  */
    i = HANDLE_TO_INDEX(s, s->reply.handle);
    if (s->recv_coroutine[i]) {
        qemu_coroutine_enter(s->recv_coroutine[i], NULL);
        return;
    }

fail:
    /* Error path: wake all pending coroutines so none stays parked
     * forever on a dead connection. */
    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->recv_coroutine[i]) {
            qemu_coroutine_enter(s->recv_coroutine[i], NULL);
        }
    }
}

/* Write handler: the socket became writable again; resume the sender. */
static void nbd_restart_write(void *opaque)
{
    BDRVNBDState *state = opaque;

    qemu_coroutine_enter(state->send_coroutine, NULL);
}

/*
 * Send one request header and (for writes) its payload from @iov starting
 * at byte position @offset.  Senders are serialized by send_mutex; the
 * receive side needs no lock (see nbd_reply_ready).
 * Returns -1 with errno set on failure, otherwise the nbd_send_request()
 * result.
 */
static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request,
                               struct iovec *iov, int offset)
{
    int rc, ret;

    qemu_co_mutex_lock(&s->send_mutex);
    s->send_coroutine = qemu_coroutine_self();
    /* Install a write handler so a partially-sent request can resume
     * when the socket becomes writable again. */
    qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write,
                            nbd_have_request, NULL, s);
    rc = nbd_send_request(s->sock, request);
    if (rc != -1 && iov) {
        ret = qemu_co_sendv(s->sock, iov, request->len, offset);
        if (ret != request->len) {
            /* Bug fix: was "errno = -EIO".  errno values are positive;
             * a negative value corrupts the callers' convention of
             * reply.error = errno / return -reply.error. */
            errno = EIO;
            rc = -1;
        }
    }
    qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
                            nbd_have_request, NULL, s);
    s->send_coroutine = NULL;
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}

/*
 * Park the calling coroutine until nbd_reply_ready() wakes it, then copy
 * the pending reply into *reply and, for reads, receive the payload into
 * @iov at byte position @offset.  On a handle mismatch (which also covers
 * the wake-everyone error path of nbd_reply_ready) reply->error is forced
 * to EIO and s->reply is left untouched so the owning coroutine can still
 * consume it.
 */
static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request,
                                 struct nbd_reply *reply,
                                 struct iovec *iov, int offset)
{
    int ret;

    /* Wait until we're woken up by the read handler.  TODO: perhaps
     * peek at the next reply and avoid yielding if it's ours?  */
    qemu_coroutine_yield();
    *reply = s->reply;
    if (reply->handle != request->handle) {
        reply->error = EIO;
    } else {
        /* Only reads carry a payload, and only when the server
         * reported success. */
        if (iov && reply->error == 0) {
            ret = qemu_co_recvv(s->sock, iov, request->len, offset);
            if (ret != request->len) {
                reply->error = EIO;
            }
        }

        /* Tell the read handler to read another header.  */
        s->reply.handle = 0;
    }
}

/*
 * Release the reply slot owned by @request and, if the pipeline had been
 * full, unlock free_sema so a blocked nbd_coroutine_start() may proceed.
 */
static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request)
{
    int slot = HANDLE_TO_INDEX(s, request->handle);
    int was_full = (s->in_flight == MAX_NBD_REQUESTS);

    s->recv_coroutine[slot] = NULL;
    s->in_flight--;
    if (was_full) {
        qemu_co_mutex_unlock(&s->free_sema);
    }
}

static int nbd_establish_connection(BlockDriverState *bs)
{
    BDRVNBDState *s = bs->opaque;
@@ -135,8 +270,11 @@ static int nbd_establish_connection(BlockDriverState *bs)
        return -errno;
    }

    /* Now that we're connected, set the socket to be non-blocking */
    /* Now that we're connected, set the socket to be non-blocking and
     * kick the reply mechanism.  */
    socket_set_nonblock(sock);
    qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
                            nbd_have_request, NULL, s);

    s->sock = sock;
    s->size = size;
@@ -152,11 +290,11 @@ static void nbd_teardown_connection(BlockDriverState *bs)
    struct nbd_request request;

    request.type = NBD_CMD_DISC;
    request.handle = (uint64_t)(intptr_t)bs;
    request.from = 0;
    request.len = 0;
    nbd_send_request(s->sock, &request);

    qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
    closesocket(s->sock);
}

@@ -165,6 +303,9 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
    BDRVNBDState *s = bs->opaque;
    int result;

    qemu_co_mutex_init(&s->send_mutex);
    qemu_co_mutex_init(&s->free_sema);

    /* Pop the config into our state object. Exit if invalid. */
    result = nbd_config(s, filename, flags);
    if (result != 0) {
@@ -176,90 +317,146 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
     */
    result = nbd_establish_connection(bs);

    qemu_co_mutex_init(&s->lock);
    return result;
}

static int nbd_read(BlockDriverState *bs, int64_t sector_num,
                    uint8_t *buf, int nb_sectors)
static int nbd_co_readv_1(BlockDriverState *bs, int64_t sector_num,
                          int nb_sectors, QEMUIOVector *qiov,
                          int offset)
{
    BDRVNBDState *s = bs->opaque;
    struct nbd_request request;
    struct nbd_reply reply;

    request.type = NBD_CMD_READ;
    request.handle = (uint64_t)(intptr_t)bs;
    request.from = sector_num * 512;
    request.len = nb_sectors * 512;

    if (nbd_send_request(s->sock, &request) == -1)
        return -errno;

    if (nbd_receive_reply(s->sock, &reply) == -1)
        return -errno;

    if (reply.error !=0)
    nbd_coroutine_start(s, &request);
    if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
        reply.error = errno;
    } else {
        nbd_co_receive_reply(s, &request, &reply, qiov->iov, offset);
    }
    nbd_coroutine_end(s, &request);
    return -reply.error;

    if (reply.handle != request.handle)
        return -EIO;

    if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
        return -EIO;

    return 0;
}

static int nbd_write(BlockDriverState *bs, int64_t sector_num,
                     const uint8_t *buf, int nb_sectors)
static int nbd_co_writev_1(BlockDriverState *bs, int64_t sector_num,
                           int nb_sectors, QEMUIOVector *qiov,
                           int offset)
{
    BDRVNBDState *s = bs->opaque;
    struct nbd_request request;
    struct nbd_reply reply;

    request.type = NBD_CMD_WRITE;
    request.handle = (uint64_t)(intptr_t)bs;
    if (!bdrv_enable_write_cache(bs) && (s->nbdflags & NBD_FLAG_SEND_FUA)) {
        request.type |= NBD_CMD_FLAG_FUA;
    }

    request.from = sector_num * 512;
    request.len = nb_sectors * 512;

    if (nbd_send_request(s->sock, &request) == -1)
        return -errno;

    if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
        return -EIO;

    if (nbd_receive_reply(s->sock, &reply) == -1)
        return -errno;

    if (reply.error !=0)
    nbd_coroutine_start(s, &request);
    if (nbd_co_send_request(s, &request, qiov->iov, offset) == -1) {
        reply.error = errno;
    } else {
        nbd_co_receive_reply(s, &request, &reply, NULL, 0);
    }
    nbd_coroutine_end(s, &request);
    return -reply.error;
}

    if (reply.handle != request.handle)
        return -EIO;
/* qemu-nbd has a limit of slightly less than 1M per request.  Try to
 * remain aligned to 4K. */
#define NBD_MAX_SECTORS 2040

    return 0;
static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num,
                        int nb_sectors, QEMUIOVector *qiov)
{
    int offset = 0;
    int ret;
    while (nb_sectors > NBD_MAX_SECTORS) {
        ret = nbd_co_readv_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset);
        if (ret < 0) {
            return ret;
        }
        offset += NBD_MAX_SECTORS * 512;
        sector_num += NBD_MAX_SECTORS;
        nb_sectors -= NBD_MAX_SECTORS;
    }
    return nbd_co_readv_1(bs, sector_num, nb_sectors, qiov, offset);
}

static coroutine_fn int nbd_co_read(BlockDriverState *bs, int64_t sector_num,
                                    uint8_t *buf, int nb_sectors)
static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num,
                         int nb_sectors, QEMUIOVector *qiov)
{
    int offset = 0;
    int ret;
    BDRVNBDState *s = bs->opaque;
    qemu_co_mutex_lock(&s->lock);
    ret = nbd_read(bs, sector_num, buf, nb_sectors);
    qemu_co_mutex_unlock(&s->lock);
    while (nb_sectors > NBD_MAX_SECTORS) {
        ret = nbd_co_writev_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset);
        if (ret < 0) {
            return ret;
        }
        offset += NBD_MAX_SECTORS * 512;
        sector_num += NBD_MAX_SECTORS;
        nb_sectors -= NBD_MAX_SECTORS;
    }
    return nbd_co_writev_1(bs, sector_num, nb_sectors, qiov, offset);
}

/*
 * Flush the server-side cache with NBD_CMD_FLUSH.  A no-op (success) when
 * the server did not advertise NBD_FLAG_SEND_FLUSH.
 * (The scraped diff interleaved the old nbd_co_write() wrapper here; this
 * is the coherent new version only.)
 */
static int nbd_co_flush(BlockDriverState *bs)
{
    BDRVNBDState *s = bs->opaque;
    struct nbd_request request;
    struct nbd_reply reply;

    if (!(s->nbdflags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.type = NBD_CMD_FLUSH;
    if (s->nbdflags & NBD_FLAG_SEND_FUA) {
        request.type |= NBD_CMD_FLAG_FUA;
    }

    request.from = 0;
    request.len = 0;

    nbd_coroutine_start(s, &request);
    if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
        reply.error = errno;
    } else {
        nbd_co_receive_reply(s, &request, &reply, NULL, 0);
    }
    nbd_coroutine_end(s, &request);
    return -reply.error;
}

/*
 * Discard (TRIM) a sector range with NBD_CMD_TRIM.  Advisory: a no-op
 * (success) when the server did not advertise NBD_FLAG_SEND_TRIM.
 */
static int nbd_co_discard(BlockDriverState *bs, int64_t sector_num,
                          int nb_sectors)
{
    BDRVNBDState *s = bs->opaque;
    struct nbd_request request;
    struct nbd_reply reply;

    if (!(s->nbdflags & NBD_FLAG_SEND_TRIM)) {
        return 0;
    }
    request.type = NBD_CMD_TRIM;
    request.from = sector_num * 512;   /* fixed stray ";;" */
    request.len = nb_sectors * 512;

    nbd_coroutine_start(s, &request);
    if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
        reply.error = errno;
    } else {
        nbd_co_receive_reply(s, &request, &reply, NULL, 0);
    }
    nbd_coroutine_end(s, &request);
    return -reply.error;
}

static void nbd_close(BlockDriverState *bs)
@@ -282,9 +479,11 @@ static BlockDriver bdrv_nbd = {
    .format_name         = "nbd",
    .instance_size       = sizeof(BDRVNBDState),
    .bdrv_file_open      = nbd_open,
    /* The stale synchronous .bdrv_read/.bdrv_write entries (pointing at the
     * removed nbd_co_read/nbd_co_write wrappers) were diff residue; the
     * driver now uses only the coroutine interfaces. */
    .bdrv_co_readv       = nbd_co_readv,
    .bdrv_co_writev      = nbd_co_writev,
    .bdrv_close          = nbd_close,
    .bdrv_co_flush_to_os = nbd_co_flush,
    .bdrv_co_discard     = nbd_co_discard,
    .bdrv_getlength      = nbd_getlength,
    .protocol_name       = "nbd",
};
+23 −227
Original line number Diff line number Diff line
@@ -443,129 +443,6 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
    return acb;
}

#ifdef _WIN32

/* Minimal stand-in for the POSIX msghdr: only the iovec fields that the
 * shims below need. */
struct msghdr {
    struct iovec *msg_iov;
    size_t        msg_iovlen;
};

/*
 * sendmsg() emulation for Win32: flattens the iovec into one heap buffer
 * and issues a single send().  g_malloc() aborts on OOM, so the buffer is
 * never NULL.
 */
static ssize_t sendmsg(int s, const struct msghdr *msg, int flags)
{
    size_t size = 0;
    char *buf, *p;
    int i, ret;

    /* count the msg size */
    for (i = 0; i < msg->msg_iovlen; i++) {
        size += msg->msg_iov[i].iov_len;
    }
    buf = g_malloc(size);

    /* Gather every element into the flat buffer. */
    p = buf;
    for (i = 0; i < msg->msg_iovlen; i++) {
        memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len);
        p += msg->msg_iov[i].iov_len;
    }

    ret = send(s, buf, size, flags);

    g_free(buf);
    return ret;
}

/*
 * recvmsg() emulation for Win32: receives into one heap buffer, then
 * scatters it back into the iovec.
 * NOTE(review): on a short read (0 < ret < size) the scatter loop still
 * copies full iov_len chunks, so trailing elements get stale buffer
 * bytes — presumably callers retry until the full length arrives; verify
 * against do_readv_writev().
 */
static ssize_t recvmsg(int s, struct msghdr *msg, int flags)
{
    size_t size = 0;
    char *buf, *p;
    int i, ret;

    /* count the msg size */
    for (i = 0; i < msg->msg_iovlen; i++) {
        size += msg->msg_iov[i].iov_len;
    }
    buf = g_malloc(size);

    ret = qemu_recv(s, buf, size, flags);
    if (ret < 0) {
        goto out;
    }

    /* Scatter the flat buffer back over the iovec elements. */
    p = buf;
    for (i = 0; i < msg->msg_iovlen; i++) {
        memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len);
        p += msg->msg_iov[i].iov_len;
    }
out:
    g_free(buf);
    return ret;
}

#endif

/*
 * Send/recv data with iovec buffers
 *
 * This function sends/receives data from/to the iovec buffer directly.
 * The first `offset' bytes in the iovec buffer are skipped and the next
 * `len' bytes are used.
 *
 * For example,
 *
 *   do_send_recv(sockfd, iov, len, offset, 1);
 *
 * is equivalent to
 *
 *   char *buf = malloc(len);
 *   iov_to_buf(iov, iovcnt, buf, offset, len);
 *   send(sockfd, buf, len, 0);
 *   free(buf);
 */
/*
 * Transfer `len' bytes at byte position `offset' of the iovec through the
 * socket: send when `write' is non-zero, receive otherwise.  Returns the
 * sendmsg()/recvmsg() result.  The iovec is temporarily trimmed in place
 * and restored before returning, so this is not safe for concurrent use
 * of the same iovec.
 */
static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset,
                        int write)
{
    struct msghdr msg;
    int ret, diff;

    /* Start with a one-element view of the iovec and grow it below. */
    memset(&msg, 0, sizeof(msg));
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;

    /* From here on, len counts bytes from the start of the iovec. */
    len += offset;

    /* Extend the view until it covers offset+len bytes; iov ends up on
     * the last element, len on the bytes used within it. */
    while (iov->iov_len < len) {
        len -= iov->iov_len;

        iov++;
        msg.msg_iovlen++;
    }

    /* Temporarily shrink the last element so exactly len bytes remain. */
    diff = iov->iov_len - len;
    iov->iov_len -= diff;

    /* Drop whole leading elements consumed by the offset... */
    while (msg.msg_iov->iov_len <= offset) {
        offset -= msg.msg_iov->iov_len;

        msg.msg_iov++;
        msg.msg_iovlen--;
    }

    /* ...and advance into the first remaining element. */
    msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset;
    msg.msg_iov->iov_len -= offset;

    if (write) {
        ret = sendmsg(sockfd, &msg, 0);
    } else {
        ret = recvmsg(sockfd, &msg, 0);
    }

    /* Undo the temporary adjustments before returning. */
    msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset;
    msg.msg_iov->iov_len += offset;

    iov->iov_len += diff;
    return ret;
}

static int connect_to_sdog(const char *addr, const char *port)
{
    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
@@ -618,83 +495,19 @@ success:
    return fd;
}

/*
 * Transfer exactly `len' bytes through the socket, retrying short
 * transfers and EINTR.  On EAGAIN it yields when inside a coroutine;
 * otherwise it spins until the socket is ready (NOTE(review): a busy-wait
 * on a non-blocking socket outside coroutine context — presumably only
 * hit on the synchronous setup path; confirm with callers).
 * Returns 0 on success, 1 on error (note: positive, not -1).
 */
static int do_readv_writev(int sockfd, struct iovec *iov, int len,
                           int iov_offset, int write)
{
    int ret;
again:
    ret = do_send_recv(sockfd, iov, len, iov_offset, write);
    if (ret < 0) {
        if (errno == EINTR) {
            goto again;
        }
        if (errno == EAGAIN) {
            if (qemu_in_coroutine()) {
                qemu_coroutine_yield();
            }
            goto again;
        }
        /* Message text covers both directions despite saying "recv". */
        error_report("failed to recv a rsp, %s", strerror(errno));
        return 1;
    }

    /* Short transfer: advance past the bytes already moved and retry. */
    iov_offset += ret;
    len -= ret;
    if (len) {
        goto again;
    }

    return 0;
}

/* Receive `len' bytes into the iovec; thin wrapper over do_readv_writev(). */
static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
{
    return do_readv_writev(sockfd, iov, len, iov_offset, /*write=*/0);
}

/* Send `len' bytes from the iovec; thin wrapper over do_readv_writev(). */
static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset)
{
    return do_readv_writev(sockfd, iov, len, iov_offset, /*write=*/1);
}

/* Transfer a flat buffer by wrapping it in a single-element iovec. */
static int do_read_write(int sockfd, void *buf, int len, int write)
{
    struct iovec vec = {
        .iov_base = buf,
        .iov_len  = len,
    };

    return do_readv_writev(sockfd, &vec, len, 0, write);
}

/* Receive exactly `len' bytes into a flat buffer. */
static int do_read(int sockfd, void *buf, int len)
{
    return do_read_write(sockfd, buf, len, /*write=*/0);
}

/* Send exactly `len' bytes from a flat buffer. */
static int do_write(int sockfd, void *buf, int len)
{
    return do_read_write(sockfd, buf, len, /*write=*/1);
}

/*
 * Send a Sheepdog request: header first, then `*wlen' bytes of payload.
 * Returns the last qemu_send_full() result, or -1 on failure.
 *
 * The scraped diff interleaved the removed iovec/do_writev implementation
 * with the new qemu_send_full() one; this is the coherent new version.
 * The comparisons cast to int because qemu_send_full()'s negative error
 * return would otherwise be promoted to unsigned and the check would
 * never fire.
 */
static int send_req(int sockfd, SheepdogReq *hdr, void *data,
                    unsigned int *wlen)
{
    int ret;

    ret = qemu_send_full(sockfd, hdr, sizeof(*hdr), 0);
    if (ret < (int)sizeof(*hdr)) {
        error_report("failed to send a req, %s", strerror(errno));
        return -1;
    }

    ret = qemu_send_full(sockfd, data, *wlen, 0);
    if (ret < (int)*wlen) {
        error_report("failed to send a req, %s", strerror(errno));
        ret = -1;
    }

    return ret;
}
@@ -705,16 +518,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
{
    int ret;

    socket_set_block(sockfd);
    ret = send_req(sockfd, hdr, data, wlen);
    if (ret) {
        ret = -1;
    if (ret < 0) {
        goto out;
    }

    ret = do_read(sockfd, hdr, sizeof(*hdr));
    if (ret) {
    ret = qemu_recv_full(sockfd, hdr, sizeof(*hdr), 0);
    if (ret < sizeof(*hdr)) {
        error_report("failed to get a rsp, %s", strerror(errno));
        ret = -1;
        goto out;
    }

@@ -723,15 +535,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
    }

    if (*rlen) {
        ret = do_read(sockfd, data, *rlen);
        if (ret) {
        ret = qemu_recv_full(sockfd, data, *rlen, 0);
        if (ret < *rlen) {
            error_report("failed to get the data, %s", strerror(errno));
            ret = -1;
            goto out;
        }
    }
    ret = 0;
out:
    socket_set_nonblock(sockfd);
    return ret;
}

@@ -793,8 +605,8 @@ static void coroutine_fn aio_read_response(void *opaque)
    }

    /* read a header */
    ret = do_read(fd, &rsp, sizeof(rsp));
    if (ret) {
    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
    if (ret < 0) {
        error_report("failed to get the header, %s", strerror(errno));
        goto out;
    }
@@ -839,9 +651,9 @@ static void coroutine_fn aio_read_response(void *opaque)
        }
        break;
    case AIOCB_READ_UDATA:
        ret = do_readv(fd, acb->qiov->iov, rsp.data_length,
        ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length,
                            aio_req->iov_offset);
        if (ret) {
        if (ret < 0) {
            error_report("failed to get the data, %s", strerror(errno));
            goto out;
        }
@@ -890,22 +702,6 @@ static int aio_flush_request(void *opaque)
    return !QLIST_EMPTY(&s->outstanding_aio_head);
}

#if !defined(SOL_TCP) || !defined(TCP_CORK)

/* TCP_CORK unavailable on this platform: corking is a successful no-op. */
static int set_cork(int fd, int v)
{
    return 0;
}

#else

/* Enable (v != 0) or disable (v == 0) TCP corking on fd. */
static int set_cork(int fd, int v)
{
    int val = v;

    return setsockopt(fd, SOL_TCP, TCP_CORK, &val, sizeof(val));
}

#endif

static int set_nodelay(int fd)
{
    int ret, opt;
@@ -1111,26 +907,26 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
    s->co_send = qemu_coroutine_self();
    qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request,
                            aio_flush_request, NULL, s);
    set_cork(s->fd, 1);
    socket_set_cork(s->fd, 1);

    /* send a header */
    ret = do_write(s->fd, &hdr, sizeof(hdr));
    if (ret) {
    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
    if (ret < 0) {
        qemu_co_mutex_unlock(&s->lock);
        error_report("failed to send a req, %s", strerror(errno));
        return -EIO;
    }

    if (wlen) {
        ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
        if (ret) {
        ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset);
        if (ret < 0) {
            qemu_co_mutex_unlock(&s->lock);
            error_report("failed to send a data, %s", strerror(errno));
            return -EIO;
        }
    }

    set_cork(s->fd, 0);
    socket_set_cork(s->fd, 0);
    qemu_aio_set_fd_handler(s->fd, co_read_response, NULL,
                            aio_flush_request, NULL, s);
    qemu_co_mutex_unlock(&s->lock);
Loading