1/*
2 * Copyright (c) 2005 Ammasso, Inc. All rights reserved.
3 * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved.
4 *
5 * This software is available to you under a choice of one of two
6 * licenses.  You may choose to be licensed under the terms of the GNU
7 * General Public License (GPL) Version 2, available from the file
8 * COPYING in the main directory of this source tree, or the
9 * OpenIB.org BSD license below:
10 *
11 *     Redistribution and use in source and binary forms, with or
12 *     without modification, are permitted provided that the following
13 *     conditions are met:
14 *
15 *      - Redistributions of source code must retain the above
16 *        copyright notice, this list of conditions and the following
17 *        disclaimer.
18 *
19 *      - Redistributions in binary form must reproduce the above
20 *        copyright notice, this list of conditions and the following
21 *        disclaimer in the documentation and/or other materials
22 *        provided with the distribution.
23 *
24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31 * SOFTWARE.
32 */
33
34#include <getopt.h>
35#include <stdlib.h>
36#include <string.h>
37#include <stdio.h>
38#include <errno.h>
39#include <sys/types.h>
40#include <netinet/in.h>
41#include <sys/socket.h>
42#include <netdb.h>
43#include <byteswap.h>
44#include <semaphore.h>
45#include <arpa/inet.h>
46#include <pthread.h>
47#include <inttypes.h>
48
49#include <rdma/rdma_cma.h>
50#include <infiniband/arch.h>
51
/* Nonzero enables DEBUG_LOG output; incremented by the -d option. */
static int debug = 0;

/*
 * printf-style debug trace, emitted only when `debug` is nonzero.
 *
 * Wrapped in do { } while (0) so the macro expands to exactly one statement
 * and can never capture an `else` that follows it.
 */
#define DEBUG_LOG(...) \
	do { \
		if (debug) \
			printf(__VA_ARGS__); \
	} while (0)
54
55/*
56 * rping "ping/pong" loop:
57 * 	client sends source rkey/addr/len
 *	server receives source rkey/addr/len
59 *	server rdma reads "ping" data from source
60 * 	server sends "go ahead" on rdma read completion
61 *	client sends sink rkey/addr/len
62 * 	server receives sink rkey/addr/len
63 * 	server rdma writes "pong" data to sink
64 * 	server sends "go ahead" on rdma write completion
65 * 	<repeat loop>
66 */
67
68/*
69 * These states are used to signal events between the completion handler
70 * and the main client or server thread.
71 *
72 * Once CONNECTED, they cycle through RDMA_READ_ADV, RDMA_WRITE_ADV,
73 * and RDMA_WRITE_COMPLETE for each ping.
74 */
enum test_state {
	IDLE			= 1,	/* initial state assigned in main() */
	CONNECT_REQUEST		= 2,	/* listener got a connect request */
	ADDR_RESOLVED		= 3,	/* address resolution finished */
	ROUTE_RESOLVED		= 4,	/* route resolution finished */
	CONNECTED		= 5,	/* set on the client once established */
	RDMA_READ_ADV		= 6,	/* buffer for the RDMA read advertised */
	RDMA_READ_COMPLETE	= 7,	/* RDMA read completion seen */
	RDMA_WRITE_ADV		= 8,	/* buffer for the RDMA write advertised */
	RDMA_WRITE_COMPLETE	= 9,	/* RDMA write finished */
	ERROR			= 10	/* a completion or CM failure occurred */
};
87
/*
 * Buffer advertisement carried in the SEND/RECV messages.  The sender stores
 * all three fields in network byte order (see rping_format_send()) and the
 * receiver converts them back (see server_recv()).
 */
struct rping_rdma_info {
	uint64_t buf;	/* address of the advertised buffer */
	uint32_t rkey;	/* rkey of the memory region covering it */
	uint32_t size;	/* length of the advertised buffer */
};
93
94/*
95 * Default max buffer size for IO...
96 */
/* Parenthesized so the product stays intact inside larger expressions. */
#define RPING_BUFSIZE (64 * 1024)
/* Send queue depth; the CQ is created with twice this many entries. */
#define RPING_SQ_DEPTH 16
99
100/* Default string for print data and
101 * minimum buffer size
102 */
#define _stringify( _x ) # _x
#define stringify( _x ) _stringify( _x )

/* Prefix written at the start of every ping buffer. */
#define RPING_MSG_FMT           "rdma-ping-%d: "
/*
 * Smallest accepted ping size: room for the format prefix plus the text of
 * INT_MAX.  Parenthesized so the sum survives being embedded in a larger
 * expression.
 *
 * NOTE(review): INT_MAX is only macro-expanded if <limits.h> is in scope;
 * this file does not include it directly, so the stringified text may be the
 * literal "INT_MAX" - confirm which is intended.
 */
#define RPING_MIN_BUFSIZE       (sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT))
108
109/*
110 * Control block struct.
111 */
struct rping_cb {
	int server;			/* 1 server, 0 client, -1 until -s/-c is parsed */
	pthread_t cqthread;		/* thread running cq_thread() */
	pthread_t persistent_server_thread;	/* per-connection worker in -P mode */
	struct ibv_comp_channel *channel;	/* completion channel the CQ notifies on */
	struct ibv_cq *cq;		/* shared send/recv completion queue */
	struct ibv_pd *pd;
	struct ibv_qp *qp;

