/*-
 * Copyright (c) 2004 Robert N. M. Watson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD$
 */

/*
 * netreceive: bind a UDP port on every passive address returned by
 * getaddrinfo(3), receive on each socket with one or more threads, and
 * print packet and bit rates roughly twice per second.
 */

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/poll.h>

#include <netinet/in.h>
#include <netdb.h>		/* getaddrinfo */

#include <arpa/inet.h>

#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>		/* close */

#define MAXSOCK 20

#include <pthread.h>
#include <fcntl.h>
#include <time.h>		/* clock_gettime() */

/* Round n up to the next multiple of l (l must be > 0). */
static int
round_to(int n, int l)
{
	return ((n + l - 1) / l) * l;
}

/*
 * Per-thread receiver state.  Each socket may be served by several
 * threads so the receiver is more efficient; the main thread collects
 * and prints the stats from these counters.
 */
struct td_desc {
	pthread_t td_id;
	uint64_t count;		/* rx packet counter */
	uint64_t byte_count;	/* rx byte counter */
	int fd;			/* socket this thread reads from */
	char *buf;		/* scratch receive buffer */
	int buflen;		/* size of buf in bytes */
};

static void
usage(void)
{

	fprintf(stderr, "netreceive port [nthreads]\n");
	exit(-1);
}

/* tsa += tsb, carrying tv_nsec overflow into tv_sec (inputs normalized). */
static inline void
timespec_add(struct timespec *tsa, struct timespec *tsb)
{

	tsa->tv_sec += tsb->tv_sec;
	tsa->tv_nsec += tsb->tv_nsec;
	if (tsa->tv_nsec >= 1000000000) {
		tsa->tv_sec++;
		tsa->tv_nsec -= 1000000000;
	}
}

/* tsa -= tsb, borrowing from tv_sec when tv_nsec goes negative. */
static inline void
timespec_sub(struct timespec *tsa, struct timespec *tsb)
{

	tsa->tv_sec -= tsb->tv_sec;
	tsa->tv_nsec -= tsb->tv_nsec;
	if (tsa->tv_nsec < 0) {
		tsa->tv_sec--;
		tsa->tv_nsec += 1000000000;
	}
}

/*
 * Receiver thread body: block in poll(2) until t->fd is readable, then
 * drain it with non-blocking recv(2) calls until one fails (normally
 * EAGAIN), bumping the packet and byte counters for every datagram.
 * Never returns.
 */
static void *
rx_body(void *data)
{
	struct td_desc *t = data;
	struct pollfd fds;
	ssize_t y;

	fds.fd = t->fd;
	fds.events = POLLIN;
	fds.revents = 0;

	for (;;) {
		if (poll(&fds, 1, -1) < 0) {
			/* revents is not meaningful after a failed poll. */
			if (errno != EINTR)
				perror("poll on thread");
			continue;
		}
		if (!(fds.revents & POLLIN))
			continue;
		for (;;) {
			y = recv(t->fd, t->buf, t->buflen, MSG_DONTWAIT);
			if (y < 0)
				break;
			t->count++;
			t->byte_count += y;
		}
	}
	/* NOTREACHED */
	return NULL;
}

/*
 * Create nsock * nthreads receiver threads, assigning sockets round-robin
 * so every socket in s[0..nsock-1] gets nthreads threads.
 *
 * The pointer array and the descriptors live in one allocation aligned
 * to 64 bytes, with each descriptor padded to a multiple of 64 bytes, so
 * that threads do not update counters on a shared cache line (assumes a
 * 64-byte line -- TODO confirm for the target CPU).
 *
 * Returns the array of descriptor pointers; exits on any allocation or
 * thread-creation failure.
 */
static struct td_desc **
make_threads(int *s, int nsock, int nthreads)
{
	int i, si, error, nt = nsock * nthreads;
	int lb = round_to(nt * sizeof (struct td_desc *), 64);
	int td_len = round_to(sizeof(struct td_desc), 64);	/* cache align */
	size_t total = (size_t)lb + (size_t)td_len * nt;
	void *mem;
	char *m;
	struct td_desc **tp;

	printf("td len %d -> %d\n", (int)sizeof(struct td_desc), td_len);
	/* pointers plus the structs; posix_memalign returns an errno value */
	error = posix_memalign(&mem, 64, total);
	if (error != 0) {
		fprintf(stderr, "no room for pointers!: %s\n", strerror(error));
		exit(1);
	}
	memset(mem, 0, total);
	tp = mem;
	m = (char *)mem + lb;	/* skip the pointers */
	for (si = i = 0; i < nt; i++, m += td_len) {
		tp[i] = (struct td_desc *)m;
		tp[i]->fd = s[si];
		tp[i]->buflen = 65536;
		tp[i]->buf = calloc(1, tp[i]->buflen);
		if (tp[i]->buf == NULL) {
			perror("no room for receive buffer");
			exit(1);
		}
		if (++si == nsock)
			si = 0;
		/* pthread_create reports failure via its return, not errno. */
		error = pthread_create(&tp[i]->td_id, NULL, rx_body, tp[i]);
		if (error != 0) {
			fprintf(stderr, "unable to create thread: %s\n",
			    strerror(error));
			exit(1);
		}
	}
	return tp;
}

