/*
 * REVIEW DOCUMENTATION (comment only; the program text below is unchanged).
 *
 * Purpose
 *   Benchmark that passes small "token" records around a set of
 *   non-blocking pipes and prints how many passes were completed and the
 *   resulting passes per second.  The wait mechanism is chosen on the
 *   command line: --poll, --sys-epoll, --aio-poll, --aio-epoll or
 *   --aio-read (--kevent-poll is recognised but exits as unimplemented).
 *   Modifiers: --aio-write (send tokens with queued pwrite iocbs instead of
 *   write()), --bufsize N (record size), --gnuplot (terse numeric output).
 *   After the options, three positional arguments are read with atoi/atol:
 *   number of pipe pairs, number of message threads, number of generations.
 *
 * State of this copy
 *   The text has been collapsed onto a few physical lines and parts of it
 *   appear to have been lost in extraction:
 *     - every #include has no header name;
 *     - three loops stop right after "for (i=0; i" and the text resumes in
 *       the middle of a later function.  As a result most of
 *       send_pending_tokes(), the head of send_toke(), all of makepipes()
 *       past its loop header, the definitions of submit_iocbs(),
 *       sys_epoll_setup() and poll_main_loop(), the head of what is
 *       presumably sys_epoll_main_loop(), all of seedthreads() past its
 *       loop header, and the head of main() (local declarations and the
 *       start of the option loop) are not visible here;
 *     - the usage string has lost the names of its placeholders.
 *   Because the newlines are gone, each double-slash comment now extends to
 *   the end of its physical line and preprocessor directives share a line
 *   with ordinary code, so this copy is not compilable as it stands.
 *   TODO: restore from a pristine copy before making code changes.
 *
 * Data model
 *   struct token     generation counter, tofd (read descriptor the token is
 *                    addressed to; checked by process_token) and thread id.
 *   pipefds[]        one fds[READ]/fds[WRITE] pair per pipe.
 *   fdinfo[]         indexed by file descriptor number.  The iocb member
 *                    must stay first: the aio poll and read callbacks cast
 *                    the struct iocb pointer straight to struct fdinfo.
 *   pending_tokens[] tokens queued by send_toke() for a later batch send.
 *   pending_iocbs[]  iocbs appended by queue_iocb(); the macro performs no
 *                    bounds check.
 *   BUFSIZE          bytes per record; defaults to sizeof(struct token).
 *
 * Visible functions
 *   io_prep_epoll_wait      only defined when IOCB_CMD_EPOLL_WAIT is not
 *                           already defined; zeroes the iocb and stores the
 *                           epoll fd, opcode 9, maxevents (nbytes), timeout
 *                           (offset) and the event buffer.
 *   pexit                   perror(msg) followed by exit(1).
 *   really_send_toke        selects pipe nr_pipes - thread - 1 and sends the
 *                           token on its write end: a queued pwrite iocb
 *                           when MODE_AIO_WRITE is set, otherwise write(),
 *                           exiting on a short or failed write.
 *   send_toke (tail only)   selects the same pipe index, records its read
 *                           descriptor in toke->tofd and appends a copy of
 *                           the token to pending_tokens[].
 *   process_token           asserts the byte count and destination fd,
 *                           counts the pass, bumps the generation and either
 *                           re-sends the token or, once max_generation is
 *                           reached, counts the thread as complete; sets
 *                           done when all threads are complete.
 *   read_and_process_token  read() of up to BUFSIZE bytes into the
 *                           descriptor's buffer, pexit on -1, then one
 *                           process_token call per full record.
 *   aio_poll_callback       handles the readable descriptor, then re-arms
 *                           and re-queues its poll iocb.
 *   aio_epoll_callback      walks the res returned epoll events, reads on
 *                           EPOLLIN, then re-queues the epoll-wait iocb.
 *   aio_read_callback       checks the zero/one guard fields, processes the
 *                           token that was read and re-queues the pread.
 *   makeapipe               creates one pipe, sets O_NONBLOCK on both ends,
 *                           allocates the read-side buffer, registers the
 *                           read end according to the selected mode and
 *                           fills in fdinfo for both ends.
 *   aio_main_loop           until done: submit queued iocbs, send pending
 *                           tokens, wait in io_getevents for at least one
 *                           event and invoke each event's stored callback.
 *   main (tail only)        parses options and arguments, validates the
 *                           pipe count, sets up aio/epoll as needed, creates
 *                           the pipes, submits the initial requests before
 *                           starting the clock, seeds the threads, runs the
 *                           selected main loop and prints the results.
 *
 * NOTE(review): items to confirm or fix once the full text is available
 *   1. "#define DEUBG 1" is spelled differently from the DEBUG tested on
 *      the next directive, so dprintf/d2printf expand to nothing unless
 *      DEBUG is defined elsewhere.  Possibly a deliberate way of switching
 *      debugging off; confirm.
 *   2. modes[] has no comma between "kevent-poll" and "aio-read".  Adjacent
 *      string literals are concatenated, so the array has five entries and
 *      modes[MODE_AIO_READ] (index 5), printed by main in aio-read mode, is
 *      one past the end.
 *   3. aio_main_loop tests "MODE_AIO_WRITE || MODE_AIO_EPOLL".
 *      MODE_AIO_EPOLL is a non-zero enum constant, so the condition is
 *      always true; "mode == MODE_AIO_EPOLL" was presumably intended, as in
 *      the test two statements earlier.
 *   4. --bufsize reads argv[1] after advancing argv without checking that
 *      another argument exists, and its assert uses ">" so a size equal to
 *      sizeof(struct token) (the default) is rejected.  The check also
 *      disappears when assert is compiled out.
 *   5. In the MODE_AIO_WRITE path really_send_toke copies into inf->buf of
 *      the write descriptor, but in the visible code makeapipe allocates
 *      buf only for the read descriptor.
 *   6. read_and_process_token advances toke by sizeof(struct token) while
 *      decrementing nr by BUFSIZE; these differ when --bufsize is used.
 *      read() is asked for at most BUFSIZE bytes, so the loop body runs at
 *      most once and the mismatch is not reachable at present.  The pipes
 *      are O_NONBLOCK, so a read that finds no data would return -1 and end
 *      the program through pexit; confirm that cannot happen in each mode.
 *   7. The final printf calls use the "%Ld" conversion; the L length
 *      modifier is only specified for floating conversions in ISO C
 *      ("%lld" is the portable spelling for long long).  The declarations
 *      of usecs and passes_per_sec are not visible here; confirm their
 *      types, and that usecs cannot be zero before the division.
 *   8. struct fdinfo declares active and rw as plain int bit-fields of
 *      width 1; whether such a field is signed is implementation-defined,
 *      so the stored 1 may read back as -1.  The visible code only compares
 *      active with 0.
 *   9. MAX_FDS is defined as 64*1024 without parentheses; the visible uses
 *      still evaluate as intended, but the macro is fragile.
 */