	struct ibv_recv_wr rq_wr;	/* recv work request record */
	struct ibv_sge recv_sgl;	/* recv single SGE */
	struct rping_rdma_info recv_buf;/* receive buffer, embedded here (not malloc'd) */
	struct ibv_mr *recv_mr;		/* MR associated with this buffer */

	struct ibv_send_wr sq_wr;	/* send work request record */
	struct ibv_sge send_sgl;	/* send single SGE */
	struct rping_rdma_info send_buf;/* single send buf */
	struct ibv_mr *send_mr;		/* MR associated with send_buf */

	struct ibv_send_wr rdma_sq_wr;	/* rdma work request record */
	struct ibv_sge rdma_sgl;	/* rdma single SGE */
	char *rdma_buf;			/* malloc'd, cb->size bytes; local side of RDMA ops */
	struct ibv_mr *rdma_mr;		/* MR associated with rdma_buf */

	uint32_t remote_rkey;		/* rkey from the peer's last advertisement */
	uint64_t remote_addr;		/* buffer address from the peer's last advertisement */
	uint32_t remote_len;		/* buffer length from the peer's last advertisement */

	char *start_buf;		/* rdma read src; allocated on the client only */
	struct ibv_mr *start_mr;	/* MR associated with start_buf */

	enum test_state state;		/* used for cond/signalling */
	sem_t sem;			/* posted by the CM/CQ handlers on each state change */

	struct sockaddr_storage sin;	/* address to bind to (server) or resolve (client) */
	uint16_t port;			/* dst port in NBO */
	int verbose;			/* verbose logging */
	int count;			/* ping count; 0 means loop without limit */
	int size;			/* ping data size */
	int validate;			/* validate ping data */

