Commit 47783072 authored by Liu Yuan's avatar Liu Yuan Committed by Stefan Hajnoczi
Browse files

sheepdog: multiplex the rw FD to flush cache



This will reduce sockfds connected to the sheep server to one, which simply the
future hacks.

Cc: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
Cc: Kevin Wolf <kwolf@redhat.com>
Cc: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: default avatarLiu Yuan <tailai.ly@taobao.com>
Signed-off-by: default avatarStefan Hajnoczi <stefanha@redhat.com>
parent df702c9b
Loading
Loading
Loading
Loading
+37 −45
Original line number Diff line number Diff line
@@ -266,6 +266,7 @@ typedef struct AIOReq {
enum AIOCBState {
    AIOCB_WRITE_UDATA,
    AIOCB_READ_UDATA,
    AIOCB_FLUSH_CACHE,
};

struct SheepdogAIOCB {
@@ -299,7 +300,6 @@ typedef struct BDRVSheepdogState {
    char *addr;
    char *port;
    int fd;
    int flush_fd;

    CoMutex lock;
    Coroutine *co_send;
@@ -736,6 +736,13 @@ static void coroutine_fn aio_read_response(void *opaque)
            goto out;
        }
        break;
    case AIOCB_FLUSH_CACHE:
        if (rsp.result == SD_RES_INVALID_PARMS) {
            dprintf("disable cache since the server doesn't support it\n");
            s->cache_flags = SD_FLAG_CMD_DIRECT;
            rsp.result = SD_RES_SUCCESS;
        }
        break;
    }

    if (rsp.result != SD_RES_SUCCESS) {
@@ -950,7 +957,7 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
{
    int nr_copies = s->inode.nr_copies;
    SheepdogObjReq hdr;
    unsigned int wlen;
    unsigned int wlen = 0;
    int ret;
    uint64_t oid = aio_req->oid;
    unsigned int datalen = aio_req->data_len;
@@ -964,18 +971,23 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,

    memset(&hdr, 0, sizeof(hdr));

    if (aiocb_type == AIOCB_READ_UDATA) {
        wlen = 0;
    switch (aiocb_type) {
    case AIOCB_FLUSH_CACHE:
        hdr.opcode = SD_OP_FLUSH_VDI;
        break;
    case AIOCB_READ_UDATA:
        hdr.opcode = SD_OP_READ_OBJ;
        hdr.flags = flags;
    } else if (create) {
        wlen = datalen;
        break;
    case AIOCB_WRITE_UDATA:
        if (create) {
            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
        hdr.flags = SD_FLAG_CMD_WRITE | flags;
        } else {
        wlen = datalen;
            hdr.opcode = SD_OP_WRITE_OBJ;
        }
        wlen = datalen;
        hdr.flags = SD_FLAG_CMD_WRITE | flags;
        break;
    }

    if (s->cache_flags) {
@@ -1127,15 +1139,6 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
        s->cache_flags = SD_FLAG_CMD_DIRECT;
    }

    if (s->cache_flags == SD_FLAG_CMD_CACHE) {
        s->flush_fd = connect_to_sdog(s->addr, s->port);
        if (s->flush_fd < 0) {
            error_report("failed to connect");
            ret = s->flush_fd;
            goto out;
        }
    }

    if (snapid || tag[0] != '\0') {
        dprintf("%" PRIx32 " snapshot inode was open.\n", vid);
        s->is_snapshot = true;
@@ -1397,9 +1400,6 @@ static void sd_close(BlockDriverState *bs)

    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
    closesocket(s->fd);
    if (s->cache_flags) {
        closesocket(s->flush_fd);
    }
    g_free(s->addr);
}

@@ -1711,39 +1711,31 @@ static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogObjReq hdr = { 0 };
    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
    SheepdogInode *inode = &s->inode;
    SheepdogAIOCB *acb;
    AIOReq *aio_req;
    int ret;
    unsigned int wlen = 0, rlen = 0;

    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
        return 0;
    }

    hdr.opcode = SD_OP_FLUSH_VDI;
    hdr.oid = vid_to_vdi_oid(inode->vdi_id);
    acb = sd_aio_setup(bs, NULL, 0, 0, NULL, NULL);
    acb->aiocb_type = AIOCB_FLUSH_CACHE;
    acb->aio_done_func = sd_finish_aiocb;

    ret = do_req(s->flush_fd, (SheepdogReq *)&hdr, NULL, &wlen, &rlen);
    if (ret) {
        error_report("failed to send a request to the sheep");
    aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                            0, 0, 0, 0, 0);
    QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
    ret = add_aio_request(s, aio_req, NULL, 0, false, acb->aiocb_type);
    if (ret < 0) {
        error_report("add_aio_request is failed");
        free_aio_req(s, aio_req);
        qemu_aio_release(acb);
        return ret;
    }

    if (rsp->result == SD_RES_INVALID_PARMS) {
        dprintf("disable write cache since the server doesn't support it\n");

        s->cache_flags = SD_FLAG_CMD_DIRECT;
        closesocket(s->flush_fd);
        return 0;
    }

    if (rsp->result != SD_RES_SUCCESS) {
        error_report("%s", sd_strerror(rsp->result));
        return -EIO;
    }

    return 0;
    qemu_coroutine_yield();
    return acb->ret;
}

static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)