Commit 25e5e4c7 authored by Michael Tokarev's avatar Michael Tokarev
Browse files

rewrite iov_send_recv() and move it to iov.c



Make it much more understandable, add a missing
iov_cnt argument (number of iovs in the iov), and
add comments to it.

The new implementation has been extensively tested
by splitting a large buffer into many small
randomly-sized chunks, sending it over socket to
another, slow process and verifying the receiving
data is the same.

Also add a unit test for iov_send_recv(), sending/
receiving data between two processes over a socketpair
using random vectors and random sizes.

Signed-off-by: default avatarMichael Tokarev <mjt@tls.msk.ru>
parent 2fc8ae1d
Loading
Loading
Loading
Loading
+0 −83
Original line number Diff line number Diff line
@@ -375,86 +375,3 @@ int qemu_parse_fd(const char *param)
    }
    return fd;
}

ssize_t iov_send_recv(int sockfd, struct iovec *iov,
                      size_t offset, size_t bytes,
                      bool do_sendv)
{
    int iovlen;
    ssize_t ret;
    size_t diff;
    struct iovec *last_iov;

    /* last_iov is inclusive, so count from one.  */
    iovlen = 1;
    last_iov = iov;
    bytes += offset;

    while (last_iov->iov_len < bytes) {
        bytes -= last_iov->iov_len;

        last_iov++;
        iovlen++;
    }

    diff = last_iov->iov_len - bytes;
    last_iov->iov_len -= diff;

    while (iov->iov_len <= offset) {
        offset -= iov->iov_len;

        iov++;
        iovlen--;
    }

    iov->iov_base = (char *) iov->iov_base + offset;
    iov->iov_len -= offset;

    {
#if defined CONFIG_IOVEC && defined CONFIG_POSIX
        struct msghdr msg;
        memset(&msg, 0, sizeof(msg));
        msg.msg_iov = iov;
        msg.msg_iovlen = iovlen;

        do {
            if (do_sendv) {
                ret = sendmsg(sockfd, &msg, 0);
            } else {
                ret = recvmsg(sockfd, &msg, 0);
            }
        } while (ret == -1 && errno == EINTR);
#else
        struct iovec *p = iov;
        ret = 0;
        while (iovlen > 0) {
            int rc;
            if (do_sendv) {
                rc = send(sockfd, p->iov_base, p->iov_len, 0);
            } else {
                rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
            }
            if (rc == -1) {
                if (errno == EINTR) {
                    continue;
                }
                if (ret == 0) {
                    ret = -1;
                }
                break;
            }
            if (rc == 0) {
                break;
            }
            ret += rc;
            iovlen--, p++;
        }
#endif
    }

    /* Undo the changes above */
    iov->iov_base = (char *) iov->iov_base - offset;
    iov->iov_len += offset;
    last_iov->iov_len += diff;
    return ret;
}
+103 −0
Original line number Diff line number Diff line
@@ -18,6 +18,14 @@

#include "iov.h"

#ifdef _WIN32
# include <windows.h>
# include <winsock2.h>
#else
# include <sys/types.h>
# include <sys/socket.h>
#endif

size_t iov_from_buf(struct iovec *iov, unsigned int iov_cnt,
                    size_t offset, const void *buf, size_t bytes)
{
@@ -87,6 +95,101 @@ size_t iov_size(const struct iovec *iov, const unsigned int iov_cnt)
    return len;
}

/* helper function for iov_send_recv() */
static ssize_t
do_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt, bool do_send)
{
#if defined CONFIG_IOVEC && defined CONFIG_POSIX
    ssize_t ret;
    struct msghdr msg;
    memset(&msg, 0, sizeof(msg));
    msg.msg_iov = iov;
    msg.msg_iovlen = iov_cnt;
    do {
        ret = do_send
            ? sendmsg(sockfd, &msg, 0)
            : recvmsg(sockfd, &msg, 0);
    } while (ret < 0 && errno == EINTR);
    return ret;
#else
    /* else send piece-by-piece */
    /*XXX Note: windows has WSASend() and WSARecv() */
    unsigned i;
    size_t count = 0;
    for (i = 0; i < iov_cnt; ++i) {
        ssize_t r = do_send
            ? send(sockfd, iov[i].iov_base, iov[i].iov_len, 0)
            : recv(sockfd, iov[i].iov_base, iov[i].iov_len, 0);
        if (r > 0) {
            ret += r;
        } else if (!r) {
            break;
        } else if (errno == EINTR) {
            continue;
        } else {
            /* else it is some "other" error,
             * only return if there was no data processed. */
            if (ret == 0) {
                return -1;
            }
            break;
        }
    }
    return count;
#endif
}