	/* CM stuff */
	pthread_t cmthread;		/* thread running cm_thread() */
	struct rdma_event_channel *cm_channel;
	struct rdma_cm_id *cm_id;	/* connection on client side,*/
					/* listener on service side. */
	struct rdma_cm_id *child_cm_id;	/* connection on server side */
};
160
161static int rping_cma_event_handler(struct rdma_cm_id *cma_id,
162				    struct rdma_cm_event *event)
163{
164	int ret = 0;
165	struct rping_cb *cb = cma_id->context;
166
167	DEBUG_LOG("cma_event type %s cma_id %p (%s)\n",
168		  rdma_event_str(event->event), cma_id,
169		  (cma_id == cb->cm_id) ? "parent" : "child");
170
171	switch (event->event) {
172	case RDMA_CM_EVENT_ADDR_RESOLVED:
173		cb->state = ADDR_RESOLVED;
174		ret = rdma_resolve_route(cma_id, 2000);
175		if (ret) {
176			cb->state = ERROR;
177			perror("rdma_resolve_route");
178			sem_post(&cb->sem);
179		}
180		break;
181
182	case RDMA_CM_EVENT_ROUTE_RESOLVED:
183		cb->state = ROUTE_RESOLVED;
184		sem_post(&cb->sem);
185		break;
186
187	case RDMA_CM_EVENT_CONNECT_REQUEST:
188		cb->state = CONNECT_REQUEST;
189		cb->child_cm_id = cma_id;
190		DEBUG_LOG("child cma %p\n", cb->child_cm_id);
191		sem_post(&cb->sem);
192		break;
193
194	case RDMA_CM_EVENT_ESTABLISHED:
195		DEBUG_LOG("ESTABLISHED\n");
196
197		/*
198		 * Server will wake up when first RECV completes.
199		 */
200		if (!cb->server) {
201			cb->state = CONNECTED;
202		}
203		sem_post(&cb->sem);
204		break;
205
206	case RDMA_CM_EVENT_ADDR_ERROR:
207	case RDMA_CM_EVENT_ROUTE_ERROR:
208	case RDMA_CM_EVENT_CONNECT_ERROR:
209	case RDMA_CM_EVENT_UNREACHABLE:
210	case RDMA_CM_EVENT_REJECTED:
211		fprintf(stderr, "cma event %s, error %d\n",
212			rdma_event_str(event->event), event->status);
213		sem_post(&cb->sem);
214		ret = -1;
215		break;
216
217	case RDMA_CM_EVENT_DISCONNECTED:
218		fprintf(stderr, "%s DISCONNECT EVENT...\n",
219			cb->server ? "server" : "client");
220		sem_post(&cb->sem);
221		break;
222
223	case RDMA_CM_EVENT_DEVICE_REMOVAL:
224		fprintf(stderr, "cma detected device removal!!!!\n");
225		ret = -1;
226		break;
227
228	default:
229		fprintf(stderr, "unhandled event: %s, ignoring\n",
230			rdma_event_str(event->event));
231		break;
232	}
233
234	return ret;
235}
236
237static int server_recv(struct rping_cb *cb, struct ibv_wc *wc)
238{
239	if (wc->byte_len != sizeof(cb->recv_buf)) {
240		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
241		return -1;
242	}
243
244	cb->remote_rkey = ntohl(cb->recv_buf.rkey);
245	cb->remote_addr = ntohll(cb->recv_buf.buf);
246	cb->remote_len  = ntohl(cb->recv_buf.size);
247	DEBUG_LOG("Received rkey %x addr %" PRIx64 " len %d from peer\n",
248		  cb->remote_rkey, cb->remote_addr, cb->remote_len);
249
250	if (cb->state <= CONNECTED || cb->state == RDMA_WRITE_COMPLETE)
251		cb->state = RDMA_READ_ADV;
252	else
253		cb->state = RDMA_WRITE_ADV;
254
255	return 0;
256}
257
258static int client_recv(struct rping_cb *cb, struct ibv_wc *wc)
259{
260	if (wc->byte_len != sizeof(cb->recv_buf)) {
261		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
262		return -1;
263	}
264
265	if (cb->state == RDMA_READ_ADV)
266		cb->state = RDMA_WRITE_ADV;
267	else
268		cb->state = RDMA_WRITE_COMPLETE;
269
270	return 0;
271}
272
273static int rping_cq_event_handler(struct rping_cb *cb)
274{
275	struct ibv_wc wc;
276	struct ibv_recv_wr *bad_wr;
277	int ret;
278
279	while ((ret = ibv_poll_cq(cb->cq, 1, &wc)) == 1) {
280		ret = 0;
281
282		if (wc.status) {
283			fprintf(stderr, "cq completion failed status %d\n",
284				wc.status);
285			if (wc.status != IBV_WC_WR_FLUSH_ERR)
286				ret = -1;
287			goto error;
288		}
289
290		switch (wc.opcode) {
291		case IBV_WC_SEND:
292			DEBUG_LOG("send completion\n");
293			break;
294
295		case IBV_WC_RDMA_WRITE:
296			DEBUG_LOG("rdma write completion\n");
297			cb->state = RDMA_WRITE_COMPLETE;
298			sem_post(&cb->sem);
299			break;
300
301		case IBV_WC_RDMA_READ:
302			DEBUG_LOG("rdma read completion\n");
303			cb->state = RDMA_READ_COMPLETE;
304			sem_post(&cb->sem);
305			break;
306
307		case IBV_WC_RECV:
308			DEBUG_LOG("recv completion\n");
309			ret = cb->server ? server_recv(cb, &wc) :
310					   client_recv(cb, &wc);
311			if (ret) {
312				fprintf(stderr, "recv wc error: %d\n", ret);
313				goto error;
314			}
315
316			ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
317			if (ret) {
318				fprintf(stderr, "post recv error: %d\n", ret);
319				goto error;
320			}
321			sem_post(&cb->sem);
322			break;
323
324		default:
325			DEBUG_LOG("unknown!!!!! completion\n");
326			ret = -1;
327			goto error;
328		}
329	}
330	if (ret) {
331		fprintf(stderr, "poll error %d\n", ret);
332		goto error;
333	}
334	return 0;
335
336error:
337	cb->state = ERROR;
338	sem_post(&cb->sem);
339	return ret;
340}
341
342static int rping_accept(struct rping_cb *cb)
343{
344	struct rdma_conn_param conn_param;
345	int ret;
346
347	DEBUG_LOG("accepting client connection request\n");
348
349	memset(&conn_param, 0, sizeof conn_param);
350	conn_param.responder_resources = 1;
351	conn_param.initiator_depth = 1;
352
353	ret = rdma_accept(cb->child_cm_id, &conn_param);
354	if (ret) {
355		perror("rdma_accept");
356		return ret;
357	}
358
359	sem_wait(&cb->sem);
360	if (cb->state == ERROR) {
361		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
362		return -1;
363	}
364	return 0;
365}
366
367static void rping_setup_wr(struct rping_cb *cb)
368{
369	cb->recv_sgl.addr = (uint64_t) (unsigned long) &cb->recv_buf;
370	cb->recv_sgl.length = sizeof cb->recv_buf;
371	cb->recv_sgl.lkey = cb->recv_mr->lkey;
372	cb->rq_wr.sg_list = &cb->recv_sgl;
373	cb->rq_wr.num_sge = 1;
374
375	cb->send_sgl.addr = (uint64_t) (unsigned long) &cb->send_buf;
376	cb->send_sgl.length = sizeof cb->send_buf;
377	cb->send_sgl.lkey = cb->send_mr->lkey;
378
379	cb->sq_wr.opcode = IBV_WR_SEND;
380	cb->sq_wr.send_flags = IBV_SEND_SIGNALED;
381	cb->sq_wr.sg_list = &cb->send_sgl;
382	cb->sq_wr.num_sge = 1;
383
384	cb->rdma_sgl.addr = (uint64_t) (unsigned long) cb->rdma_buf;
385	cb->rdma_sgl.lkey = cb->rdma_mr->lkey;
386	cb->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
387	cb->rdma_sq_wr.sg_list = &cb->rdma_sgl;
388	cb->rdma_sq_wr.num_sge = 1;
389}
390
391static int rping_setup_buffers(struct rping_cb *cb)
392{
393	int ret;
394
395	DEBUG_LOG("rping_setup_buffers called on cb %p\n", cb);
396
397	cb->recv_mr = ibv_reg_mr(cb->pd, &cb->recv_buf, sizeof cb->recv_buf,
398				 IBV_ACCESS_LOCAL_WRITE);
399	if (!cb->recv_mr) {
400		fprintf(stderr, "recv_buf reg_mr failed\n");
401		return errno;
402	}
403
404	cb->send_mr = ibv_reg_mr(cb->pd, &cb->send_buf, sizeof cb->send_buf, 0);
405	if (!cb->send_mr) {
406		fprintf(stderr, "send_buf reg_mr failed\n");
407		ret = errno;
408		goto err1;
409	}
410
411	cb->rdma_buf = malloc(cb->size);
412	if (!cb->rdma_buf) {
413		fprintf(stderr, "rdma_buf malloc failed\n");
414		ret = -ENOMEM;
415		goto err2;
416	}
417
418	cb->rdma_mr = ibv_reg_mr(cb->pd, cb->rdma_buf, cb->size,
419				 IBV_ACCESS_LOCAL_WRITE |
420				 IBV_ACCESS_REMOTE_READ |
421				 IBV_ACCESS_REMOTE_WRITE);
422	if (!cb->rdma_mr) {
423		fprintf(stderr, "rdma_buf reg_mr failed\n");
424		ret = errno;
425		goto err3;
426	}
427
428	if (!cb->server) {
429		cb->start_buf = malloc(cb->size);
430		if (!cb->start_buf) {
431			fprintf(stderr, "start_buf malloc failed\n");
432			ret = -ENOMEM;
433			goto err4;
434		}
435
436		cb->start_mr = ibv_reg_mr(cb->pd, cb->start_buf, cb->size,
437					  IBV_ACCESS_LOCAL_WRITE |
438					  IBV_ACCESS_REMOTE_READ |
439					  IBV_ACCESS_REMOTE_WRITE);
440		if (!cb->start_mr) {
441			fprintf(stderr, "start_buf reg_mr failed\n");
442			ret = errno;
443			goto err5;
444		}
445	}
446
447	rping_setup_wr(cb);
448	DEBUG_LOG("allocated & registered buffers...\n");
449	return 0;
450
451err5:
452	free(cb->start_buf);
453err4:
454	ibv_dereg_mr(cb->rdma_mr);
455err3:
456	free(cb->rdma_buf);
457err2:
458	ibv_dereg_mr(cb->send_mr);
459err1:
460	ibv_dereg_mr(cb->recv_mr);
461	return ret;
462}
463
464static void rping_free_buffers(struct rping_cb *cb)
465{
466	DEBUG_LOG("rping_free_buffers called on cb %p\n", cb);
467	ibv_dereg_mr(cb->recv_mr);
468	ibv_dereg_mr(cb->send_mr);
469	ibv_dereg_mr(cb->rdma_mr);
470	free(cb->rdma_buf);
471	if (!cb->server) {
472		ibv_dereg_mr(cb->start_mr);
473		free(cb->start_buf);
474	}
475}
476
477static int rping_create_qp(struct rping_cb *cb)
478{
479	struct ibv_qp_init_attr init_attr;
480	int ret;
481
482	memset(&init_attr, 0, sizeof(init_attr));
483	init_attr.cap.max_send_wr = RPING_SQ_DEPTH;
484	init_attr.cap.max_recv_wr = 2;
485	init_attr.cap.max_recv_sge = 1;
486	init_attr.cap.max_send_sge = 1;
487	init_attr.qp_type = IBV_QPT_RC;
488	init_attr.send_cq = cb->cq;
489	init_attr.recv_cq = cb->cq;
490
491	if (cb->server) {
492		ret = rdma_create_qp(cb->child_cm_id, cb->pd, &init_attr);
493		if (!ret)
494			cb->qp = cb->child_cm_id->qp;
495	} else {
496		ret = rdma_create_qp(cb->cm_id, cb->pd, &init_attr);
497		if (!ret)
498			cb->qp = cb->cm_id->qp;
499	}
500
501	return ret;
502}
503
/*
 * Release the verbs objects set up by rping_setup_qp(): the QP first, then
 * the CQ, the completion channel and finally the protection domain.  The
 * return values of the destroy calls are not checked.
 */