/*
 * Statistics collector run on the main thread: every ~500 ms sum the
 * per-thread counters and print the packet rate, the bit rate in Mbps,
 * and the packet count and length of the interval.  Never returns.
 *
 * The counters are read without any locking while receiver threads
 * update them, so a sample may be slightly stale (and on 32-bit
 * platforms a 64-bit read could tear); tolerated for a benchmark display.
 */
static void
main_thread(struct td_desc **tp, int nsock, int nthreads)
{
	uint64_t c0, c1, bc0, bc1;
	struct timespec now, then, delta;
	int i, nt = nsock * nthreads;

	c1 = bc1 = 0;
	clock_gettime(CLOCK_REALTIME, &then);
	fprintf(stderr, "start at %ld.%09ld\n", (long)then.tv_sec,
	    (long)then.tv_nsec);
	/* Time intervals with a monotonic clock so wall-clock steps
	 * cannot produce negative or inflated intervals. */
	clock_gettime(CLOCK_MONOTONIC, &then);
	for (;;) {
		uint64_t dn, ns, pps;
		double mbps;

		if (poll(NULL, 0, 500) < 0)
			perror("poll");
		c0 = bc0 = 0;
		for (i = 0; i < nt; i++) {
			c0 += tp[i]->count;
			bc0 += tp[i]->byte_count;
		}
		dn = c0 - c1;
		clock_gettime(CLOCK_MONOTONIC, &now);
		delta = now;
		timespec_sub(&delta, &then);
		then = now;
		/* Interval in ns; the +1 avoids a division by zero. */
		ns = (uint64_t)delta.tv_sec * 1000000000 +
		    (uint64_t)delta.tv_nsec + 1;
		pps = (dn * 1000000000) / ns;
		/* bytes * 8 bits * 1e9 ns/s / 1e6 = bytes * 8000 / ns Mbps;
		 * computed in floating point to avoid 64-bit overflow. */
		mbps = (double)(bc0 - bc1) * 8000.0 / (double)ns;
		fprintf(stderr, " %9ld pps %8.3f Mbps", (long)pps, mbps);
		fprintf(stderr, " - %d pkts in %ld.%09ld s\n",
		    (int)dn, (long)delta.tv_sec, (long)delta.tv_nsec);
		c1 = c0;
		bc1 = bc0;
	}
}

int
main(int argc, char *argv[])
{
	struct addrinfo hints, *res, *res0;
	char *dummy;
	unsigned long ul;
	int port;
	int error, v, nthreads = 1;
	struct td_desc **tp;
	const char *cause = NULL;
	int s[MAXSOCK];
	int nsock;

	if (argc < 2)
		usage();

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_flags = AI_PASSIVE;

	ul = strtoul(argv[1], &dummy, 10);
	if (ul < 1 || ul > 65535 || *dummy != '\0')
		usage();
	port = (int)ul;
	if (argc > 2) {
		ul = strtoul(argv[2], &dummy, 10);
		if (dummy == argv[2] || *dummy != '\0' || ul < 1 || ul > 64)
			usage();
		nthreads = (int)ul;
	}

	error = getaddrinfo(NULL, argv[1], &hints, &res0);
	if (error) {
		/* gai errors are not errno values; don't use perror(). */
		fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error));
		return (-1);
	}

	/* Bind one socket per usable passive address (e.g. v4 and v6). */
	nsock = 0;
	for (res = res0; res && nsock < MAXSOCK; res = res->ai_next) {
		s[nsock] = socket(res->ai_family, res->ai_socktype,
		    res->ai_protocol);
		if (s[nsock] < 0) {
			cause = "socket";
			continue;
		}

		v = 128 * 1024;
		if (setsockopt(s[nsock], SOL_SOCKET, SO_RCVBUF, &v, sizeof(v)) < 0) {
			cause = "SO_RCVBUF";
			close(s[nsock]);
			continue;
		}
		if (bind(s[nsock], res->ai_addr, res->ai_addrlen) < 0) {
			cause = "bind";
			close(s[nsock]);
			continue;
		}
		nsock++;
	}
	if (nsock == 0) {
		perror(cause);
		freeaddrinfo(res0);
		return (-1);
	}
	/* The sockets are bound; the address list is no longer needed. */
	freeaddrinfo(res0);

	printf("netreceive %d sockets x %d threads listening on UDP port %d\n",
	    nsock, nthreads, port);

	tp = make_threads(s, nsock, nthreads);
	main_thread(tp, nsock, nthreads);

	/* NOTREACHED */
	return (0);
}