ssize_t iov_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt,
                      size_t offset, size_t bytes,
                      bool do_send)
{
    ssize_t ret;
    unsigned si, ei;            /* start and end indexes */

    /* Find the start position, skipping `offset' bytes:
     * first, skip all full-sized vector elements, */
    for (si = 0; si < iov_cnt && offset >= iov[si].iov_len; ++si) {
        offset -= iov[si].iov_len;
    }
    if (offset) {
        assert(si < iov_cnt);
        /* second, skip `offset' bytes from the (now) first element,
         * undo it on exit */
        iov[si].iov_base += offset;
        iov[si].iov_len -= offset;
    }
    /* Find the end position skipping `bytes' bytes: */
    /* first, skip all full-sized elements */
    for (ei = si; ei < iov_cnt && iov[ei].iov_len <= bytes; ++ei) {
        bytes -= iov[ei].iov_len;
    }
    if (bytes) {
        /* second, fixup the last element, and remember
         * the length we've cut from the end of it in `bytes' */
        size_t tail;
        assert(ei < iov_cnt);
        assert(iov[ei].iov_len > bytes);
        tail = iov[ei].iov_len - bytes;
        iov[ei].iov_len = bytes;
        bytes = tail;  /* bytes is now equal to the tail size */
        ++ei;
    }

    ret = do_send_recv(sockfd, iov + si, ei - si, do_send);

    /* Undo the changes above */
    if (offset) {
        iov[si].iov_base -= offset;
        iov[si].iov_len += offset;
    }
    if (bytes) {
        iov[ei-1].iov_len += bytes;
    }

    return ret;
}


void iov_hexdump(const struct iovec *iov, const unsigned int iov_cnt,
                 FILE *fp, const char *prefix, size_t limit)
{
+9 −6
Original line number Diff line number Diff line
@@ -60,7 +60,7 @@ size_t iov_memset(const struct iovec *iov, const unsigned int iov_cnt,
 * `offset' bytes in the beginning of iovec buffer are skipped and
 * next `bytes' bytes are used, which must be within data of iovec.
 *
 *   r = iov_send_recv(sockfd, iov, offset, bytes, true);
 *   r = iov_send_recv(sockfd, iov, iovcnt, offset, bytes, true);
 *
 * is logically equivalent to
 *
@@ -68,13 +68,16 @@ size_t iov_memset(const struct iovec *iov, const unsigned int iov_cnt,
 *   iov_to_buf(iov, iovcnt, offset, buf, bytes);
 *   r = send(sockfd, buf, bytes, 0);
 *   free(buf);
 *
 * For iov_send_recv() _whole_ area being sent or received
 * should be within the iovec, not only beginning of it.
 */
ssize_t iov_send_recv(int sockfd, struct iovec *iov,
ssize_t iov_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt,
                      size_t offset, size_t bytes, bool do_send);
#define iov_recv(sockfd, iov, offset, bytes) \
  iov_send_recv(sockfd, iov, offset, bytes, false)
#define iov_send(sockfd, iov, offset, bytes) \
  iov_send_recv(sockfd, iov, offset, bytes, true)
#define iov_recv(sockfd, iov, iov_cnt, offset, bytes) \
  iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, false)
#define iov_send(sockfd, iov, iov_cnt, offset, bytes) \
  iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, true)

/**
 * Produce a text hexdump of iovec `iov' with `iov_cnt' number of elements
+1 −1
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@ qemu_co_sendv_recvv(int sockfd, struct iovec *iov, unsigned iov_cnt,
    size_t done = 0;
    ssize_t ret;
    while (done < bytes) {
        ret = iov_send_recv(sockfd, iov,
        ret = iov_send_recv(sockfd, iov, iov_cnt,
                            offset + done, bytes - done, do_send);
        if (ret > 0) {
            done += ret;
+107 −0
Original line number Diff line number Diff line
#include <glib.h>
#include "qemu-common.h"
#include "iov.h"
#include "qemu_socket.h"

/* create a randomly-sized iovec with random vectors */
static void iov_random(struct iovec **iovp, unsigned *iov_cntp)
@@ -144,10 +145,116 @@ static void test_to_from_buf(void)
    }
}