static void rping_free_qp(struct rping_cb *cb)
{
	ibv_destroy_qp(cb->qp);
	ibv_destroy_cq(cb->cq);
	ibv_destroy_comp_channel(cb->channel);
	ibv_dealloc_pd(cb->pd);
}
511
512static int rping_setup_qp(struct rping_cb *cb, struct rdma_cm_id *cm_id)
513{
514	int ret;
515
516	cb->pd = ibv_alloc_pd(cm_id->verbs);
517	if (!cb->pd) {
518		fprintf(stderr, "ibv_alloc_pd failed\n");
519		return errno;
520	}
521	DEBUG_LOG("created pd %p\n", cb->pd);
522
523	cb->channel = ibv_create_comp_channel(cm_id->verbs);
524	if (!cb->channel) {
525		fprintf(stderr, "ibv_create_comp_channel failed\n");
526		ret = errno;
527		goto err1;
528	}
529	DEBUG_LOG("created channel %p\n", cb->channel);
530
531	cb->cq = ibv_create_cq(cm_id->verbs, RPING_SQ_DEPTH * 2, cb,
532				cb->channel, 0);
533	if (!cb->cq) {
534		fprintf(stderr, "ibv_create_cq failed\n");
535		ret = errno;
536		goto err2;
537	}
538	DEBUG_LOG("created cq %p\n", cb->cq);
539
540	ret = ibv_req_notify_cq(cb->cq, 0);
541	if (ret) {
542		fprintf(stderr, "ibv_create_cq failed\n");
543		ret = errno;
544		goto err3;
545	}
546
547	ret = rping_create_qp(cb);
548	if (ret) {
549		perror("rdma_create_qp");
550		goto err3;
551	}
552	DEBUG_LOG("created qp %p\n", cb->qp);
553	return 0;
554
555err3:
556	ibv_destroy_cq(cb->cq);
557err2:
558	ibv_destroy_comp_channel(cb->channel);
559err1:
560	ibv_dealloc_pd(cb->pd);
561	return ret;
562}
563
564static void *cm_thread(void *arg)
565{
566	struct rping_cb *cb = arg;
567	struct rdma_cm_event *event;
568	int ret;
569
570	while (1) {
571		ret = rdma_get_cm_event(cb->cm_channel, &event);
572		if (ret) {
573			perror("rdma_get_cm_event");
574			exit(ret);
575		}
576		ret = rping_cma_event_handler(event->id, event);
577		rdma_ack_cm_event(event);
578		if (ret)
579			exit(ret);
580	}
581}
582
583static void *cq_thread(void *arg)
584{
585	struct rping_cb *cb = arg;
586	struct ibv_cq *ev_cq;
587	void *ev_ctx;
588	int ret;
589
590	DEBUG_LOG("cq_thread started.\n");
591
592	while (1) {
593		pthread_testcancel();
594
595		ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx);
596		if (ret) {
597			fprintf(stderr, "Failed to get cq event!\n");
598			pthread_exit(NULL);
599		}
600		if (ev_cq != cb->cq) {
601			fprintf(stderr, "Unknown CQ!\n");
602			pthread_exit(NULL);
603		}
604		ret = ibv_req_notify_cq(cb->cq, 0);
605		if (ret) {
606			fprintf(stderr, "Failed to set notify!\n");
607			pthread_exit(NULL);
608		}
609		ret = rping_cq_event_handler(cb);
610		ibv_ack_cq_events(cb->cq, 1);
611		if (ret)
612			pthread_exit(NULL);
613	}
614}
615
616static void rping_format_send(struct rping_cb *cb, char *buf, struct ibv_mr *mr)
617{
618	struct rping_rdma_info *info = &cb->send_buf;
619
620	info->buf = htonll((uint64_t) (unsigned long) buf);
621	info->rkey = htonl(mr->rkey);
622	info->size = htonl(cb->size);
623
624	DEBUG_LOG("RDMA addr %" PRIx64" rkey %x len %d\n",
625		  ntohll(info->buf), ntohl(info->rkey), ntohl(info->size));
626}
627
628static int rping_test_server(struct rping_cb *cb)
629{
630	struct ibv_send_wr *bad_wr;
631	int ret;
632
633	while (1) {
634		/* Wait for client's Start STAG/TO/Len */
635		sem_wait(&cb->sem);
636		if (cb->state != RDMA_READ_ADV) {
637			fprintf(stderr, "wait for RDMA_READ_ADV state %d\n",
638				cb->state);
639			ret = -1;
640			break;
641		}
642
643		DEBUG_LOG("server received sink adv\n");
644
645		/* Issue RDMA Read. */
646		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_READ;
647		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
648		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
649		cb->rdma_sq_wr.sg_list->length = cb->remote_len;
650
651		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
652		if (ret) {
653			fprintf(stderr, "post send error %d\n", ret);
654			break;
655		}
656		DEBUG_LOG("server posted rdma read req \n");
657
658		/* Wait for read completion */
659		sem_wait(&cb->sem);
660		if (cb->state != RDMA_READ_COMPLETE) {
661			fprintf(stderr, "wait for RDMA_READ_COMPLETE state %d\n",
662				cb->state);
663			ret = -1;
664			break;
665		}
666		DEBUG_LOG("server received read complete\n");
667
668		/* Display data in recv buf */
669		if (cb->verbose)
670			printf("server ping data: %s\n", cb->rdma_buf);
671
672		/* Tell client to continue */
673		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
674		if (ret) {
675			fprintf(stderr, "post send error %d\n", ret);
676			break;
677		}
678		DEBUG_LOG("server posted go ahead\n");
679
680		/* Wait for client's RDMA STAG/TO/Len */
681		sem_wait(&cb->sem);
682		if (cb->state != RDMA_WRITE_ADV) {
683			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
684				cb->state);
685			ret = -1;
686			break;
687		}
688		DEBUG_LOG("server received sink adv\n");
689
690		/* RDMA Write echo data */
691		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE;
692		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
693		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
694		cb->rdma_sq_wr.sg_list->length = strlen(cb->rdma_buf) + 1;
695		DEBUG_LOG("rdma write from lkey %x laddr %" PRIx64 " len %d\n",
696			  cb->rdma_sq_wr.sg_list->lkey,
697			  cb->rdma_sq_wr.sg_list->addr,
698			  cb->rdma_sq_wr.sg_list->length);
699
700		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
701		if (ret) {
702			fprintf(stderr, "post send error %d\n", ret);
703			break;
704		}
705
706		/* Wait for completion */
707		ret = sem_wait(&cb->sem);
708		if (cb->state != RDMA_WRITE_COMPLETE) {
709			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
710				cb->state);
711			ret = -1;
712			break;
713		}
714		DEBUG_LOG("server rdma write complete \n");
715
716		/* Tell client to begin again */
717		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
718		if (ret) {
719			fprintf(stderr, "post send error %d\n", ret);
720			break;
721		}
722		DEBUG_LOG("server posted go ahead\n");
723	}
724
725	return ret;
726}
727
728static int rping_bind_server(struct rping_cb *cb)
729{
730	int ret;
731
732	if (cb->sin.ss_family == AF_INET)
733		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
734	else
735		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
736
737	ret = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &cb->sin);
738	if (ret) {
739		perror("rdma_bind_addr");
740		return ret;
741	}
742	DEBUG_LOG("rdma_bind_addr successful\n");
743
744	DEBUG_LOG("rdma_listen\n");
745	ret = rdma_listen(cb->cm_id, 3);
746	if (ret) {
747		perror("rdma_listen");
748		return ret;
749	}
750
751	return 0;
752}
753
754static struct rping_cb *clone_cb(struct rping_cb *listening_cb)
755{
756	struct rping_cb *cb = malloc(sizeof *cb);
757	if (!cb)
758		return NULL;
759	*cb = *listening_cb;
760	cb->child_cm_id->context = cb;
761	return cb;
762}
763
/*
 * Release a control block obtained from clone_cb().  Only the structure
 * itself is freed; nothing it points to is released here.
 */
