8.2 发送、接收、RDMA 读取、RDMA 写入的代码

示例代码

RDMA_RC_example.c

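/*
 * BUILD COMMAND:
 * gcc -Wall -I/usr/local/ofed/include -O2 -o RDMA_RC_example -L/usr/local/ofed/lib64 -L/usr/local/ofed/lib RDMA_RC_example.c -libverbs
 * (the library must come after the source file, otherwise linkers that use
 * --as-needed drop it and the ibv_* symbols stay unresolved)
 *
 * Usage:
 * RDMA_RC_example --help
 */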
   9/******************************************************************************
  10*
  11* RDMA Aware Networks Programming Example
  12*
  13* This code demonstrates how to perform the following operations using the
  14* VPI Verbs API:
  15*
  16* Send
  17* Receive
  18* RDMA Read
  19* RDMA Write
  20*
  21*****************************************************************************/
#include <arpa/inet.h>
#include <byteswap.h>
#include <endian.h>
#include <errno.h>
#include <getopt.h>
#include <infiniband/verbs.h>
#include <inttypes.h>
#include <netdb.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
  37/* poll CQ timeout in millisec (2 seconds) */
  38#define MAX_POLL_CQ_TIMEOUT 2000
  39#define MSG      "SEND operation      "
  40#define RDMAMSGR "RDMA read operation "
  41#define RDMAMSGW "RDMA write operation"
  42#define MSG_SIZE (strlen(MSG) + 1)
  43#if __BYTE_ORDER == __LITTLE_ENDIAN
  44static inline uint64_t htonll(uint64_t x) { return bswap_64(x); }
  45static inline uint64_t ntohll(uint64_t x) { return bswap_64(x); }
  46#elif __BYTE_ORDER == __BIG_ENDIAN
  47static inline uint64_t htonll(uint64_t x) { return x; }
  48static inline uint64_t ntohll(uint64_t x) { return x; }
  49#else
  50#error __BYTE_ORDER is neither __LITTLE_ENDIAN nor __BIG_ENDIAN
  51#endif
  52
  53/* structure of test parameters */
  54struct config_t
  55{
  56    const char *dev_name; /* IB device name */
  57    char *server_name;    /* server host name */
  58    u_int32_t tcp_port;   /* server TCP port */
  59    int ib_port;          /* local IB port to work with */
  60    int gid_idx;          /* gid index to use */
  61};
  62
  63/* structure to exchange data which is needed to connect the QPs */
  64struct cm_con_data_t
  65{
  66    uint64_t addr;   /* Buffer address */
  67    uint32_t rkey;   /* Remote key */
  68    uint32_t qp_num; /* QP number */
  69    uint16_t lid;    /* LID of the IB port */
  70    uint8_t gid[16]; /* gid */
  71} __attribute__((packed));
  72
  73/* structure of system resources */
  74struct resources
  75{
  76    struct ibv_device_attr device_attr;
  77    /* Device attributes */
  78    struct ibv_port_attr port_attr;    /* IB port attributes */
  79    struct cm_con_data_t remote_props; /* values to connect to remote side */
  80    struct ibv_context *ib_ctx;        /* device handle */
  81    struct ibv_pd *pd;                 /* PD handle */
  82    struct ibv_cq *cq;                 /* CQ handle */
  83    struct ibv_qp *qp;                 /* QP handle */
  84    struct ibv_mr *mr;                 /* MR handle for buf */
  85    char *buf;                         /* memory buffer pointer, used for RDMA and send ops */
  86    int sock;                          /* TCP socket file descriptor */
  87};
  88
  89struct config_t config = {
  90    NULL,  /* dev_name */
  91    NULL,  /* server_name */
  92    19875, /* tcp_port */
  93    1,     /* ib_port */
  94    -1     /* gid_idx */
  95};
  96
  97/******************************************************************************
  98Socket operations
  99For simplicity, the example program uses TCP sockets to exchange control
 100information. If a TCP/IP stack/connection is not available, connection manager
 101(CM) may be used to pass this information. Use of CM is beyond the scope of
 102this example
 103******************************************************************************/
 104
 105/******************************************************************************
 106 * Function: sock_connect
 107 *
 108 * Input
 109 * servername URL of server to connect to (NULL for server mode)
 110 * port port of service
 111 *
 112 * Output
 113 * none
 114 *
 115 * Returns
 116 * socket (fd) on success, negative error code on failure
 117 *
 118 * Description
 119 * Connect a socket. If servername is specified a client connection will be
 120 * initiated to the indicated server and port. Otherwise listen on the
 121 * indicated port for an incoming connection.
 122 *
 123 ******************************************************************************/
 124
 125static int sock_connect(const char *servername, int port)
 126{
 127    // `struct addrinfo' is not part of standard C, but belongs to gnu.
 128    // https://stackoverflow.com/questions/33076175/why-is-struct-addrinfo-defined-only-if-use-xopen2k-is-defined
 129    struct addrinfo *resolved_addr = NULL;
 130    struct addrinfo *iterator;
 131    char service[6];
 132    int sockfd = -1;
 133    int listenfd = 0;
 134    int tmp;
 135
 136    struct addrinfo hints = {
 137        .ai_flags = AI_PASSIVE, .ai_family = AF_INET, .ai_socktype = SOCK_STREAM};
 138    if (sprintf(service, "%d", port) < 0)
 139        goto sock_connect_exit;
 140    /* Resolve DNS address, use sockfd as temp storage */
 141    sockfd = getaddrinfo(servername, service, &hints, &resolved_addr);
 142    if (sockfd < 0)
 143    {
 144        fprintf(stderr, "%s for %s:%d\n", gai_strerror(sockfd), servername, port);
 145        goto sock_connect_exit;
 146    }
 147    /* Search through results and find the one we want */
 148    for (iterator = resolved_addr; iterator; iterator = iterator->ai_next)
 149    {
 150        sockfd = socket(iterator->ai_family, iterator->ai_socktype,
 151                        iterator->ai_protocol);
 152        if (sockfd >= 0)
 153        {
 154            if (servername)
 155            {
 156                /* Client mode. Initiate connection to remote */
 157                if ((tmp = connect(sockfd, iterator->ai_addr, iterator->ai_addrlen)))
 158                {
 159                    fprintf(stdout, "failed connect \n");
 160                    close(sockfd);
 161                    sockfd = -1;
 162                }
 163            }
 164            else
 165            {
 166                /* Server mode. Set up listening socket an accept a connection
 167                 */
 168                listenfd = sockfd;
 169                sockfd = -1;
 170                if (bind(listenfd, iterator->ai_addr, iterator->ai_addrlen))
 171                    goto sock_connect_exit;
 172                listen(listenfd, 1);
 173                sockfd = accept(listenfd, NULL, 0);
 174            }
 175        }
 176    }
 177sock_connect_exit:
 178    if (listenfd)
 179        close(listenfd);
 180    if (resolved_addr)
 181        freeaddrinfo(resolved_addr);
 182    if (sockfd < 0)
 183    {
 184        if (servername)
 185            fprintf(stderr, "Couldn't connect to %s:%d\n", servername, port);
 186        else
 187        {
 188            perror("server accept");
 189            fprintf(stderr, "accept() failed\n");
 190        }
 191    }
 192    return sockfd;
 193}
 194
 195/******************************************************************************
 196* Function: sock_sync_data
 197*
 198* Input
 199* sock socket to transfer data on
 200* xfer_size size of data to transfer
 201* local_data pointer to data to be sent to remote
 202*
 203* Output
 204* remote_data pointer to buffer to receive remote data
 205*
 206* Returns
 207* 0 on success, negative error code on failure
 208*
 209* Description
 210* Sync data across a socket. The indicated local data will be sent to the
 211* remote. It will then wait for the remote to send its data back. It is
 212* assumed that the two sides are in sync and call this function in the proper
 213* order. Chaos will ensue if they are not. :)
 214*
 215* Also note this is a blocking function and will wait for the full data to be
 216* received from the remote.
 217*
 218******************************************************************************/
 219int sock_sync_data(int sock, int xfer_size, char *local_data,
 220                   char *remote_data)
 221{
 222    int rc;
 223    int read_bytes = 0;
 224    int total_read_bytes = 0;
 225    rc = write(sock, local_data, xfer_size);
 226    if (rc < xfer_size)
 227        fprintf(stderr, "Failed writing data during sock_sync_data\n");
 228    else
 229        rc = 0;
 230    while (!rc && total_read_bytes < xfer_size)
 231    {
 232        read_bytes = read(sock, remote_data, xfer_size);
 233        if (read_bytes > 0)
 234            total_read_bytes += read_bytes;
 235        else
 236            rc = read_bytes;
 237    }
 238    return rc;
 239}
 240
 241/******************************************************************************
 242End of socket operations
 243******************************************************************************/
 244
 245/* poll_completion */
 246/******************************************************************************
 247 * Function: poll_completion
 248 *
 249 * Input
 250 * res pointer to resources structure
 251 *
 252 * Output
 253 * none
 254 *
 255 * Returns
 256 * 0 on success, 1 on failure
 257 *
 258 * Description
 259 * Poll the completion queue for a single event. This function will continue to
 260 * poll the queue until MAX_POLL_CQ_TIMEOUT milliseconds have passed.
 261 *
 262 ******************************************************************************/
 263static int poll_completion(struct resources *res)
 264{
 265    struct ibv_wc wc;
 266    unsigned long start_time_msec;
 267    unsigned long cur_time_msec;
 268    struct timeval cur_time;
 269    int poll_result;
 270    int rc = 0;
 271    /* poll the completion for a while before giving up of doing it .. */
 272    gettimeofday(&cur_time, NULL);
 273    start_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000);
 274    do
 275    {
 276        poll_result = ibv_poll_cq(res->cq, 1, &wc);
 277        gettimeofday(&cur_time, NULL);
 278        cur_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000);
 279    } while ((poll_result == 0) &&
 280             ((cur_time_msec - start_time_msec) < MAX_POLL_CQ_TIMEOUT));
 281    if (poll_result < 0)
 282    {
 283        /* poll CQ failed */
 284        fprintf(stderr, "poll CQ failed\n");
 285        rc = 1;
 286    }
 287    else if (poll_result == 0)
 288    {
 289        /* the CQ is empty */
 290        fprintf(stderr, "completion wasn't found in the CQ after timeout\n");
 291        rc = 1;
 292    }
 293    else
 294    {
 295        /* CQE found */
 296        fprintf(stdout, "completion was found in CQ with status 0x%x\n", wc.status);
 297        /* check the completion status (here we don't care about the completion
 298        opcode */
 299        if (wc.status != IBV_WC_SUCCESS)
 300        {
 301            fprintf(stderr, "got bad completion with status: 0x%x, vendor syndrome: 0x%x\n", wc.status, wc.vendor_err);
 302            rc = 1;
 303        }
 304    }
 305    return rc;
 306}
 307
 308/******************************************************************************
 309 * Function: post_send
 310 *
 311 * Input
 312 * res pointer to resources structure
 313 * opcode IBV_WR_SEND, IBV_WR_RDMA_READ or IBV_WR_RDMA_WRITE
 314 *
 315 * Output
 316 * none
 317 *
 318 * Returns
 319 * 0 on success, error code on failure
 320 *
 321 * Description
 322 * This function will create and post a send work request
 323 ******************************************************************************/
 324static int post_send(struct resources *res, int opcode)
 325{
 326    struct ibv_send_wr sr;
 327    struct ibv_sge sge;
 328    struct ibv_send_wr *bad_wr = NULL;
 329    int rc;
 330
 331    /* prepare the scatter/gather entry */
 332    memset(&sge, 0, sizeof(sge));
 333    sge.addr = (uintptr_t)res->buf;
 334    sge.length = MSG_SIZE;
 335    sge.lkey = res->mr->lkey;
 336    /* prepare the send work request */
 337    memset(&sr, 0, sizeof(sr));
 338    sr.next = NULL;
 339    sr.wr_id = 0;
 340    sr.sg_list = &sge;
 341    sr.num_sge = 1;
 342    sr.opcode = opcode;
 343    sr.send_flags = IBV_SEND_SIGNALED;
 344    if (opcode != IBV_WR_SEND)
 345    {
 346        sr.wr.rdma.remote_addr = res->remote_props.addr;
 347        sr.wr.rdma.rkey = res->remote_props.rkey;
 348    }
 349    /* there is a Receive Request in the responder side, so we won't get any
 350    into RNR flow */
 351    rc = ibv_post_send(res->qp, &sr, &bad_wr);
 352    if (rc)
 353        fprintf(stderr, "failed to post SR\n");
 354    else
 355    {
 356        switch (opcode)
 357        {
 358        case IBV_WR_SEND:
 359            fprintf(stdout, "Send Request was posted\n");
 360            break;
 361        case IBV_WR_RDMA_READ:
 362            fprintf(stdout, "RDMA Read Request was posted\n");
 363            break;
 364        case IBV_WR_RDMA_WRITE:
 365            fprintf(stdout, "RDMA Write Request was posted\n");
 366            break;
 367        default:
 368            fprintf(stdout, "Unknown Request was posted\n");
 369            break;
 370        }
 371    }
 372    return rc;
 373}
 374
 375/******************************************************************************
 376 * Function: post_receive
 377 *
 378 * Input
 379 * res pointer to resources structure
 380 *
 381 * Output
 382 * none
 383 *
 384 * Returns
 385 * 0 on success, error code on failure
 386 *
 387 * Description
 388 *
 389 ******************************************************************************/
 390static int post_receive(struct resources *res)
 391{
 392    struct ibv_recv_wr rr;
 393    struct ibv_sge sge;
 394    struct ibv_recv_wr *bad_wr;
 395    int rc;
 396    /* prepare the scatter/gather entry */
 397    memset(&sge, 0, sizeof(sge));
 398    sge.addr = (uintptr_t)res->buf;
 399    sge.length = MSG_SIZE;
 400    sge.lkey = res->mr->lkey;
 401    /* prepare the receive work request */
 402    memset(&rr, 0, sizeof(rr));
 403    rr.next = NULL;
 404    rr.wr_id = 0;
 405    rr.sg_list = &sge;
 406    rr.num_sge = 1;
 407    /* post the Receive Request to the RQ */
 408    rc = ibv_post_recv(res->qp, &rr, &bad_wr);
 409    if (rc)
 410        fprintf(stderr, "failed to post RR\n");
 411    else
 412        fprintf(stdout, "Receive Request was posted\n");
 413    return rc;
 414}
 415
 416/******************************************************************************
 417 * Function: resources_init
 418 *
 419 * Input
 420 * res pointer to resources structure
 421 *
 422 * Output
 423 * res is initialized
 424 *
 425 * Returns
 426 * none
 427 *
 428 * Description
 429 * res is initialized to default values
 430 ******************************************************************************/
 431static void resources_init(struct resources *res)
 432{
 433    memset(res, 0, sizeof *res);
 434    res->sock = -1;
 435}
 436
 437/******************************************************************************
 438 * Function: resources_create
 439 *
 440 * Input
 441 * res pointer to resources structure to be filled in
 442 *
 443 * Output
 444 * res filled in with resources
 445 *
 446 * Returns
 447 * 0 on success, 1 on failure
 448 *
 449 * Description
 450 *
 451 * This function creates and allocates all necessary system resources. These
 452 * are stored in res.
 453 *****************************************************************************/
 454static int resources_create(struct resources *res)
 455{
 456    struct ibv_device **dev_list = NULL;
 457    struct ibv_qp_init_attr qp_init_attr;
 458    struct ibv_device *ib_dev = NULL;
 459    size_t size;
 460    int i;
 461    int mr_flags = 0;
 462    int cq_size = 0;
 463    int num_devices;
 464    int rc = 0;
 465
 466    /* if client side */
 467    if (config.server_name)
 468    {
 469        res->sock = sock_connect(config.server_name, config.tcp_port);
 470        if (res->sock < 0)
 471        {
 472            fprintf(stderr,
 473                    "failed to establish TCP connection to server %s, port %d\n ",
 474                    config.server_name,
 475                    config.tcp_port);
 476            rc = -1;
 477            goto resources_create_exit;
 478        }
 479    }
 480    else
 481    {
 482        fprintf(stdout, "waiting on port %d for TCP connection\n", config.tcp_port);
 483        res->sock = sock_connect(NULL, config.tcp_port);
 484        if (res->sock < 0)
 485        {
 486            fprintf(stderr, "failed to establish TCP connection with client on port %d\n ", config.tcp_port);
 487            rc = -1;
 488            goto resources_create_exit;
 489        }
 490    }
 491    fprintf(stdout, "TCP connection was established\n");
 492    fprintf(stdout, "searching for IB devices in host\n");
 493    /* get device names in the system */
 494    dev_list = ibv_get_device_list(&num_devices);
 495    if (!dev_list)
 496    {
 497        fprintf(stderr, "failed to get IB devices list\n");
 498        rc = 1;
 499        goto resources_create_exit;
 500    }
 501    /* if there isn't any IB device in host */
 502    if (!num_devices)
 503    {
 504        fprintf(stderr, "found %d device(s)\n", num_devices);
 505        rc = 1;
 506        goto resources_create_exit;
 507    }
 508    fprintf(stdout, "found %d device(s)\n", num_devices);
 509    /* search for the specific device we want to work with */
 510    for (i = 0; i < num_devices; i++)
 511    {
 512        if (!config.dev_name)
 513        {
 514            config.dev_name = strdup(ibv_get_device_name(dev_list[i]));
 515            fprintf(stdout, "device not specified, using first one found: %s\n", config.dev_name);
 516        }
 517        if (!strcmp(ibv_get_device_name(dev_list[i]), config.dev_name))
 518        {
 519            ib_dev = dev_list[i];
 520            break;
 521        }
 522    }
 523    /* if the device wasn't found in host */
 524    if (!ib_dev)
 525    {
 526        fprintf(stderr, "IB device %s wasn't found\n", config.dev_name);
 527        rc = 1;
 528        goto resources_create_exit;
 529    }
 530    /* get device handle */
 531    res->ib_ctx = ibv_open_device(ib_dev);
 532    if (!res->ib_ctx)
 533    {
 534        fprintf(stderr, "failed to open device %s\n", config.dev_name);
 535        rc = 1;
 536        goto resources_create_exit;
 537    }
 538    /* We are now done with device list, free it */
 539    ibv_free_device_list(dev_list);
 540    dev_list = NULL;
 541    ib_dev = NULL;
 542    /* query port properties */
 543    if (ibv_query_port(res->ib_ctx, config.ib_port, &res->port_attr))
 544    {
 545        fprintf(stderr, "ibv_query_port on port %u failed\n", config.ib_port);
 546        rc = 1;
 547        goto resources_create_exit;
 548    }
 549    /* allocate Protection Domain */
 550    res->pd = ibv_alloc_pd(res->ib_ctx);
 551    if (!res->pd)
 552    {
 553        fprintf(stderr, "ibv_alloc_pd failed\n");
 554        rc = 1;
 555        goto resources_create_exit;
 556    }
 557    /* each side will send only one WR, so Completion Queue with 1 entry is
 558    enough */
 559    cq_size = 1;
 560    res->cq = ibv_create_cq(res->ib_ctx, cq_size, NULL, NULL, 0);
 561    if (!res->cq)
 562    {
 563        fprintf(stderr, "failed to create CQ with %u entries\n", cq_size);
 564        rc = 1;
 565        goto resources_create_exit;
 566    }
 567    /* allocate the memory buffer that will hold the data */
 568    size = MSG_SIZE;
 569    res->buf = (char *)malloc(size);
 570    if (!res->buf)
 571    {
 572        fprintf(stderr, "failed to malloc %zu bytes to memory buffer\n", size);
 573        rc = 1;
 574        goto resources_create_exit;
 575    }
 576    memset(res->buf, 0, size);
 577    /* only in the server side put the message in the memory buffer */
 578    if (!config.server_name)
 579    {
 580        strcpy(res->buf, MSG);
 581        fprintf(stdout, "going to send the message: '%s'\n", res->buf);
 582    }
 583    else
 584        memset(res->buf, 0, size);
 585    /* register the memory buffer */
 586    mr_flags =
 587        IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
 588    res->mr = ibv_reg_mr(res->pd, res->buf, size, mr_flags);
 589    if (!res->mr)
 590    {
 591        fprintf(stderr, "ibv_reg_mr failed with mr_flags=0x%x\n", mr_flags);
 592        rc = 1;
 593        goto resources_create_exit;
 594    }
 595    fprintf(stdout,
 596            "MR was registered with addr=%p, lkey=0x%x, rkey=0x%x, flags=0x%x\n",
 597            res->buf, res->mr->lkey, res->mr->rkey, mr_flags);
 598    /* create the Queue Pair */
 599    memset(&qp_init_attr, 0, sizeof(qp_init_attr));
 600    qp_init_attr.qp_type = IBV_QPT_RC;
 601    qp_init_attr.sq_sig_all = 1;
 602    qp_init_attr.send_cq = res->cq;
 603    qp_init_attr.recv_cq = res->cq;
 604    qp_init_attr.cap.max_send_wr = 1;
 605    qp_init_attr.cap.max_recv_wr = 1;
 606    qp_init_attr.cap.max_send_sge = 1;
 607    qp_init_attr.cap.max_recv_sge = 1;
 608    res->qp = ibv_create_qp(res->pd, &qp_init_attr);
 609    if (!res->qp)
 610    {
 611        fprintf(stderr, "failed to create QP\n");
 612        rc = 1;
 613        goto resources_create_exit;
 614    }
 615    fprintf(stdout, "QP was created, QP number=0x%x\n", res->qp->qp_num);
 616resources_create_exit:
 617    if (rc)
 618    {
 619        /* Error encountered, cleanup */
 620        if (res->qp)
 621        {
 622            ibv_destroy_qp(res->qp);
 623            res->qp = NULL;
 624        }
 625        if (res->mr)
 626        {
 627            ibv_dereg_mr(res->mr);
 628            res->mr = NULL;
 629        }
 630        if (res->buf)
 631        {
 632            free(res->buf);
 633            res->buf = NULL;
 634        }
 635        if (res->cq)
 636        {
 637            ibv_destroy_cq(res->cq);
 638            res->cq = NULL;
 639        }
 640        if (res->pd)
 641        {
 642            ibv_dealloc_pd(res->pd);
 643            res->pd = NULL;
 644        }
 645        if (res->ib_ctx)
 646        {
 647            ibv_close_device(res->ib_ctx);
 648            res->ib_ctx = NULL;
 649        }
 650        if (dev_list)
 651        {
 652            ibv_free_device_list(dev_list);
 653            dev_list = NULL;
 654        }
 655        if (res->sock >= 0)
 656        {
 657            if (close(res->sock))
 658                fprintf(stderr, "failed to close socket\n");
 659            res->sock = -1;
 660        }
 661    }
 662    return rc;
 663}
 664
 665/******************************************************************************
 666 * Function: modify_qp_to_init
 667 *
 668 * Input
 669 * qp QP to transition
 670 *
 671 * Output
 672 * none
 673 *
 674 * Returns
 675 * 0 on success, ibv_modify_qp failure code on failure
 676 *
 677 * Description
 678 * Transition a QP from the RESET to INIT state
 679 ******************************************************************************/
 680static int modify_qp_to_init(struct ibv_qp *qp)
 681{
 682    struct ibv_qp_attr attr;
 683    int flags;
 684    int rc;
 685    memset(&attr, 0, sizeof(attr));
 686    attr.qp_state = IBV_QPS_INIT;
 687    attr.port_num = config.ib_port;
 688    attr.pkey_index = 0;
 689    attr.qp_access_flags =
 690        IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
 691    flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;
 692    rc = ibv_modify_qp(qp, &attr, flags);
 693    if (rc)
 694        fprintf(stderr, "failed to modify QP state to INIT\n");
 695    return rc;
 696}
 697
 698/******************************************************************************
 699 * Function: modify_qp_to_rtr
 700 *
 701 * Input
 702 * qp QP to transition
 703 * remote_qpn remote QP number
 704 * dlid destination LID
 705 * dgid destination GID (mandatory for RoCEE)
 706 *
 707 * Output
 708 * none
 709 *
 710 * Returns
 711 * 0 on success, ibv_modify_qp failure code on failure
 712 *
 713 * Description
 714 * Transition a QP from the INIT to RTR state, using the specified QP number
 715 ******************************************************************************/
 716static int modify_qp_to_rtr(struct ibv_qp *qp, uint32_t remote_qpn,
 717                            uint16_t dlid, uint8_t *dgid)
 718{
 719    struct ibv_qp_attr attr;
 720    int flags;
 721    int rc;
 722    memset(&attr, 0, sizeof(attr));
 723    attr.qp_state = IBV_QPS_RTR;
 724    attr.path_mtu = IBV_MTU_256;
 725    attr.dest_qp_num = remote_qpn;
 726    attr.rq_psn = 0;
 727    attr.max_dest_rd_atomic = 1;
 728    attr.min_rnr_timer = 0x12;
 729    attr.ah_attr.is_global = 0;
 730    attr.ah_attr.dlid = dlid;
 731    attr.ah_attr.sl = 0;
 732    attr.ah_attr.src_path_bits = 0;
 733    attr.ah_attr.port_num = config.ib_port;
 734    if (config.gid_idx >= 0)
 735    {
 736        attr.ah_attr.is_global = 1;
 737        attr.ah_attr.port_num = 1;
 738        memcpy(&attr.ah_attr.grh.dgid, dgid, 16);
 739        attr.ah_attr.grh.flow_label = 0;
 740        attr.ah_attr.grh.hop_limit = 1;
 741        attr.ah_attr.grh.sgid_index = config.gid_idx;
 742        attr.ah_attr.grh.traffic_class = 0;
 743    }
 744    flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN |
 745            IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;
 746    rc = ibv_modify_qp(qp, &attr, flags);
 747    if (rc)
 748        fprintf(stderr, "failed to modify QP state to RTR\n");
 749    return rc;
 750}
 751
 752/******************************************************************************
 753 * Function: modify_qp_to_rts
 754 *
 755 * Input
 756 * qp QP to transition
 757 *
 758 * Output
 759 * none
 760 *
 761 * Returns
 762 * 0 on success, ibv_modify_qp failure code on failure
 763 *
 764 * Description
 765 * Transition a QP from the RTR to RTS state
 766 ******************************************************************************/
 767static int modify_qp_to_rts(struct ibv_qp *qp)
 768{
 769    struct ibv_qp_attr attr;
 770    int flags;
 771    int rc;
 772    memset(&attr, 0, sizeof(attr));
 773    attr.qp_state = IBV_QPS_RTS;
 774    attr.timeout = 0x12;
 775    attr.retry_cnt = 6;
 776    attr.rnr_retry = 0;
 777    attr.sq_psn = 0;
 778    attr.max_rd_atomic = 1;
 779    flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY |
 780            IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
 781    rc = ibv_modify_qp(qp, &attr, flags);
 782    if (rc)
 783        fprintf(stderr, "failed to modify QP state to RTS\n");
 784    return rc;
 785}
 786
 787/******************************************************************************
 788 * Function: connect_qp
 789 *
 790 * Input
 791 * res pointer to resources structure
 792 *
 793 * Output
 794 * none
 795 *
 796 * Returns
 797 * 0 on success, error code on failure
 798 *
 799 * Description
 800 * Connect the QP. Transition the server side to RTR, sender side to RTS
 801 ******************************************************************************/
 802static int connect_qp(struct resources *res)
 803{
 804    struct cm_con_data_t local_con_data;
 805    struct cm_con_data_t remote_con_data;
 806    struct cm_con_data_t tmp_con_data;
 807    int rc = 0;
 808    char temp_char;
 809    union ibv_gid my_gid;
 810    if (config.gid_idx >= 0)
 811    {
 812        rc = ibv_query_gid(res->ib_ctx, config.ib_port, config.gid_idx, &my_gid);
 813        if (rc)
 814        {
 815            fprintf(stderr, "could not get gid for port %d, index %d\n",
 816                    config.ib_port, config.gid_idx);
 817            return rc;
 818        }
 819    }
 820    else
 821        memset(&my_gid, 0, sizeof my_gid);
 822    /* exchange using TCP sockets info required to connect QPs */
 823    local_con_data.addr = htonll((uintptr_t)res->buf);
 824    local_con_data.rkey = htonl(res->mr->rkey);
 825    local_con_data.qp_num = htonl(res->qp->qp_num);
 826    local_con_data.lid = htons(res->port_attr.lid);
 827    memcpy(local_con_data.gid, &my_gid, 16);
 828    fprintf(stdout, "\nLocal LID = 0x%x\n", res->port_attr.lid);
 829    if (sock_sync_data(res->sock, sizeof(struct cm_con_data_t),
 830                       (char *)&local_con_data, (char *)&tmp_con_data) < 0)
 831    {
 832        fprintf(stderr, "failed to exchange connection data between sides\n");
 833        rc = 1;
 834        goto connect_qp_exit;
 835    }
 836    remote_con_data.addr = ntohll(tmp_con_data.addr);
 837    remote_con_data.rkey = ntohl(tmp_con_data.rkey);
 838    remote_con_data.qp_num = ntohl(tmp_con_data.qp_num);
 839    remote_con_data.lid = ntohs(tmp_con_data.lid);
 840    memcpy(remote_con_data.gid, tmp_con_data.gid, 16);
 841    /* save the remote side attributes, we will need it for the post SR */
 842    res->remote_props = remote_con_data;
 843    fprintf(stdout, "Remote address = 0x%" PRIx64 "\n", remote_con_data.addr);
 844    fprintf(stdout, "Remote rkey = 0x%x\n", remote_con_data.rkey);
 845    fprintf(stdout, "Remote QP number = 0x%x\n", remote_con_data.qp_num);
 846    fprintf(stdout, "Remote LID = 0x%x\n", remote_con_data.lid);
 847    if (config.gid_idx >= 0)
 848    {
 849        uint8_t *p = remote_con_data.gid;
 850        fprintf(stdout, "Remote GID = %02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x\n",
 851                p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7], p[8], p[9], p[10], p[11], p[12], p[13], p[14], p[15]);
 852    }
 853    /* modify the QP to init */
 854    rc = modify_qp_to_init(res->qp);
 855    if (rc)
 856    {
 857        fprintf(stderr, "change QP state to INIT failed\n");
 858        goto connect_qp_exit;
 859    }
 860    /* let the client post RR to be prepared for incoming messages */
 861    if (config.server_name)
 862    {
 863        rc = post_receive(res);
 864        if (rc)
 865        {
 866            fprintf(stderr, "failed to post RR\n");
 867            goto connect_qp_exit;
 868        }
 869    }
 870    /* modify the QP to RTR */
 871    rc = modify_qp_to_rtr(res->qp, remote_con_data.qp_num, remote_con_data.lid,
 872                          remote_con_data.gid);
 873    if (rc)
 874    {
 875        fprintf(stderr, "failed to modify QP state to RTR\n");
 876        goto connect_qp_exit;
 877    }
 878    rc = modify_qp_to_rts(res->qp);
 879    if (rc)
 880    {
 881        fprintf(stderr, "failed to modify QP state to RTR\n");
 882        goto connect_qp_exit;
 883    }
 884    fprintf(stdout, "QP state was change to RTS\n");
 885    /* sync to make sure that both sides are in states that they can connect
 886    to prevent packet loose */
 887    if (sock_sync_data(res->sock, 1, "Q", &temp_char)) /* just send a dummy
 888    char back and forth */
 889    {
 890        fprintf(stderr, "sync error after QPs are were moved to RTS\n");
 891        rc = 1;
 892    }
 893connect_qp_exit:
 894    return rc;
 895}
 896
 897/******************************************************************************
 898 * Function: resources_destroy
 899 *
 900 * Input
 901 * res pointer to resources structure
 902 *
 903 * Output
 904 * none
 905 *
 906 * Returns
 907 * 0 on success, 1 on failure
 908 *
 909 * Description
 910 * Cleanup and deallocate all resources used
 911 ******************************************************************************/
 912static int resources_destroy(struct resources *res)
 913{
 914    int rc = 0;
 915    if (res->qp)
 916        if (ibv_destroy_qp(res->qp))
 917        {
 918            fprintf(stderr, "failed to destroy QP\n");
 919            rc = 1;
 920        }
 921    if (res->mr)
 922        if (ibv_dereg_mr(res->mr))
 923        {
 924            fprintf(stderr, "failed to deregister MR\n");
 925            rc = 1;
 926        }
 927    if (res->buf)
 928        free(res->buf);
 929    if (res->cq)
 930        if (ibv_destroy_cq(res->cq))
 931        {
 932            fprintf(stderr, "failed to destroy CQ\n");
 933            rc = 1;
 934        }
 935    if (res->pd)
 936        if (ibv_dealloc_pd(res->pd))
 937        {
 938            fprintf(stderr, "failed to deallocate PD\n");
 939            rc = 1;
 940        }
 941    if (res->ib_ctx)
 942        if (ibv_close_device(res->ib_ctx))
 943        {
 944            fprintf(stderr, "failed to close device context\n");
 945            rc = 1;
 946        }
 947    if (res->sock >= 0)
 948        if (close(res->sock))
 949        {
 950            fprintf(stderr, "failed to close socket\n");
 951            rc = 1;
 952        }
 953    return rc;
 954}
 955
 956/******************************************************************************
 957 * Function: print_config
 958 *
 959 * Input
 960 * none
 961 *
 962 * Output
 963 * none
 964 *
 965 * Returns
 966 * none
 967 *
 968 * Description
 969 * Print out config information
 970 ******************************************************************************/
 971static void print_config(void)
 972{
 973    fprintf(stdout, " ------------------------------------------------\n");
 974    fprintf(stdout, " Device name : \"%s\"\n", config.dev_name);
 975    fprintf(stdout, " IB port : %u\n", config.ib_port);
 976    if (config.server_name)
 977        fprintf(stdout, " IP : %s\n", config.server_name);
 978    fprintf(stdout, " TCP port : %u\n", config.tcp_port);
 979    if (config.gid_idx >= 0)
 980        fprintf(stdout, " GID index : %u\n", config.gid_idx);
 981    fprintf(stdout, " ------------------------------------------------\n\n");
 982}
 983
 984/******************************************************************************
 985 * Function: usage
 986 *
 987 * Input
 988 * argv0 command line arguments
 989 *
 990 * Output
 991 * none
 992 *
 993 * Returns
 994 * none
 995 *
 996 * Description
 997 * print a description of command line syntax
 998 ******************************************************************************/
 999static void usage(const char *argv0)
1000{
1001    fprintf(stdout, "Usage:\n");
1002    fprintf(stdout, " %s start a server and wait for connection\n", argv0);
1003    fprintf(stdout, " %s <host> connect to server at <host>\n", argv0);
1004    fprintf(stdout, "\n");
1005    fprintf(stdout, "Options:\n");
1006    fprintf(stdout, " -p, --port <port> listen on/connect to port <port> (default 18515)\n");
1007    fprintf(stdout, " -d, --ib-dev <dev> use IB device <dev> (default first device found)\n");
1008    fprintf(stdout, " -i, --ib-port <port> use port <port> of IB device (default 1)\n");
1009    fprintf(stdout, " -g, --gid_idx <git index> gid index to be used in GRH (default not used)\n");
1010    fprintf(stdout, " -h, --help\n");
1011}
1012
1013/******************************************************************************
1014 * Function: main
1015 *
1016 * Input
1017 * argc number of items in argv
1018 * argv command line parameters
1019 *
1020 * Output
1021 * none
1022 *
1023 * Returns
1024 * 0 on success, 1 on failure
1025 *
1026 * Description
1027 * Main program code
1028 ******************************************************************************/
1029int main(int argc, char *argv[])
1030{
1031    struct resources res;
1032    int rc = 1;
1033    char temp_char;
1034
1035    /* parse the command line parameters */
1036    while (1)
1037    {
1038        int c;
1039        static struct option long_options[] = {
1040            {.name = "port", .has_arg = 1, .val = 'p'},
1041            {.name = "ib-dev", .has_arg = 1, .val = 'd'},
1042            {.name = "ib-port", .has_arg = 1, .val = 'i'},
1043            {.name = "gid-idx", .has_arg = 1, .val = 'g'},
1044            {.name = "help", .has_arg = 0, .val = 'h'},
1045            {.name = NULL, .has_arg = 0, .val = '\0'}};
1046        // if a character is followed by a colon, the option is expected to have an argument,
1047        // which should be separated from it by white space.
1048        c = getopt_long(argc, argv, "p:d:i:g:h", long_options, NULL);
1049        if (c == -1)
1050            break;
1051        switch (c)
1052        {
1053        case 'p':
1054            config.tcp_port = strtoul(optarg, NULL, 0);
1055            break;
1056        case 'd':
1057            config.dev_name = strdup(optarg);
1058            break;
1059        case 'i':
1060            config.ib_port = strtoul(optarg, NULL, 0);
1061            if (config.ib_port < 0)
1062            {
1063                usage(argv[0]);
1064                return 1;
1065            }
1066            break;
1067        case 'g':
1068            config.gid_idx = strtoul(optarg, NULL, 0);
1069            if (config.gid_idx < 0)
1070            {
1071                usage(argv[0]);
1072                return 1;
1073            }
1074            break;
1075        case 'h':
1076        default:
1077            usage(argv[0]);
1078            return 1;
1079        }
1080    }
1081    /* parse the last parameter (if exists) as the server name */
1082    // optind is modified to index the first nonoption.
1083    // https://stackoverflow.com/questions/46636641/how-does-optind-get-assigned-in-c
1084    if (optind == argc - 1)
1085        config.server_name = argv[optind];
1086    else if (optind < argc)
1087    {
1088        usage(argv[0]);
1089        return 1;
1090    }
1091    /* print the used parameters for info*/
1092    print_config();
1093    /* init all of the resources, so cleanup will be easy */
1094    resources_init(&res);
1095    /* create resources before using them */
1096    if (resources_create(&res))
1097    {
1098        fprintf(stderr, "failed to create resources\n");
1099        goto main_exit;
1100    }
1101    /* connect the QPs */
1102    if (connect_qp(&res))
1103    {
1104        fprintf(stderr, "failed to connect QPs\n");
1105        goto main_exit;
1106    }
1107    /* let the server post the sr */
1108    if (!config.server_name)
1109        if (post_send(&res, IBV_WR_SEND))
1110        {
1111            fprintf(stderr, "failed to post sr\n");
1112            goto main_exit;
1113        }
1114    /* in both sides we expect to get a completion */
1115    if (poll_completion(&res))
1116    {
1117        fprintf(stderr, "poll completion failed\n");
1118        goto main_exit;
1119    }
1120    /* after polling the completion we have the message in the client buffer
1121    too */
1122    /* if client side */
1123    if (config.server_name)
1124    {
1125        fprintf(stdout, "Message is: '%s'\n", res.buf);
1126    }
1127    else
1128    {
1129        /* setup server buffer with read message */
1130        strcpy(res.buf, RDMAMSGR);
1131    }
1132    /* Sync so we are sure server side has data ready before client tries to
1133    read it */
1134    if (sock_sync_data(res.sock, 1, "R", &temp_char)) /* just send a dummy
1135    char back and forth */
1136    {
1137        fprintf(stderr, "sync error before RDMA ops\n");
1138        rc = 1;
1139        goto main_exit;
1140    }
1141    /* Now the client performs an RDMA read and then write on server.
1142    Note that the server has no idea these events have occured */
1143    if (config.server_name)
1144    {
1145        /* First we read contens of server's buffer */
1146        if (post_send(&res, IBV_WR_RDMA_READ))
1147        {
1148            fprintf(stderr, "failed to post SR 2\n");
1149            rc = 1;
1150            goto main_exit;
1151        }
1152        if (poll_completion(&res))
1153        {
1154            fprintf(stderr, "poll completion failed 2\n");
1155            rc = 1;
1156            goto main_exit;
1157        }
1158        fprintf(stdout, "Contents of server's buffer: '%s'\n", res.buf);
1159        /* Now we replace what's in the server's buffer */
1160        strcpy(res.buf, RDMAMSGW);
1161        fprintf(stdout, "Now replacing it with: '%s'\n", res.buf);
1162        if (post_send(&res, IBV_WR_RDMA_WRITE))
1163        {
1164            fprintf(stderr, "failed to post SR 3\n");
1165            rc = 1;
1166            goto main_exit;
1167        }
1168        if (poll_completion(&res))
1169        {
1170            fprintf(stderr, "poll completion failed 3\n");
1171            rc = 1;
1172            goto main_exit;
1173        }
1174    }
1175    /* Sync so server will know that client is done mucking with its memory */
1176    if (sock_sync_data(res.sock, 1, "W", &temp_char)) /* just send a dummy
1177    char back and forth */
1178    {
1179        fprintf(stderr, "sync error after RDMA ops\n");
1180        rc = 1;
1181        goto main_exit;
1182    }
1183    if (!config.server_name)
1184        fprintf(stdout, "Contents of server buffer: '%s'\n", res.buf);
1185    rc = 0;
1186main_exit:
1187    if (resources_destroy(&res))
1188    {
1189        fprintf(stderr, "failed to destroy resources\n");
1190        rc = 1;
1191    }
1192    if (config.dev_name)
1193        free((char *)config.dev_name);
1194    fprintf(stdout, "\ntest result is %d\n", rc);
1195    return rc;
1196}

运行演示

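  1. 启动服务端（不带主机参数运行时程序作为服务端，启动后等待客户端连接）

    ./RDMA_RC_example
    
  2. 启动客户端（唯一的非选项参数为服务端的主机名或 IP 地址；本例中服务端与客户端运行在同一台机器上，因此使用 localhost。若使用 -p 指定端口，两端必须一致）

    ./RDMA_RC_example localhost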
    
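  3. 运行结果

    服务端输出示例

    server output (show/hide)
    （原页面中的可折叠输出未随正文保留。服务端运行成功时，末尾会打印 `Contents of server buffer: 'RDMA write operation'`，即客户端通过 RDMA 写入的内容，随后打印 `test result is 0`。）

    客户端输出示例

    client output (show/hide)
    （客户端运行成功时，会依次打印以下三行，最后打印 `test result is 0`：
      - `Message is: 'SEND operation      '`（通过 SEND 收到的消息）
      - `Contents of server's buffer: 'RDMA read operation '`（RDMA 读取的结果）
      - `Now replacing it with: 'RDMA write operation'`）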