static void test_io(void)
{
#ifndef _WIN32
/* socketpair(PF_UNIX) which does not exist on windows */

    int sv[2];
    int r;
    unsigned i, j, k, s, t;
    fd_set fds;
    unsigned niov;
    struct iovec *iov, *siov;
    unsigned char *buf;
    size_t sz;

    iov_random(&iov, &niov);
    sz = iov_size(iov, niov);
    buf = g_malloc(sz);
    for (i = 0; i < sz; ++i) {
        buf[i] = i & 255;
    }
    iov_from_buf(iov, niov, 0, buf, sz);

    siov = g_malloc(sizeof(*iov) * niov);
    memcpy(siov, iov, sizeof(*iov) * niov);

    if (socketpair(PF_UNIX, SOCK_STREAM, 0, sv) < 0) {
       perror("socketpair");
       exit(1);
    }

    FD_ZERO(&fds);

    t = 0;
    if (fork() == 0) {
       /* writer */

       close(sv[0]);
       FD_SET(sv[1], &fds);
       fcntl(sv[1], F_SETFL, O_RDWR|O_NONBLOCK);
       r = g_test_rand_int_range(sz / 2, sz);
       setsockopt(sv[1], SOL_SOCKET, SO_SNDBUF, &r, sizeof(r));

       for (i = 0; i <= sz; ++i) {
           for (j = i; j <= sz; ++j) {
               k = i;
               do {
                   s = g_test_rand_int_range(0, j - k + 1);
                   r = iov_send(sv[1], iov, niov, k, s);
                   g_assert(memcmp(iov, siov, sizeof(*iov)*niov) == 0);
                   if (r >= 0) {
                       k += r;
                       t += r;
                       usleep(g_test_rand_int_range(0, 30));
                   } else if (errno == EAGAIN) {
                       select(sv[1]+1, NULL, &fds, NULL, NULL);
                       continue;
                   } else {
                       perror("send");
                       exit(1);
                   }
               } while(k < j);
           }
       }
       exit(0);

    } else {
       /* reader & verifier */

       close(sv[1]);
       FD_SET(sv[0], &fds);
       fcntl(sv[0], F_SETFL, O_RDWR|O_NONBLOCK);
       r = g_test_rand_int_range(sz / 2, sz);
       setsockopt(sv[0], SOL_SOCKET, SO_RCVBUF, &r, sizeof(r));
       usleep(500000);

       for (i = 0; i <= sz; ++i) {
           for (j = i; j <= sz; ++j) {
               k = i;
               iov_memset(iov, niov, 0, 0xff, -1);
               do {
                   s = g_test_rand_int_range(0, j - k + 1);
                   r = iov_recv(sv[0], iov, niov, k, s);
                   g_assert(memcmp(iov, siov, sizeof(*iov)*niov) == 0);
                   if (r > 0) {
                       k += r;
                       t += r;
                   } else if (!r) {
                       if (s) {
                           break;
                       }
                   } else if (errno == EAGAIN) {
                       select(sv[0]+1, &fds, NULL, NULL, NULL);
                       continue;
                   } else {
                       perror("recv");
                       exit(1);
                   }
               } while(k < j);
               test_iov_bytes(iov, niov, i, j - i);
           }
        }
     }
#endif
}

int main(int argc, char **argv)
{
    g_test_init(&argc, &argv, NULL);
    g_test_rand_int();
    g_test_add_func("/basic/iov/from-to-buf", test_to_from_buf);
    g_test_add_func("/basic/iov/io", test_io);
    return g_test_run();
}