static void free_cb(struct rping_cb *cb)
{
	free(cb);
}
768
769static void *rping_persistent_server_thread(void *arg)
770{
771	struct rping_cb *cb = arg;
772	struct ibv_recv_wr *bad_wr;
773	int ret;
774
775	ret = rping_setup_qp(cb, cb->child_cm_id);
776	if (ret) {
777		fprintf(stderr, "setup_qp failed: %d\n", ret);
778		goto err0;
779	}
780
781	ret = rping_setup_buffers(cb);
782	if (ret) {
783		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
784		goto err1;
785	}
786
787	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
788	if (ret) {
789		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
790		goto err2;
791	}
792
793	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
794
795	ret = rping_accept(cb);
796	if (ret) {
797		fprintf(stderr, "connect error %d\n", ret);
798		goto err3;
799	}
800
801	rping_test_server(cb);
802	rdma_disconnect(cb->child_cm_id);
803	rping_free_buffers(cb);
804	rping_free_qp(cb);
805	pthread_cancel(cb->cqthread);
806	pthread_join(cb->cqthread, NULL);
807	rdma_destroy_id(cb->child_cm_id);
808	free_cb(cb);
809	return NULL;
810err3:
811	pthread_cancel(cb->cqthread);
812	pthread_join(cb->cqthread, NULL);
813err2:
814	rping_free_buffers(cb);
815err1:
816	rping_free_qp(cb);
817err0:
818	free_cb(cb);
819	return NULL;
820}
821
822static int rping_run_persistent_server(struct rping_cb *listening_cb)
823{
824	int ret;
825	struct rping_cb *cb;
826
827	ret = rping_bind_server(listening_cb);
828	if (ret)
829		return ret;
830
831	while (1) {
832		sem_wait(&listening_cb->sem);
833		if (listening_cb->state != CONNECT_REQUEST) {
834			fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
835				listening_cb->state);
836			return -1;
837		}
838
839		cb = clone_cb(listening_cb);
840		if (!cb)
841			return -1;
842		pthread_create(&cb->persistent_server_thread, NULL, rping_persistent_server_thread, cb);
843	}
844	return 0;
845}
846
847static int rping_run_server(struct rping_cb *cb)
848{
849	struct ibv_recv_wr *bad_wr;
850	int ret;
851
852	ret = rping_bind_server(cb);
853	if (ret)
854		return ret;
855
856	sem_wait(&cb->sem);
857	if (cb->state != CONNECT_REQUEST) {
858		fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
859			cb->state);
860		return -1;
861	}
862
863	ret = rping_setup_qp(cb, cb->child_cm_id);
864	if (ret) {
865		fprintf(stderr, "setup_qp failed: %d\n", ret);
866		return ret;
867	}
868
869	ret = rping_setup_buffers(cb);
870	if (ret) {
871		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
872		goto err1;
873	}
874
875	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
876	if (ret) {
877		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
878		goto err2;
879	}
880
881	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
882
883	ret = rping_accept(cb);
884	if (ret) {
885		fprintf(stderr, "connect error %d\n", ret);
886		goto err2;
887	}
888
889	rping_test_server(cb);
890	rdma_disconnect(cb->child_cm_id);
891	rdma_destroy_id(cb->child_cm_id);
892err2:
893	rping_free_buffers(cb);
894err1:
895	rping_free_qp(cb);
896
897	return ret;
898}
899
900static int rping_test_client(struct rping_cb *cb)
901{
902	int ping, start, cc, i, ret = 0;
903	struct ibv_send_wr *bad_wr;
904	unsigned char c;
905
906	start = 65;
907	for (ping = 0; !cb->count || ping < cb->count; ping++) {
908		cb->state = RDMA_READ_ADV;
909
910		/* Put some ascii text in the buffer. */
911		cc = sprintf(cb->start_buf, RPING_MSG_FMT, ping);
912		for (i = cc, c = start; i < cb->size; i++) {
913			cb->start_buf[i] = c;
914			c++;
915			if (c > 122)
916				c = 65;
917		}
918		start++;
919		if (start > 122)
920			start = 65;
921		cb->start_buf[cb->size - 1] = 0;
922
923		rping_format_send(cb, cb->start_buf, cb->start_mr);
924		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
925		if (ret) {
926			fprintf(stderr, "post send error %d\n", ret);
927			break;
928		}
929
930		/* Wait for server to ACK */
931		sem_wait(&cb->sem);
932		if (cb->state != RDMA_WRITE_ADV) {
933			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
934				cb->state);
935			ret = -1;
936			break;
937		}
938
939		rping_format_send(cb, cb->rdma_buf, cb->rdma_mr);
940		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
941		if (ret) {
942			fprintf(stderr, "post send error %d\n", ret);
943			break;
944		}
945
946		/* Wait for the server to say the RDMA Write is complete. */
947		sem_wait(&cb->sem);
948		if (cb->state != RDMA_WRITE_COMPLETE) {
949			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
950				cb->state);
951			ret = -1;
952			break;
953		}
954
955		if (cb->validate)
956			if (memcmp(cb->start_buf, cb->rdma_buf, cb->size)) {
957				fprintf(stderr, "data mismatch!\n");
958				ret = -1;
959				break;
960			}
961
962		if (cb->verbose)
963			printf("ping data: %s\n", cb->rdma_buf);
964	}
965
966	return ret;
967}
968
969static int rping_connect_client(struct rping_cb *cb)
970{
971	struct rdma_conn_param conn_param;
972	int ret;
973
974	memset(&conn_param, 0, sizeof conn_param);
975	conn_param.responder_resources = 1;
976	conn_param.initiator_depth = 1;
977	conn_param.retry_count = 10;
978
979	ret = rdma_connect(cb->cm_id, &conn_param);
980	if (ret) {
981		perror("rdma_connect");
982		return ret;
983	}
984
985	sem_wait(&cb->sem);
986	if (cb->state != CONNECTED) {
987		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
988		return -1;
989	}
990
991	DEBUG_LOG("rmda_connect successful\n");
992	return 0;
993}
994
995static int rping_bind_client(struct rping_cb *cb)
996{
997	int ret;
998
999	if (cb->sin.ss_family == AF_INET)
1000		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
1001	else
1002		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
1003
1004	ret = rdma_resolve_addr(cb->cm_id, NULL, (struct sockaddr *) &cb->sin, 2000);
1005	if (ret) {
1006		perror("rdma_resolve_addr");
1007		return ret;
1008	}
1009
1010	sem_wait(&cb->sem);
1011	if (cb->state != ROUTE_RESOLVED) {
1012		fprintf(stderr, "waiting for addr/route resolution state %d\n",
1013			cb->state);
1014		return -1;
1015	}
1016
1017	DEBUG_LOG("rdma_resolve_addr - rdma_resolve_route successful\n");
1018	return 0;
1019}
1020
1021static int rping_run_client(struct rping_cb *cb)
1022{
1023	struct ibv_recv_wr *bad_wr;
1024	int ret;
1025
1026	ret = rping_bind_client(cb);
1027	if (ret)
1028		return ret;
1029
1030	ret = rping_setup_qp(cb, cb->cm_id);
1031	if (ret) {
1032		fprintf(stderr, "setup_qp failed: %d\n", ret);
1033		return ret;
1034	}
1035
1036	ret = rping_setup_buffers(cb);
1037	if (ret) {
1038		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
1039		goto err1;
1040	}
1041
1042	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
1043	if (ret) {
1044		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
1045		goto err2;
1046	}
1047
1048	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
1049
1050	ret = rping_connect_client(cb);
1051	if (ret) {
1052		fprintf(stderr, "connect error %d\n", ret);
1053		goto err2;
1054	}
1055
1056	rping_test_client(cb);
1057	rdma_disconnect(cb->cm_id);
1058err2:
1059	rping_free_buffers(cb);
1060err1:
1061	rping_free_qp(cb);
1062
1063	return ret;
1064}
1065
/*
 * Resolve `dst` with getaddrinfo() and copy the first result into `addr`.
 * Only IPv4 and IPv6 results are copied; `addr` must have room for a
 * struct sockaddr_in6.
 *
 * Returns 0 on success, the getaddrinfo() error code if resolution fails,
 * and -1 if the first result has any other address family.
 */