/* pipetest.c * Scalability test of async poll functionality on pipes. * Copyright 2002, 2006 Red Hat, Inc. * Portions Copyright 2001 Davide Libenzi . * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define DEUBG 1 #if !defined(DEBUG) #define dprintf(x...) do {} while(0) #define d2printf(x...) do {} while(0) #else #define dprintf(x...) printf(x) #define d2printf(x...) printf(x) #endif enum { MODE_POLL, MODE_SYS_EPOLL, MODE_AIO_POLL, MODE_AIO_EPOLL, MODE_KEVENT_POLL, MODE_AIO_READ, } mode = MODE_POLL; const char *modes[] = { "poll", "sys-epoll", "aio-poll", "aio-epoll", "kevent-poll" "aio-read", }; int gnuplot = 0; #ifndef IOCB_CMD_EPOLL_WAIT #define IOCB_CMD_EPOLL_WAIT 9 static void io_prep_epoll_wait(struct iocb *iocb, int epfd, struct epoll_event *events, int maxevents, int timeout) { memset(iocb, 0, sizeof(*iocb)); iocb->aio_fildes = epfd; iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT; iocb->aio_reqprio = 0; iocb->u.c.nbytes = maxevents; iocb->u.c.offset = timeout; iocb->u.c.buf = events; } #endif /* IOCB_CMD_EPOLL_WAIT */ int MODE_AIO_WRITE; io_context_t ctx; int epoll_fd = -1; int done; struct iocb epoll_iocb; #define FD_SLOP 10 #define MAX_FDS 64*1024 #define READ 0 #define WRITE 1 struct { int fds[2]; } pipefds[MAX_FDS/2]; struct epoll_event epoll_events[MAX_FDS]; /* used for aio-epoll */ int nr_pipes; long nr_token_passes; long max_generation; int max_threads = 1; int threads_complete = 0; struct token { long generation; int tofd; int thread; }; int BUFSIZE = sizeof(struct token); struct fdinfo { struct iocb iocb; int zero; //struct iocb iocb2; int one; int fd; int pipe_idx; int active:1; int rw:1; char *buf; //char buf2[BUFSIZE]; } fdinfo[MAX_FDS]; #define queue_iocb(iocb) (pending_iocbs[nr_pending_iocbs++] = (iocb)) struct pollfd pollfds[MAX_FDS]; int nr_pollfds; struct iocb *pending_iocbs[MAX_FDS*3]; int nr_pending_iocbs; struct token 
pending_tokens[MAX_FDS]; int nr_pending_tokens; void pexit(char *msg) { perror(msg); exit(1); } void really_send_toke(struct token *toke) { struct fdinfo *inf; int *fds; int fd; int pipe_idx = nr_pipes - toke->thread - 1; fds = pipefds[pipe_idx].fds; fd = fds[WRITE]; inf = &fdinfo[fd]; dprintf("sending on pipe index %u, fd %d\n", pipe_idx, fd); if (MODE_AIO_WRITE) { memcpy(inf->buf, toke, BUFSIZE); io_prep_pwrite(&inf->iocb, fd, inf->buf, BUFSIZE, 0); queue_iocb(&inf->iocb); } else { int res = write(fds[WRITE], toke, BUFSIZE); d2printf("write[%ld %d %d]\n", toke->generation, toke->tofd, toke->thread); if (res != BUFSIZE) { printf("write = %d (%s)", res, strerror(errno)); exit(1); } } } void send_pending_tokes(void) { int i; for (i=0; igeneration * 17 + toke->thread; pipe_idx += toke->thread * (nr_pipes / max_threads); pipe_idx %= nr_pipes; #endif pipe_idx = nr_pipes - toke->thread - 1; fds = pipefds[pipe_idx].fds; toke->tofd = fds[READ]; #if 1 pending_tokens[nr_pending_tokens++] = *toke; #else really_send_toke(toke); #endif } void process_token(int fd, struct token *toke, int nr) { if (nr != BUFSIZE) fprintf(stderr, "process_token: nr == %d (vs %d)\n", nr, BUFSIZE); assert(nr == BUFSIZE); assert(toke->tofd == fd); nr_token_passes++; toke->generation++; dprintf("passed %ld\n", nr_token_passes); if (toke->generation < max_generation) send_toke(toke); else { dprintf("thread %d complete, %ld passes\n", toke->thread, nr_token_passes); threads_complete++; if (threads_complete >= max_threads) done = 1; } } void read_and_process_token(int fd) { //char buf[BUFSIZE]; struct token *toke; struct fdinfo *inf = &fdinfo[fd]; char *buf = inf->buf; int nr; nr = read(fd, buf, BUFSIZE); if (-1 == nr) pexit("read"); /* kludge: works around epoll edge notification bug */ toke = (struct token *)buf; while (nr >= BUFSIZE) { process_token(fd, toke, BUFSIZE); toke++; nr -= BUFSIZE; } assert(nr == 0); } void aio_poll_callback(io_context_t ctx, struct iocb *iocb, long res, long res2) { 
struct fdinfo *inf = (struct fdinfo *)iocb; read_and_process_token(inf->fd); io_prep_poll(&inf->iocb, inf->fd, POLLIN); io_set_callback(&inf->iocb, aio_poll_callback); queue_iocb(&inf->iocb); } void aio_epoll_callback(io_context_t ctx, struct iocb *iocb, long res, long res2) { int i; struct epoll_event *epevents = (struct epoll_event *)iocb->u.c.buf; struct epoll_event *epevent = epevents; assert(epevents != NULL); for (i = 0; i < res; i++, epevent++) { if (epevent->events & EPOLLIN) read_and_process_token(epevent->data.fd); } /* queue another epoll wait */ io_prep_epoll_wait(&epoll_iocb, epoll_fd, epevents, MAX_FDS, -1); io_set_callback(&epoll_iocb, aio_epoll_callback); queue_iocb(&epoll_iocb); } void aio_read_callback(io_context_t ctx, struct iocb *iocb, long res, long res2) { struct fdinfo *inf = (struct fdinfo *)iocb; struct token *toke = (void *)inf->buf; d2printf("read callback: iocb=%p\n", iocb); #if 0 if (inf->zero == 1) { inf = (struct fdinfo *)( (char *)iocb - (long)(&((struct fdinfo *)0)->iocb2) ); toke = (struct token *)inf->buf2; } #endif assert(inf->zero == 0); assert(inf->one == 1); d2printf("aio_read_callback: %p %ld, fd=%d\n", iocb, res, inf->fd); d2printf("[%ld %d %d]\n", toke->generation, toke->tofd, toke->thread); process_token(inf->fd, toke, res); io_prep_pread(iocb, inf->fd, toke, BUFSIZE, 0); io_set_callback(iocb, aio_read_callback); queue_iocb(iocb); } void makeapipe(int idx) { int *fds = pipefds[idx].fds; struct fdinfo *inf; int i; if (pipe(fds)) pexit("pipe"); for (i=0; i<2; i++) { int fl = fcntl(fds[i], F_GETFL); fl |= O_NONBLOCK; fcntl(fds[i], F_SETFL, fl); } inf = &fdinfo[fds[READ]]; inf->buf = calloc(1, BUFSIZE); assert(inf->buf != NULL); assert(inf->active == 0); inf->active = 1; inf->rw = READ; inf->fd = fds[READ]; inf->pipe_idx = idx; if (mode == MODE_AIO_POLL) { io_prep_poll(&inf->iocb, fds[READ], POLLIN); io_set_callback(&inf->iocb, aio_poll_callback); queue_iocb(&inf->iocb); } if (mode == MODE_AIO_READ) { inf->zero = 0; inf->one 
= 1; io_prep_pread(&inf->iocb, fds[READ], inf->buf, BUFSIZE, 0); io_set_callback(&inf->iocb, aio_read_callback); queue_iocb(&inf->iocb); d2printf("aio_read(%p %p, %d)\n", inf, &inf->iocb, fds[READ]); #if 0 io_prep_pread(&inf->iocb2, fds[READ], inf->buf2, BUFSIZE, 0); io_set_callback(&inf->iocb2, aio_read_callback); queue_iocb(&inf->iocb2); d2printf("aio_read(%p %p, %d)\n", inf, &inf->iocb2, fds[READ]); #endif } if (mode == MODE_POLL) { pollfds[nr_pollfds].fd = fds[READ]; pollfds[nr_pollfds].events = POLLIN; pollfds[nr_pollfds].revents = 0; nr_pollfds++; } if (mode == MODE_SYS_EPOLL || mode == MODE_AIO_EPOLL) { struct epoll_event event; memset(&event, 0, sizeof(event)); event.data.fd = fds[READ]; event.events = EPOLLIN; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fds[READ], &event) < 0) pexit("epoll_ctl"); } inf = &fdinfo[fds[WRITE]]; assert(inf->active == 0); inf->active = 1; inf->rw = WRITE; inf->fd = fds[WRITE]; inf->pipe_idx = idx; } void makepipes(int nr) { int i; for (i=0; ievents & EPOLLIN) read_and_process_token(event->data.fd); } } } void aio_main_loop(void) { struct io_event *events = malloc(sizeof(struct io_event) * MAX_FDS); struct io_event *event; assert(events != NULL); while (!done) { int i, res; if (!MODE_AIO_WRITE && mode != MODE_AIO_EPOLL) submit_iocbs(); send_pending_tokes(); if (MODE_AIO_WRITE || MODE_AIO_EPOLL) submit_iocbs(); send_pending_tokes(); res = io_getevents(ctx, 1, MAX_FDS, events, NULL); if (res <= 0) { printf("io_getevents: %d (%s)", res, strerror(-res)); exit(1); } for (i = 0, event = events; i < res; i++, event++) { io_callback_t cb; struct iocb *iocb; cb = (io_callback_t)event->data; iocb = (struct iocb *)event->obj; cb(ctx, iocb, event->res, event->res2); } } } void seedthreads(int nr) { struct token toke; int i; for (i=0; i 1) { if (0 == strcmp(argv[1], "--aio-poll")) { mode = MODE_AIO_POLL; } else if (0 == strcmp(argv[1], "--poll")) { mode = MODE_POLL; } else if (0 == strcmp(argv[1], "--sys-epoll")) { mode = MODE_SYS_EPOLL; } else 
if (0 == strcmp(argv[1], "--aio-read")) { mode = MODE_AIO_READ; } else if (0 == strcmp(argv[1], "--aio-write")) { MODE_AIO_WRITE = 1; } else if (0 == strcmp(argv[1], "--aio-epoll")) { mode = MODE_AIO_EPOLL; } else if (0 == strcmp(argv[1], "--kevent-poll")) { mode = MODE_KEVENT_POLL; printf("kevent poll support not yet implemented\n"); exit(1); } else if (0 == strcmp(argv[1], "--bufsize")) { argv++,argc--; BUFSIZE = atoi(argv[1]); assert(BUFSIZE > (int)sizeof(struct token)); } else if (0 == strcmp(argv[1], "--gnuplot")) { gnuplot = 1; } else break; argv++,argc--; } if (argc != 4) { fprintf(stderr, "usage: pipetest [--poll | --sys-epoll | --aio-poll | --aio-epoll | --aio-read]\n" "\t[--bufsize] \n"); return 2; } nr = atoi(argv[1]); max_threads = atoi(argv[2]); max_generation = atol(argv[3]); if (gnuplot) printf("%d %d %d ", nr, max_threads, BUFSIZE); else printf("using %d pipe pairs, %d message threads, %ld generations, %d bufsize\n", nr, max_threads, max_generation, BUFSIZE); if (nr < 2) { printf("uhm, please specify at least 2 pipe pairs\n"); exit(1); } if (nr >= (MAX_FDS/2 - FD_SLOP)) { printf("%d exceeds limit of %d pipe pairs.\n", nr, (MAX_FDS/2-FD_SLOP)); exit(1); } if (mode == MODE_AIO_READ || mode == MODE_AIO_POLL || mode == MODE_AIO_EPOLL || MODE_AIO_WRITE) { res = io_queue_init(MAX_FDS, &ctx); if (res < 0) { printf("io_queue_init: %d (%s)\n", res, strerror(-res)); exit(1); } } if (mode == MODE_SYS_EPOLL || mode == MODE_AIO_EPOLL) sys_epoll_setup(); makepipes(nr); if (mode == MODE_AIO_EPOLL) { /* queue the initial epoll_wait */ bzero(&epoll_iocb, sizeof(epoll_iocb)); bzero(epoll_events, MAX_FDS * sizeof(struct epoll_event)); io_prep_epoll_wait(&epoll_iocb, epoll_fd, epoll_events, MAX_FDS, -1); io_set_callback(&epoll_iocb, aio_epoll_callback); queue_iocb(&epoll_iocb); } /* epoll and poll both have their startup overhead in * makepipes(). 
By submitting the initial read/poll * requests for aio now, we avoid measuring the static * overhead of aio as the number of file descriptors * increases. */ submit_iocbs(); send_pending_tokes(); submit_iocbs(); gettimeofday(&stv, NULL); seedthreads(max_threads); if (mode == MODE_POLL) poll_main_loop(); if (mode == MODE_SYS_EPOLL) sys_epoll_main_loop(); if (mode == MODE_AIO_POLL || mode == MODE_AIO_READ || mode == MODE_AIO_EPOLL) aio_main_loop(); gettimeofday(&etv, NULL); etv.tv_sec -= stv.tv_sec; etv.tv_usec -= stv.tv_usec; if (etv.tv_usec < 0) { etv.tv_usec += 1000000; etv.tv_sec -= 1; } if (!gnuplot) printf("Ok! Mode %s: %ld passes in %ld.%06ld seconds\n", modes[mode], nr_token_passes, etv.tv_sec, etv.tv_usec); usecs = etv.tv_usec + etv.tv_sec * 1000000LL; passes_per_sec = nr_token_passes * 1000000LL * 100; passes_per_sec /= usecs; if (gnuplot) printf("%Ld.%02Ld\n", passes_per_sec / 100, passes_per_sec % 100); else printf("passes_per_sec: %Ld.%02Ld\n", passes_per_sec / 100, passes_per_sec % 100); return 0; }