8.2 发送、接收、RDMA 读取、RDMA 写入的代码
示例代码
RDMA_RC_example.c
1/* 2* BUILD COMMAND: 3* gcc -Wall -I/usr/local/ofed/include -O2 -o RDMA_RC_example -L/usr/local/ 4ofed/lib64 -L/usr/local/ofed/lib -libverbs RDMA_RC_example.c 5* 6* Usage: 7* RDMA_RC_example --help 8*/ 9/****************************************************************************** 10* 11* RDMA Aware Networks Programming Example 12* 13* This code demonstrates how to perform the following operations using the 14* VPI Verbs API: 15* 16* Send 17* Receive 18* RDMA Read 19* RDMA Write 20* 21*****************************************************************************/ 22#include <arpa/inet.h> 23#include <byteswap.h> 24#include <endian.h> 25#include <getopt.h> 26#include <infiniband/verbs.h> 27#include <inttypes.h> 28#include <netdb.h> 29#include <stdint.h> 30#include <stdio.h> 31#include <stdlib.h> 32#include <string.h> 33#include <sys/socket.h> 34#include <sys/time.h> 35#include <sys/types.h> 36#include <unistd.h> 37/* poll CQ timeout in millisec (2 seconds) */ 38#define MAX_POLL_CQ_TIMEOUT 2000 39#define MSG "SEND operation " 40#define RDMAMSGR "RDMA read operation " 41#define RDMAMSGW "RDMA write operation" 42#define MSG_SIZE (strlen(MSG) + 1) 43#if __BYTE_ORDER == __LITTLE_ENDIAN 44static inline uint64_t htonll(uint64_t x) { return bswap_64(x); } 45static inline uint64_t ntohll(uint64_t x) { return bswap_64(x); } 46#elif __BYTE_ORDER == __BIG_ENDIAN 47static inline uint64_t htonll(uint64_t x) { return x; } 48static inline uint64_t ntohll(uint64_t x) { return x; } 49#else 50#error __BYTE_ORDER is neither __LITTLE_ENDIAN nor __BIG_ENDIAN 51#endif 52 53/* structure of test parameters */ 54struct config_t 55{ 56 const char *dev_name; /* IB device name */ 57 char *server_name; /* server host name */ 58 u_int32_t tcp_port; /* server TCP port */ 59 int ib_port; /* local IB port to work with */ 60 int gid_idx; /* gid index to use */ 61}; 62 63/* structure to exchange data which is needed to connect the QPs */ 64struct cm_con_data_t 65{ 66 uint64_t addr; /* Buffer address 
*/ 67 uint32_t rkey; /* Remote key */ 68 uint32_t qp_num; /* QP number */ 69 uint16_t lid; /* LID of the IB port */ 70 uint8_t gid[16]; /* gid */ 71} __attribute__((packed)); 72 73/* structure of system resources */ 74struct resources 75{ 76 struct ibv_device_attr device_attr; 77 /* Device attributes */ 78 struct ibv_port_attr port_attr; /* IB port attributes */ 79 struct cm_con_data_t remote_props; /* values to connect to remote side */ 80 struct ibv_context *ib_ctx; /* device handle */ 81 struct ibv_pd *pd; /* PD handle */ 82 struct ibv_cq *cq; /* CQ handle */ 83 struct ibv_qp *qp; /* QP handle */ 84 struct ibv_mr *mr; /* MR handle for buf */ 85 char *buf; /* memory buffer pointer, used for RDMA and send ops */ 86 int sock; /* TCP socket file descriptor */ 87}; 88 89struct config_t config = { 90 NULL, /* dev_name */ 91 NULL, /* server_name */ 92 19875, /* tcp_port */ 93 1, /* ib_port */ 94 -1 /* gid_idx */ 95}; 96 97/****************************************************************************** 98Socket operations 99For simplicity, the example program uses TCP sockets to exchange control 100information. If a TCP/IP stack/connection is not available, connection manager 101(CM) may be used to pass this information. Use of CM is beyond the scope of 102this example 103******************************************************************************/ 104 105/****************************************************************************** 106 * Function: sock_connect 107 * 108 * Input 109 * servername URL of server to connect to (NULL for server mode) 110 * port port of service 111 * 112 * Output 113 * none 114 * 115 * Returns 116 * socket (fd) on success, negative error code on failure 117 * 118 * Description 119 * Connect a socket. If servername is specified a client connection will be 120 * initiated to the indicated server and port. Otherwise listen on the 121 * indicated port for an incoming connection. 
122 * 123 ******************************************************************************/ 124 125static int sock_connect(const char *servername, int port) 126{ 127 // `struct addrinfo' is not part of standard C, but belongs to gnu. 128 // https://stackoverflow.com/questions/33076175/why-is-struct-addrinfo-defined-only-if-use-xopen2k-is-defined 129 struct addrinfo *resolved_addr = NULL; 130 struct addrinfo *iterator; 131 char service[6]; 132 int sockfd = -1; 133 int listenfd = 0; 134 int tmp; 135 136 struct addrinfo hints = { 137 .ai_flags = AI_PASSIVE, .ai_family = AF_INET, .ai_socktype = SOCK_STREAM}; 138 if (sprintf(service, "%d", port) < 0) 139 goto sock_connect_exit; 140 /* Resolve DNS address, use sockfd as temp storage */ 141 sockfd = getaddrinfo(servername, service, &hints, &resolved_addr); 142 if (sockfd < 0) 143 { 144 fprintf(stderr, "%s for %s:%d\n", gai_strerror(sockfd), servername, port); 145 goto sock_connect_exit; 146 } 147 /* Search through results and find the one we want */ 148 for (iterator = resolved_addr; iterator; iterator = iterator->ai_next) 149 { 150 sockfd = socket(iterator->ai_family, iterator->ai_socktype, 151 iterator->ai_protocol); 152 if (sockfd >= 0) 153 { 154 if (servername) 155 { 156 /* Client mode. Initiate connection to remote */ 157 if ((tmp = connect(sockfd, iterator->ai_addr, iterator->ai_addrlen))) 158 { 159 fprintf(stdout, "failed connect \n"); 160 close(sockfd); 161 sockfd = -1; 162 } 163 } 164 else 165 { 166 /* Server mode. 
Set up listening socket an accept a connection 167 */ 168 listenfd = sockfd; 169 sockfd = -1; 170 if (bind(listenfd, iterator->ai_addr, iterator->ai_addrlen)) 171 goto sock_connect_exit; 172 listen(listenfd, 1); 173 sockfd = accept(listenfd, NULL, 0); 174 } 175 } 176 } 177sock_connect_exit: 178 if (listenfd) 179 close(listenfd); 180 if (resolved_addr) 181 freeaddrinfo(resolved_addr); 182 if (sockfd < 0) 183 { 184 if (servername) 185 fprintf(stderr, "Couldn't connect to %s:%d\n", servername, port); 186 else 187 { 188 perror("server accept"); 189 fprintf(stderr, "accept() failed\n"); 190 } 191 } 192 return sockfd; 193} 194 195/****************************************************************************** 196* Function: sock_sync_data 197* 198* Input 199* sock socket to transfer data on 200* xfer_size size of data to transfer 201* local_data pointer to data to be sent to remote 202* 203* Output 204* remote_data pointer to buffer to receive remote data 205* 206* Returns 207* 0 on success, negative error code on failure 208* 209* Description 210* Sync data across a socket. The indicated local data will be sent to the 211* remote. It will then wait for the remote to send its data back. It is 212* assumed that the two sides are in sync and call this function in the proper 213* order. Chaos will ensue if they are not. :) 214* 215* Also note this is a blocking function and will wait for the full data to be 216* received from the remote. 
217* 218******************************************************************************/ 219int sock_sync_data(int sock, int xfer_size, char *local_data, 220 char *remote_data) 221{ 222 int rc; 223 int read_bytes = 0; 224 int total_read_bytes = 0; 225 rc = write(sock, local_data, xfer_size); 226 if (rc < xfer_size) 227 fprintf(stderr, "Failed writing data during sock_sync_data\n"); 228 else 229 rc = 0; 230 while (!rc && total_read_bytes < xfer_size) 231 { 232 read_bytes = read(sock, remote_data, xfer_size); 233 if (read_bytes > 0) 234 total_read_bytes += read_bytes; 235 else 236 rc = read_bytes; 237 } 238 return rc; 239} 240 241/****************************************************************************** 242End of socket operations 243******************************************************************************/ 244 245/* poll_completion */ 246/****************************************************************************** 247 * Function: poll_completion 248 * 249 * Input 250 * res pointer to resources structure 251 * 252 * Output 253 * none 254 * 255 * Returns 256 * 0 on success, 1 on failure 257 * 258 * Description 259 * Poll the completion queue for a single event. This function will continue to 260 * poll the queue until MAX_POLL_CQ_TIMEOUT milliseconds have passed. 261 * 262 ******************************************************************************/ 263static int poll_completion(struct resources *res) 264{ 265 struct ibv_wc wc; 266 unsigned long start_time_msec; 267 unsigned long cur_time_msec; 268 struct timeval cur_time; 269 int poll_result; 270 int rc = 0; 271 /* poll the completion for a while before giving up of doing it .. 
*/ 272 gettimeofday(&cur_time, NULL); 273 start_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000); 274 do 275 { 276 poll_result = ibv_poll_cq(res->cq, 1, &wc); 277 gettimeofday(&cur_time, NULL); 278 cur_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000); 279 } while ((poll_result == 0) && 280 ((cur_time_msec - start_time_msec) < MAX_POLL_CQ_TIMEOUT)); 281 if (poll_result < 0) 282 { 283 /* poll CQ failed */ 284 fprintf(stderr, "poll CQ failed\n"); 285 rc = 1; 286 } 287 else if (poll_result == 0) 288 { 289 /* the CQ is empty */ 290 fprintf(stderr, "completion wasn't found in the CQ after timeout\n"); 291 rc = 1; 292 } 293 else 294 { 295 /* CQE found */ 296 fprintf(stdout, "completion was found in CQ with status 0x%x\n", wc.status); 297 /* check the completion status (here we don't care about the completion 298 opcode */ 299 if (wc.status != IBV_WC_SUCCESS) 300 { 301 fprintf(stderr, "got bad completion with status: 0x%x, vendor syndrome: 0x%x\n", wc.status, wc.vendor_err); 302 rc = 1; 303 } 304 } 305 return rc; 306} 307 308/****************************************************************************** 309 * Function: post_send 310 * 311 * Input 312 * res pointer to resources structure 313 * opcode IBV_WR_SEND, IBV_WR_RDMA_READ or IBV_WR_RDMA_WRITE 314 * 315 * Output 316 * none 317 * 318 * Returns 319 * 0 on success, error code on failure 320 * 321 * Description 322 * This function will create and post a send work request 323 ******************************************************************************/ 324static int post_send(struct resources *res, int opcode) 325{ 326 struct ibv_send_wr sr; 327 struct ibv_sge sge; 328 struct ibv_send_wr *bad_wr = NULL; 329 int rc; 330 331 /* prepare the scatter/gather entry */ 332 memset(&sge, 0, sizeof(sge)); 333 sge.addr = (uintptr_t)res->buf; 334 sge.length = MSG_SIZE; 335 sge.lkey = res->mr->lkey; 336 /* prepare the send work request */ 337 memset(&sr, 0, sizeof(sr)); 338 sr.next = NULL; 339 sr.wr_id 
= 0; 340 sr.sg_list = &sge; 341 sr.num_sge = 1; 342 sr.opcode = opcode; 343 sr.send_flags = IBV_SEND_SIGNALED; 344 if (opcode != IBV_WR_SEND) 345 { 346 sr.wr.rdma.remote_addr = res->remote_props.addr; 347 sr.wr.rdma.rkey = res->remote_props.rkey; 348 } 349 /* there is a Receive Request in the responder side, so we won't get any 350 into RNR flow */ 351 rc = ibv_post_send(res->qp, &sr, &bad_wr); 352 if (rc) 353 fprintf(stderr, "failed to post SR\n"); 354 else 355 { 356 switch (opcode) 357 { 358 case IBV_WR_SEND: 359 fprintf(stdout, "Send Request was posted\n"); 360 break; 361 case IBV_WR_RDMA_READ: 362 fprintf(stdout, "RDMA Read Request was posted\n"); 363 break; 364 case IBV_WR_RDMA_WRITE: 365 fprintf(stdout, "RDMA Write Request was posted\n"); 366 break; 367 default: 368 fprintf(stdout, "Unknown Request was posted\n"); 369 break; 370 } 371 } 372 return rc; 373} 374 375/****************************************************************************** 376 * Function: post_receive 377 * 378 * Input 379 * res pointer to resources structure 380 * 381 * Output 382 * none 383 * 384 * Returns 385 * 0 on success, error code on failure 386 * 387 * Description 388 * 389 ******************************************************************************/ 390static int post_receive(struct resources *res) 391{ 392 struct ibv_recv_wr rr; 393 struct ibv_sge sge; 394 struct ibv_recv_wr *bad_wr; 395 int rc; 396 /* prepare the scatter/gather entry */ 397 memset(&sge, 0, sizeof(sge)); 398 sge.addr = (uintptr_t)res->buf; 399 sge.length = MSG_SIZE; 400 sge.lkey = res->mr->lkey; 401 /* prepare the receive work request */ 402 memset(&rr, 0, sizeof(rr)); 403 rr.next = NULL; 404 rr.wr_id = 0; 405 rr.sg_list = &sge; 406 rr.num_sge = 1; 407 /* post the Receive Request to the RQ */ 408 rc = ibv_post_recv(res->qp, &rr, &bad_wr); 409 if (rc) 410 fprintf(stderr, "failed to post RR\n"); 411 else 412 fprintf(stdout, "Receive Request was posted\n"); 413 return rc; 414} 415 
416/****************************************************************************** 417 * Function: resources_init 418 * 419 * Input 420 * res pointer to resources structure 421 * 422 * Output 423 * res is initialized 424 * 425 * Returns 426 * none 427 * 428 * Description 429 * res is initialized to default values 430 ******************************************************************************/ 431static void resources_init(struct resources *res) 432{ 433 memset(res, 0, sizeof *res); 434 res->sock = -1; 435} 436 437/****************************************************************************** 438 * Function: resources_create 439 * 440 * Input 441 * res pointer to resources structure to be filled in 442 * 443 * Output 444 * res filled in with resources 445 * 446 * Returns 447 * 0 on success, 1 on failure 448 * 449 * Description 450 * 451 * This function creates and allocates all necessary system resources. These 452 * are stored in res. 453 *****************************************************************************/ 454static int resources_create(struct resources *res) 455{ 456 struct ibv_device **dev_list = NULL; 457 struct ibv_qp_init_attr qp_init_attr; 458 struct ibv_device *ib_dev = NULL; 459 size_t size; 460 int i; 461 int mr_flags = 0; 462 int cq_size = 0; 463 int num_devices; 464 int rc = 0; 465 466 /* if client side */ 467 if (config.server_name) 468 { 469 res->sock = sock_connect(config.server_name, config.tcp_port); 470 if (res->sock < 0) 471 { 472 fprintf(stderr, 473 "failed to establish TCP connection to server %s, port %d\n ", 474 config.server_name, 475 config.tcp_port); 476 rc = -1; 477 goto resources_create_exit; 478 } 479 } 480 else 481 { 482 fprintf(stdout, "waiting on port %d for TCP connection\n", config.tcp_port); 483 res->sock = sock_connect(NULL, config.tcp_port); 484 if (res->sock < 0) 485 { 486 fprintf(stderr, "failed to establish TCP connection with client on port %d\n ", config.tcp_port); 487 rc = -1; 488 goto resources_create_exit; 
489 } 490 } 491 fprintf(stdout, "TCP connection was established\n"); 492 fprintf(stdout, "searching for IB devices in host\n"); 493 /* get device names in the system */ 494 dev_list = ibv_get_device_list(&num_devices); 495 if (!dev_list) 496 { 497 fprintf(stderr, "failed to get IB devices list\n"); 498 rc = 1; 499 goto resources_create_exit; 500 } 501 /* if there isn't any IB device in host */ 502 if (!num_devices) 503 { 504 fprintf(stderr, "found %d device(s)\n", num_devices); 505 rc = 1; 506 goto resources_create_exit; 507 } 508 fprintf(stdout, "found %d device(s)\n", num_devices); 509 /* search for the specific device we want to work with */ 510 for (i = 0; i < num_devices; i++) 511 { 512 if (!config.dev_name) 513 { 514 config.dev_name = strdup(ibv_get_device_name(dev_list[i])); 515 fprintf(stdout, "device not specified, using first one found: %s\n", config.dev_name); 516 } 517 if (!strcmp(ibv_get_device_name(dev_list[i]), config.dev_name)) 518 { 519 ib_dev = dev_list[i]; 520 break; 521 } 522 } 523 /* if the device wasn't found in host */ 524 if (!ib_dev) 525 { 526 fprintf(stderr, "IB device %s wasn't found\n", config.dev_name); 527 rc = 1; 528 goto resources_create_exit; 529 } 530 /* get device handle */ 531 res->ib_ctx = ibv_open_device(ib_dev); 532 if (!res->ib_ctx) 533 { 534 fprintf(stderr, "failed to open device %s\n", config.dev_name); 535 rc = 1; 536 goto resources_create_exit; 537 } 538 /* We are now done with device list, free it */ 539 ibv_free_device_list(dev_list); 540 dev_list = NULL; 541 ib_dev = NULL; 542 /* query port properties */ 543 if (ibv_query_port(res->ib_ctx, config.ib_port, &res->port_attr)) 544 { 545 fprintf(stderr, "ibv_query_port on port %u failed\n", config.ib_port); 546 rc = 1; 547 goto resources_create_exit; 548 } 549 /* allocate Protection Domain */ 550 res->pd = ibv_alloc_pd(res->ib_ctx); 551 if (!res->pd) 552 { 553 fprintf(stderr, "ibv_alloc_pd failed\n"); 554 rc = 1; 555 goto resources_create_exit; 556 } 557 /* each side will 
send only one WR, so Completion Queue with 1 entry is 558 enough */ 559 cq_size = 1; 560 res->cq = ibv_create_cq(res->ib_ctx, cq_size, NULL, NULL, 0); 561 if (!res->cq) 562 { 563 fprintf(stderr, "failed to create CQ with %u entries\n", cq_size); 564 rc = 1; 565 goto resources_create_exit; 566 } 567 /* allocate the memory buffer that will hold the data */ 568 size = MSG_SIZE; 569 res->buf = (char *)malloc(size); 570 if (!res->buf) 571 { 572 fprintf(stderr, "failed to malloc %zu bytes to memory buffer\n", size); 573 rc = 1; 574 goto resources_create_exit; 575 } 576 memset(res->buf, 0, size); 577 /* only in the server side put the message in the memory buffer */ 578 if (!config.server_name) 579 { 580 strcpy(res->buf, MSG); 581 fprintf(stdout, "going to send the message: '%s'\n", res->buf); 582 } 583 else 584 memset(res->buf, 0, size); 585 /* register the memory buffer */ 586 mr_flags = 587 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE; 588 res->mr = ibv_reg_mr(res->pd, res->buf, size, mr_flags); 589 if (!res->mr) 590 { 591 fprintf(stderr, "ibv_reg_mr failed with mr_flags=0x%x\n", mr_flags); 592 rc = 1; 593 goto resources_create_exit; 594 } 595 fprintf(stdout, 596 "MR was registered with addr=%p, lkey=0x%x, rkey=0x%x, flags=0x%x\n", 597 res->buf, res->mr->lkey, res->mr->rkey, mr_flags); 598 /* create the Queue Pair */ 599 memset(&qp_init_attr, 0, sizeof(qp_init_attr)); 600 qp_init_attr.qp_type = IBV_QPT_RC; 601 qp_init_attr.sq_sig_all = 1; 602 qp_init_attr.send_cq = res->cq; 603 qp_init_attr.recv_cq = res->cq; 604 qp_init_attr.cap.max_send_wr = 1; 605 qp_init_attr.cap.max_recv_wr = 1; 606 qp_init_attr.cap.max_send_sge = 1; 607 qp_init_attr.cap.max_recv_sge = 1; 608 res->qp = ibv_create_qp(res->pd, &qp_init_attr); 609 if (!res->qp) 610 { 611 fprintf(stderr, "failed to create QP\n"); 612 rc = 1; 613 goto resources_create_exit; 614 } 615 fprintf(stdout, "QP was created, QP number=0x%x\n", res->qp->qp_num); 616resources_create_exit: 617 if (rc) 
618 { 619 /* Error encountered, cleanup */ 620 if (res->qp) 621 { 622 ibv_destroy_qp(res->qp); 623 res->qp = NULL; 624 } 625 if (res->mr) 626 { 627 ibv_dereg_mr(res->mr); 628 res->mr = NULL; 629 } 630 if (res->buf) 631 { 632 free(res->buf); 633 res->buf = NULL; 634 } 635 if (res->cq) 636 { 637 ibv_destroy_cq(res->cq); 638 res->cq = NULL; 639 } 640 if (res->pd) 641 { 642 ibv_dealloc_pd(res->pd); 643 res->pd = NULL; 644 } 645 if (res->ib_ctx) 646 { 647 ibv_close_device(res->ib_ctx); 648 res->ib_ctx = NULL; 649 } 650 if (dev_list) 651 { 652 ibv_free_device_list(dev_list); 653 dev_list = NULL; 654 } 655 if (res->sock >= 0) 656 { 657 if (close(res->sock)) 658 fprintf(stderr, "failed to close socket\n"); 659 res->sock = -1; 660 } 661 } 662 return rc; 663} 664 665/****************************************************************************** 666 * Function: modify_qp_to_init 667 * 668 * Input 669 * qp QP to transition 670 * 671 * Output 672 * none 673 * 674 * Returns 675 * 0 on success, ibv_modify_qp failure code on failure 676 * 677 * Description 678 * Transition a QP from the RESET to INIT state 679 ******************************************************************************/ 680static int modify_qp_to_init(struct ibv_qp *qp) 681{ 682 struct ibv_qp_attr attr; 683 int flags; 684 int rc; 685 memset(&attr, 0, sizeof(attr)); 686 attr.qp_state = IBV_QPS_INIT; 687 attr.port_num = config.ib_port; 688 attr.pkey_index = 0; 689 attr.qp_access_flags = 690 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE; 691 flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS; 692 rc = ibv_modify_qp(qp, &attr, flags); 693 if (rc) 694 fprintf(stderr, "failed to modify QP state to INIT\n"); 695 return rc; 696} 697 698/****************************************************************************** 699 * Function: modify_qp_to_rtr 700 * 701 * Input 702 * qp QP to transition 703 * remote_qpn remote QP number 704 * dlid destination LID 705 * dgid 
destination GID (mandatory for RoCEE) 706 * 707 * Output 708 * none 709 * 710 * Returns 711 * 0 on success, ibv_modify_qp failure code on failure 712 * 713 * Description 714 * Transition a QP from the INIT to RTR state, using the specified QP number 715 ******************************************************************************/ 716static int modify_qp_to_rtr(struct ibv_qp *qp, uint32_t remote_qpn, 717 uint16_t dlid, uint8_t *dgid) 718{ 719 struct ibv_qp_attr attr; 720 int flags; 721 int rc; 722 memset(&attr, 0, sizeof(attr)); 723 attr.qp_state = IBV_QPS_RTR; 724 attr.path_mtu = IBV_MTU_256; 725 attr.dest_qp_num = remote_qpn; 726 attr.rq_psn = 0; 727 attr.max_dest_rd_atomic = 1; 728 attr.min_rnr_timer = 0x12; 729 attr.ah_attr.is_global = 0; 730 attr.ah_attr.dlid = dlid; 731 attr.ah_attr.sl = 0; 732 attr.ah_attr.src_path_bits = 0; 733 attr.ah_attr.port_num = config.ib_port; 734 if (config.gid_idx >= 0) 735 { 736 attr.ah_attr.is_global = 1; 737 attr.ah_attr.port_num = 1; 738 memcpy(&attr.ah_attr.grh.dgid, dgid, 16); 739 attr.ah_attr.grh.flow_label = 0; 740 attr.ah_attr.grh.hop_limit = 1; 741 attr.ah_attr.grh.sgid_index = config.gid_idx; 742 attr.ah_attr.grh.traffic_class = 0; 743 } 744 flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | 745 IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER; 746 rc = ibv_modify_qp(qp, &attr, flags); 747 if (rc) 748 fprintf(stderr, "failed to modify QP state to RTR\n"); 749 return rc; 750} 751 752/****************************************************************************** 753 * Function: modify_qp_to_rts 754 * 755 * Input 756 * qp QP to transition 757 * 758 * Output 759 * none 760 * 761 * Returns 762 * 0 on success, ibv_modify_qp failure code on failure 763 * 764 * Description 765 * Transition a QP from the RTR to RTS state 766 ******************************************************************************/ 767static int modify_qp_to_rts(struct ibv_qp *qp) 768{ 769 struct ibv_qp_attr attr; 770 int 
flags; 771 int rc; 772 memset(&attr, 0, sizeof(attr)); 773 attr.qp_state = IBV_QPS_RTS; 774 attr.timeout = 0x12; 775 attr.retry_cnt = 6; 776 attr.rnr_retry = 0; 777 attr.sq_psn = 0; 778 attr.max_rd_atomic = 1; 779 flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | 780 IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC; 781 rc = ibv_modify_qp(qp, &attr, flags); 782 if (rc) 783 fprintf(stderr, "failed to modify QP state to RTS\n"); 784 return rc; 785} 786 787/****************************************************************************** 788 * Function: connect_qp 789 * 790 * Input 791 * res pointer to resources structure 792 * 793 * Output 794 * none 795 * 796 * Returns 797 * 0 on success, error code on failure 798 * 799 * Description 800 * Connect the QP. Transition the server side to RTR, sender side to RTS 801 ******************************************************************************/ 802static int connect_qp(struct resources *res) 803{ 804 struct cm_con_data_t local_con_data; 805 struct cm_con_data_t remote_con_data; 806 struct cm_con_data_t tmp_con_data; 807 int rc = 0; 808 char temp_char; 809 union ibv_gid my_gid; 810 if (config.gid_idx >= 0) 811 { 812 rc = ibv_query_gid(res->ib_ctx, config.ib_port, config.gid_idx, &my_gid); 813 if (rc) 814 { 815 fprintf(stderr, "could not get gid for port %d, index %d\n", 816 config.ib_port, config.gid_idx); 817 return rc; 818 } 819 } 820 else 821 memset(&my_gid, 0, sizeof my_gid); 822 /* exchange using TCP sockets info required to connect QPs */ 823 local_con_data.addr = htonll((uintptr_t)res->buf); 824 local_con_data.rkey = htonl(res->mr->rkey); 825 local_con_data.qp_num = htonl(res->qp->qp_num); 826 local_con_data.lid = htons(res->port_attr.lid); 827 memcpy(local_con_data.gid, &my_gid, 16); 828 fprintf(stdout, "\nLocal LID = 0x%x\n", res->port_attr.lid); 829 if (sock_sync_data(res->sock, sizeof(struct cm_con_data_t), 830 (char *)&local_con_data, (char *)&tmp_con_data) < 0) 831 { 832 fprintf(stderr, 
"failed to exchange connection data between sides\n"); 833 rc = 1; 834 goto connect_qp_exit; 835 } 836 remote_con_data.addr = ntohll(tmp_con_data.addr); 837 remote_con_data.rkey = ntohl(tmp_con_data.rkey); 838 remote_con_data.qp_num = ntohl(tmp_con_data.qp_num); 839 remote_con_data.lid = ntohs(tmp_con_data.lid); 840 memcpy(remote_con_data.gid, tmp_con_data.gid, 16); 841 /* save the remote side attributes, we will need it for the post SR */ 842 res->remote_props = remote_con_data; 843 fprintf(stdout, "Remote address = 0x%" PRIx64 "\n", remote_con_data.addr); 844 fprintf(stdout, "Remote rkey = 0x%x\n", remote_con_data.rkey); 845 fprintf(stdout, "Remote QP number = 0x%x\n", remote_con_data.qp_num); 846 fprintf(stdout, "Remote LID = 0x%x\n", remote_con_data.lid); 847 if (config.gid_idx >= 0) 848 { 849 uint8_t *p = remote_con_data.gid; 850 fprintf(stdout, "Remote GID = %02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x\n", 851 p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7], p[8], p[9], p[10], p[11], p[12], p[13], p[14], p[15]); 852 } 853 /* modify the QP to init */ 854 rc = modify_qp_to_init(res->qp); 855 if (rc) 856 { 857 fprintf(stderr, "change QP state to INIT failed\n"); 858 goto connect_qp_exit; 859 } 860 /* let the client post RR to be prepared for incoming messages */ 861 if (config.server_name) 862 { 863 rc = post_receive(res); 864 if (rc) 865 { 866 fprintf(stderr, "failed to post RR\n"); 867 goto connect_qp_exit; 868 } 869 } 870 /* modify the QP to RTR */ 871 rc = modify_qp_to_rtr(res->qp, remote_con_data.qp_num, remote_con_data.lid, 872 remote_con_data.gid); 873 if (rc) 874 { 875 fprintf(stderr, "failed to modify QP state to RTR\n"); 876 goto connect_qp_exit; 877 } 878 rc = modify_qp_to_rts(res->qp); 879 if (rc) 880 { 881 fprintf(stderr, "failed to modify QP state to RTR\n"); 882 goto connect_qp_exit; 883 } 884 fprintf(stdout, "QP state was change to RTS\n"); 885 /* sync to make sure that both sides are in states that they can 
connect 886 to prevent packet loose */ 887 if (sock_sync_data(res->sock, 1, "Q", &temp_char)) /* just send a dummy 888 char back and forth */ 889 { 890 fprintf(stderr, "sync error after QPs are were moved to RTS\n"); 891 rc = 1; 892 } 893connect_qp_exit: 894 return rc; 895} 896 897/****************************************************************************** 898 * Function: resources_destroy 899 * 900 * Input 901 * res pointer to resources structure 902 * 903 * Output 904 * none 905 * 906 * Returns 907 * 0 on success, 1 on failure 908 * 909 * Description 910 * Cleanup and deallocate all resources used 911 ******************************************************************************/ 912static int resources_destroy(struct resources *res) 913{ 914 int rc = 0; 915 if (res->qp) 916 if (ibv_destroy_qp(res->qp)) 917 { 918 fprintf(stderr, "failed to destroy QP\n"); 919 rc = 1; 920 } 921 if (res->mr) 922 if (ibv_dereg_mr(res->mr)) 923 { 924 fprintf(stderr, "failed to deregister MR\n"); 925 rc = 1; 926 } 927 if (res->buf) 928 free(res->buf); 929 if (res->cq) 930 if (ibv_destroy_cq(res->cq)) 931 { 932 fprintf(stderr, "failed to destroy CQ\n"); 933 rc = 1; 934 } 935 if (res->pd) 936 if (ibv_dealloc_pd(res->pd)) 937 { 938 fprintf(stderr, "failed to deallocate PD\n"); 939 rc = 1; 940 } 941 if (res->ib_ctx) 942 if (ibv_close_device(res->ib_ctx)) 943 { 944 fprintf(stderr, "failed to close device context\n"); 945 rc = 1; 946 } 947 if (res->sock >= 0) 948 if (close(res->sock)) 949 { 950 fprintf(stderr, "failed to close socket\n"); 951 rc = 1; 952 } 953 return rc; 954} 955 956/****************************************************************************** 957 * Function: print_config 958 * 959 * Input 960 * none 961 * 962 * Output 963 * none 964 * 965 * Returns 966 * none 967 * 968 * Description 969 * Print out config information 970 ******************************************************************************/ 971static void print_config(void) 972{ 973 fprintf(stdout, " 
------------------------------------------------\n"); 974 fprintf(stdout, " Device name : \"%s\"\n", config.dev_name); 975 fprintf(stdout, " IB port : %u\n", config.ib_port); 976 if (config.server_name) 977 fprintf(stdout, " IP : %s\n", config.server_name); 978 fprintf(stdout, " TCP port : %u\n", config.tcp_port); 979 if (config.gid_idx >= 0) 980 fprintf(stdout, " GID index : %u\n", config.gid_idx); 981 fprintf(stdout, " ------------------------------------------------\n\n"); 982} 983 984/****************************************************************************** 985 * Function: usage 986 * 987 * Input 988 * argv0 command line arguments 989 * 990 * Output 991 * none 992 * 993 * Returns 994 * none 995 * 996 * Description 997 * print a description of command line syntax 998 ******************************************************************************/ 999static void usage(const char *argv0) 1000{ 1001 fprintf(stdout, "Usage:\n"); 1002 fprintf(stdout, " %s start a server and wait for connection\n", argv0); 1003 fprintf(stdout, " %s <host> connect to server at <host>\n", argv0); 1004 fprintf(stdout, "\n"); 1005 fprintf(stdout, "Options:\n"); 1006 fprintf(stdout, " -p, --port <port> listen on/connect to port <port> (default 18515)\n"); 1007 fprintf(stdout, " -d, --ib-dev <dev> use IB device <dev> (default first device found)\n"); 1008 fprintf(stdout, " -i, --ib-port <port> use port <port> of IB device (default 1)\n"); 1009 fprintf(stdout, " -g, --gid_idx <git index> gid index to be used in GRH (default not used)\n"); 1010 fprintf(stdout, " -h, --help\n"); 1011} 1012 1013/****************************************************************************** 1014 * Function: main 1015 * 1016 * Input 1017 * argc number of items in argv 1018 * argv command line parameters 1019 * 1020 * Output 1021 * none 1022 * 1023 * Returns 1024 * 0 on success, 1 on failure 1025 * 1026 * Description 1027 * Main program code 1028 
******************************************************************************/ 1029int main(int argc, char *argv[]) 1030{ 1031 struct resources res; 1032 int rc = 1; 1033 char temp_char; 1034 1035 /* parse the command line parameters */ 1036 while (1) 1037 { 1038 int c; 1039 static struct option long_options[] = { 1040 {.name = "port", .has_arg = 1, .val = 'p'}, 1041 {.name = "ib-dev", .has_arg = 1, .val = 'd'}, 1042 {.name = "ib-port", .has_arg = 1, .val = 'i'}, 1043 {.name = "gid-idx", .has_arg = 1, .val = 'g'}, 1044 {.name = "help", .has_arg = 0, .val = 'h'}, 1045 {.name = NULL, .has_arg = 0, .val = '\0'}}; 1046 // if a character is followed by a colon, the option is expected to have an argument, 1047 // which should be separated from it by white space. 1048 c = getopt_long(argc, argv, "p:d:i:g:h", long_options, NULL); 1049 if (c == -1) 1050 break; 1051 switch (c) 1052 { 1053 case 'p': 1054 config.tcp_port = strtoul(optarg, NULL, 0); 1055 break; 1056 case 'd': 1057 config.dev_name = strdup(optarg); 1058 break; 1059 case 'i': 1060 config.ib_port = strtoul(optarg, NULL, 0); 1061 if (config.ib_port < 0) 1062 { 1063 usage(argv[0]); 1064 return 1; 1065 } 1066 break; 1067 case 'g': 1068 config.gid_idx = strtoul(optarg, NULL, 0); 1069 if (config.gid_idx < 0) 1070 { 1071 usage(argv[0]); 1072 return 1; 1073 } 1074 break; 1075 case 'h': 1076 default: 1077 usage(argv[0]); 1078 return 1; 1079 } 1080 } 1081 /* parse the last parameter (if exists) as the server name */ 1082 // optind is modified to index the first nonoption. 
1083 // https://stackoverflow.com/questions/46636641/how-does-optind-get-assigned-in-c 1084 if (optind == argc - 1) 1085 config.server_name = argv[optind]; 1086 else if (optind < argc) 1087 { 1088 usage(argv[0]); 1089 return 1; 1090 } 1091 /* print the used parameters for info*/ 1092 print_config(); 1093 /* init all of the resources, so cleanup will be easy */ 1094 resources_init(&res); 1095 /* create resources before using them */ 1096 if (resources_create(&res)) 1097 { 1098 fprintf(stderr, "failed to create resources\n"); 1099 goto main_exit; 1100 } 1101 /* connect the QPs */ 1102 if (connect_qp(&res)) 1103 { 1104 fprintf(stderr, "failed to connect QPs\n"); 1105 goto main_exit; 1106 } 1107 /* let the server post the sr */ 1108 if (!config.server_name) 1109 if (post_send(&res, IBV_WR_SEND)) 1110 { 1111 fprintf(stderr, "failed to post sr\n"); 1112 goto main_exit; 1113 } 1114 /* in both sides we expect to get a completion */ 1115 if (poll_completion(&res)) 1116 { 1117 fprintf(stderr, "poll completion failed\n"); 1118 goto main_exit; 1119 } 1120 /* after polling the completion we have the message in the client buffer 1121 too */ 1122 /* if client side */ 1123 if (config.server_name) 1124 { 1125 fprintf(stdout, "Message is: '%s'\n", res.buf); 1126 } 1127 else 1128 { 1129 /* setup server buffer with read message */ 1130 strcpy(res.buf, RDMAMSGR); 1131 } 1132 /* Sync so we are sure server side has data ready before client tries to 1133 read it */ 1134 if (sock_sync_data(res.sock, 1, "R", &temp_char)) /* just send a dummy 1135 char back and forth */ 1136 { 1137 fprintf(stderr, "sync error before RDMA ops\n"); 1138 rc = 1; 1139 goto main_exit; 1140 } 1141 /* Now the client performs an RDMA read and then write on server. 
1142 Note that the server has no idea these events have occured */ 1143 if (config.server_name) 1144 { 1145 /* First we read contens of server's buffer */ 1146 if (post_send(&res, IBV_WR_RDMA_READ)) 1147 { 1148 fprintf(stderr, "failed to post SR 2\n"); 1149 rc = 1; 1150 goto main_exit; 1151 } 1152 if (poll_completion(&res)) 1153 { 1154 fprintf(stderr, "poll completion failed 2\n"); 1155 rc = 1; 1156 goto main_exit; 1157 } 1158 fprintf(stdout, "Contents of server's buffer: '%s'\n", res.buf); 1159 /* Now we replace what's in the server's buffer */ 1160 strcpy(res.buf, RDMAMSGW); 1161 fprintf(stdout, "Now replacing it with: '%s'\n", res.buf); 1162 if (post_send(&res, IBV_WR_RDMA_WRITE)) 1163 { 1164 fprintf(stderr, "failed to post SR 3\n"); 1165 rc = 1; 1166 goto main_exit; 1167 } 1168 if (poll_completion(&res)) 1169 { 1170 fprintf(stderr, "poll completion failed 3\n"); 1171 rc = 1; 1172 goto main_exit; 1173 } 1174 } 1175 /* Sync so server will know that client is done mucking with its memory */ 1176 if (sock_sync_data(res.sock, 1, "W", &temp_char)) /* just send a dummy 1177 char back and forth */ 1178 { 1179 fprintf(stderr, "sync error after RDMA ops\n"); 1180 rc = 1; 1181 goto main_exit; 1182 } 1183 if (!config.server_name) 1184 fprintf(stdout, "Contents of server buffer: '%s'\n", res.buf); 1185 rc = 0; 1186main_exit: 1187 if (resources_destroy(&res)) 1188 { 1189 fprintf(stderr, "failed to destroy resources\n"); 1190 rc = 1; 1191 } 1192 if (config.dev_name) 1193 free((char *)config.dev_name); 1194 fprintf(stdout, "\ntest result is %d\n", rc); 1195 return rc; 1196}
运行演示
启动服务端
./RDMA_RC_example
启动客户端
./RDMA_RC_example localhost
运行结果
服务端输出示例
server output
客户端输出示例
client output