Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/peer: Introducing peer "provider" with peer_cq to begin with #4

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
4 changes: 3 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,12 @@ common_srcs = \
prov/util/src/rocr_mem_monitor.c \
prov/util/src/ze_mem_monitor.c \
prov/util/src/cuda_ipc_monitor.c \
prov/peer/src/peer_cq.c \
prov/peer/src/peer.h \
prov/coll/src/coll_attr.c \
prov/coll/src/coll_av.c \
prov/coll/src/coll_av_set.c \
prov/coll/src/coll_coll.c \
prov/coll/src/coll_cq.c \
prov/coll/src/coll_domain.c \
prov/coll/src/coll_ep.c \
prov/coll/src/coll_eq.c \
Expand Down Expand Up @@ -189,6 +190,7 @@ src_libfabric_la_SOURCES = \
include/ofi_net.h \
include/ofi_perf.h \
include/ofi_coll.h \
include/ofi_peer.h \
include/fasthash.h \
include/rbtree.h \
include/uthash.h \
Expand Down
1 change: 1 addition & 0 deletions fabtests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ nobase_dist_config_DATA = \
pytest/efa/test_rnr.py \
pytest/efa/test_efa_info.py \
pytest/efa/test_efa_protocol_selection.py \
pytest/efa/test_efa_device_selection.py \
pytest/efa/test_runt.py \
pytest/efa/test_fork_support.py \
pytest/efa/test_mr.py \
Expand Down
47 changes: 45 additions & 2 deletions fabtests/pytest/efa/efa_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,22 @@ def efa_run_client_server_test(cmdline_args, executable, iteration_type,
test.run()

@retry(retry_on_exception=is_ssh_connection_error, stop_max_attempt_number=3, wait_fixed=5000)
def efa_retrieve_hw_counter_value(hostname, hw_counter_name):
def efa_retrieve_hw_counter_value(hostname, hw_counter_name, efa_device_name=None):
"""
retrieve the value of EFA's hardware counter
hostname: a host that has efa
hw_counter_name: EFA hardware counter name. Options are: lifespan, rdma_read_resp_bytes, rdma_read_wrs,recv_wrs,
rx_drops, send_bytes, tx_bytes, rdma_read_bytes, rdma_read_wr_err, recv_bytes, rx_bytes, rx_pkts, send_wrs, tx_pkts
efa_device_name: Name of the EFA device. Corresponds to the name of the EFA device's directory
return: an integer that is sum of all EFA device's counter
"""
command = 'ssh {} cat "/sys/class/infiniband/*/ports/*/hw_counters/{}"'.format(hostname, hw_counter_name)

if efa_device_name:
efa_device_dir = efa_device_name
else:
efa_device_dir = '*'

command = 'ssh {} cat "/sys/class/infiniband/{}/ports/*/hw_counters/{}"'.format(hostname, efa_device_dir, hw_counter_name)
process = subprocess.run(command, shell=True, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
if process.returncode != 0:
if process.stderr and has_ssh_connection_err_msg(process.stderr):
Expand Down Expand Up @@ -70,3 +77,39 @@ def efa_retrieve_gid(hostname):
return None

return process.stdout.decode("utf-8").strip()

@retry(retry_on_exception=is_ssh_connection_error, stop_max_attempt_number=3, wait_fixed=5000)
def get_efa_domain_names(server_id):
timeout = 60
process_timed_out = False

# This command returns a list of EFA domain names and its related info
command = "ssh {} fi_info -p efa".format(server_id)
p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")

try:
p.wait(timeout=timeout)
except subprocess.TimeoutExpired:
p.terminate()
process_timed_out = True

assert not process_timed_out, "Process timed out"

errors = p.stderr.readlines()
for error in errors:
error = error.strip()
if "fi_getinfo: -61" in error:
raise Exception("No EFA devices/domain names found")

if has_ssh_connection_err_msg(error):
raise SshConnectionError()

efa_domain_names = []
for line in p.stdout:
line = line.strip()
if 'domain' in line:
domain_name = line.split(': ')[1]
efa_domain_names.append(domain_name)

return efa_domain_names

39 changes: 39 additions & 0 deletions fabtests/pytest/efa/test_efa_device_selection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pytest


# This test must be run in serial mode because it checks the hw counter
@pytest.mark.serial
@pytest.mark.functional
def test_efa_device_selection(cmdline_args):
from efa.efa_common import efa_retrieve_hw_counter_value, get_efa_domain_names
from common import ClientServerTest

if cmdline_args.server_id == cmdline_args.client_id:
pytest.skip("EFA device selection test requires 2 nodes")
return

efa_domain_names = get_efa_domain_names(cmdline_args.server_id)

for efa_domain_name in efa_domain_names:
if '-rdm' in efa_domain_name:
assert not('-dgrm') in efa_domain_name
fabtest_opts = "fi_rdm_pingpong"
elif '-dgrm' in efa_domain_name:
assert not('-rdm') in efa_domain_name
fabtest_opts = "fi_dgram_pingpong -k"

efa_device_name = efa_domain_name.split('-')[0]

server_tx_bytes_before_test = efa_retrieve_hw_counter_value(cmdline_args.server_id, "tx_bytes", efa_device_name)
client_tx_bytes_before_test = efa_retrieve_hw_counter_value(cmdline_args.client_id, "tx_bytes", efa_device_name)

executable = "{} -d {}".format(fabtest_opts, efa_domain_name)
test = ClientServerTest(cmdline_args, executable, message_size="1000", timeout=300)
test.run()

server_tx_bytes_after_test = efa_retrieve_hw_counter_value(cmdline_args.server_id, "tx_bytes", efa_device_name)
client_tx_bytes_after_test = efa_retrieve_hw_counter_value(cmdline_args.client_id, "tx_bytes", efa_device_name)

# Verify EFA traffic
assert server_tx_bytes_before_test < server_tx_bytes_after_test
assert client_tx_bytes_before_test < client_tx_bytes_after_test
38 changes: 38 additions & 0 deletions include/ofi_peer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2022 Intel Corporation, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
* General Public License (GPL) Version 2, available from the file
* COPYING in the main directory of this source tree, or the
* BSD license below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

#ifndef _OFI_PEER_H_
#define _OFI_PEER_H_

#include <ofi_util.h>

#endif /* _OFI_PEER_H_ */
13 changes: 8 additions & 5 deletions prov/coll/src/coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
#include <ofi_hmem.h>
#include <ofi_prov.h>
#include <ofi_atomic.h>
#include <ofi_peer.h>
#include <ofi_coll.h>

#define COLL_IOV_LIMIT 4
#define COLL_MR_MODES (OFI_MR_BASIC_MAP | FI_MR_LOCAL)
Expand All @@ -81,11 +83,6 @@ struct coll_av {
struct fid_peer_av *peer_av;
};

struct coll_cq {
struct util_cq util_cq;
struct fid_peer_cq *peer_cq;
};

struct coll_eq {
struct util_eq util_eq;
struct fid_eq *peer_eq;
Expand All @@ -101,6 +98,12 @@ struct coll_ep {
*/
struct fi_info *peer_info;
struct fid_ep *peer_ep;

/*
* local pointers to peer_cq - configured via bind() operation
*/
struct fid_peer_cq *tx_peer_cq;
struct fid_peer_cq *rx_peer_cq;
};

struct coll_mr {
Expand Down
10 changes: 4 additions & 6 deletions prov/coll/src/coll_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
*/

#include "coll.h"
#include "ofi_coll.h"

static uint64_t coll_form_tag(uint32_t coll_id, uint32_t rank)
{
Expand Down Expand Up @@ -721,13 +720,12 @@ void coll_join_comp(struct util_coll_operation *coll_op)
void coll_collective_comp(struct util_coll_operation *coll_op)
{
struct coll_ep *ep;
struct coll_cq *cq;

int ret;
ep = container_of(coll_op->ep, struct coll_ep, util_ep.ep_fid);
cq = container_of(ep->util_ep.tx_cq, struct coll_cq, util_cq);

if (cq->peer_cq->owner_ops->write(cq->peer_cq, coll_op->context,
FI_COLLECTIVE, 0, 0, 0, 0, 0))
ret = ep->tx_peer_cq->owner_ops->write(ep->tx_peer_cq, coll_op->context,
FI_COLLECTIVE, 0, 0, 0, 0, 0);
if (ret)
FI_WARN(ep->util_ep.domain->fabric->prov, FI_LOG_DOMAIN,
"collective - cq write failed\n");

Expand Down
105 changes: 0 additions & 105 deletions prov/coll/src/coll_cq.c

This file was deleted.

2 changes: 1 addition & 1 deletion prov/coll/src/coll_domain.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
static struct fi_ops_domain coll_domain_ops = {
.size = sizeof(struct fi_ops_domain),
.av_open = coll_av_open,
.cq_open = coll_cq_open,
.cq_open = fi_no_cq_open,
.endpoint = coll_endpoint,
.scalable_ep = fi_no_scalable_ep,
.cntr_open = fi_no_cntr_open,
Expand Down
26 changes: 25 additions & 1 deletion prov/coll/src/coll_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,30 @@ static struct fi_ops_collective coll_ops_collective = {
.msg = fi_coll_no_msg,
};

static int
coll_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags)
{
struct coll_ep *ep;
struct fid_peer_cq *peer_cq;

if (bfid->fclass != FI_CLASS_PEER_CQ) {
return ofi_ep_fid_bind(ep_fid, bfid, flags);
}
if (!(flags & (FI_TRANSMIT | FI_RECV))) {
return -FI_EINVAL;
}

ep = container_of(ep_fid, struct coll_ep, util_ep.ep_fid.fid);
peer_cq = container_of(bfid, struct fid_peer_cq, fid);

if (flags & FI_TRANSMIT)
ep->tx_peer_cq = peer_cq;
if (flags & FI_RECV)
ep->rx_peer_cq = peer_cq;
return FI_SUCCESS;
}


static int coll_ep_close(struct fid *fid)
{
struct coll_ep *ep;
Expand All @@ -103,7 +127,7 @@ static int coll_ep_ctrl(struct fid *fid, int command, void *arg)
static struct fi_ops coll_ep_fi_ops = {
.size = sizeof(struct fi_ops),
.close = coll_ep_close,
.bind = ofi_ep_fid_bind,
.bind = coll_ep_bind,
.control = coll_ep_ctrl,
.ops_open = fi_no_ops_open,
};
Expand Down
Loading