static int get_addr(char *dst, struct sockaddr *addr)
{
	struct addrinfo *info;
	size_t addr_len;
	int rc;

	rc = getaddrinfo(dst, NULL, NULL, &info);
	if (rc != 0) {
		printf("getaddrinfo failed - invalid hostname or IP address\n");
		return rc;
	}

	switch (info->ai_family) {
	case PF_INET:
		addr_len = sizeof(struct sockaddr_in);
		break;
	case PF_INET6:
		addr_len = sizeof(struct sockaddr_in6);
		break;
	default:
		/* unsupported family: copy nothing */
		addr_len = 0;
		rc = -1;
		break;
	}

	if (addr_len)
		memcpy(addr, info->ai_addr, addr_len);

	freeaddrinfo(info);
	return rc;
}
1087
/*
 * Print the two command-line synopses (server and client form), prefixed
 * with `name`, followed by one help line per option, to stdout.
 */
static void usage(char *name)
{
	static const char *const option_help[] = {
		"\t-c\t\tclient side\n",
		"\t-s\t\tserver side.  To bind to any address with IPv6 use -a ::0\n",
		"\t-v\t\tdisplay ping data to stdout\n",
		"\t-V\t\tvalidate ping data\n",
		"\t-d\t\tdebug printfs\n",
		"\t-S size \tping data size\n",
		"\t-C count\tping count times\n",
		"\t-a addr\t\taddress\n",
		"\t-p port\t\tport\n",
		"\t-P\t\tpersistent server mode allowing multiple connections\n",
	};
	size_t i;

	printf("%s -s [-vVd] [-S size] [-C count] [-a addr] [-p port]\n",
	       name);
	printf("%s -c [-vVd] [-S size] [-C count] -a addr [-p port]\n",
	       name);

	for (i = 0; i < sizeof option_help / sizeof option_help[0]; i++)
		fputs(option_help[i], stdout);
}
1105
1106int main(int argc, char *argv[])
1107{
1108	struct rping_cb *cb;
1109	int op;
1110	int ret = 0;
1111	int persistent_server = 0;
1112
1113	cb = malloc(sizeof(*cb));
1114	if (!cb)
1115		return -ENOMEM;
1116
1117	memset(cb, 0, sizeof(*cb));
1118	cb->server = -1;
1119	cb->state = IDLE;
1120	cb->size = 64;
1121	cb->sin.ss_family = PF_INET;
1122	cb->port = htons(7174);
1123	sem_init(&cb->sem, 0, 0);
1124
1125	opterr = 0;
1126	while ((op=getopt(argc, argv, "a:Pp:C:S:t:scvVd")) != -1) {
1127		switch (op) {
1128		case 'a':
1129			ret = get_addr(optarg, (struct sockaddr *) &cb->sin);
1130			break;
1131		case 'P':
1132			persistent_server = 1;
1133			break;
1134		case 'p':
1135			cb->port = htons(atoi(optarg));
1136			DEBUG_LOG("port %d\n", (int) atoi(optarg));
1137			break;
1138		case 's':
1139			cb->server = 1;
1140			DEBUG_LOG("server\n");
1141			break;
1142		case 'c':
1143			cb->server = 0;
1144			DEBUG_LOG("client\n");
1145			break;
1146		case 'S':
1147			cb->size = atoi(optarg);
1148			if ((cb->size < RPING_MIN_BUFSIZE) ||
1149			    (cb->size > (RPING_BUFSIZE - 1))) {
1150				fprintf(stderr, "Invalid size %d "
1151				       "(valid range is %Zd to %d)\n",
1152				       cb->size, RPING_MIN_BUFSIZE, RPING_BUFSIZE);
1153				ret = EINVAL;
1154			} else
1155				DEBUG_LOG("size %d\n", (int) atoi(optarg));
1156			break;
1157		case 'C':
1158			cb->count = atoi(optarg);
1159			if (cb->count < 0) {
1160				fprintf(stderr, "Invalid count %d\n",
1161					cb->count);
1162				ret = EINVAL;
1163			} else
1164				DEBUG_LOG("count %d\n", (int) cb->count);
1165			break;
1166		case 'v':
1167			cb->verbose++;
1168			DEBUG_LOG("verbose\n");
1169			break;
1170		case 'V':
1171			cb->validate++;
1172			DEBUG_LOG("validate data\n");
1173			break;
1174		case 'd':
1175			debug++;
1176			break;
1177		default:
1178			usage("rping");
1179			ret = EINVAL;
1180			goto out;
1181		}
1182	}
1183	if (ret)
1184		goto out;
1185
1186	if (cb->server == -1) {
1187		usage("rping");
1188		ret = EINVAL;
1189		goto out;
1190	}
1191
1192	cb->cm_channel = rdma_create_event_channel();
1193	if (!cb->cm_channel) {
1194		perror("rdma_create_event_channel");
1195		goto out;
1196	}
1197
1198	ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP);
1199	if (ret) {
1200		perror("rdma_create_id");
1201		goto out2;
1202	}
1203	DEBUG_LOG("created cm_id %p\n", cb->cm_id);
1204
1205	pthread_create(&cb->cmthread, NULL, cm_thread, cb);
1206
1207	if (cb->server) {
1208		if (persistent_server)
1209			ret = rping_run_persistent_server(cb);
1210		else
1211			ret = rping_run_server(cb);
1212	} else
1213		ret = rping_run_client(cb);
1214
1215	DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);
1216	rdma_destroy_id(cb->cm_id);
1217out2:
1218	rdma_destroy_event_channel(cb->cm_channel);
1219out:
1220	free(cb);
1221	return ret;
1222}
1223