Skip to content

Commit

Permalink
prov/shm: Refactor ze ipc path to use pidfd
Browse files Browse the repository at this point in the history
Move away from using a socket to initialize level-zero ipc by path
and start using pidfd instead.

This new protocol will open the pidfd
(a file descriptor that refers to a process and can be used to duplicate that process's open fds) for a
peer and save it in their region. This is done to avoid multiple
open/close calls on the same fd. We then use this pidfd to look up
each open fd from level-zero's open_handle and cache them for later
re-use. These are stored as a new data type "ze_pid_handle" which
contains a pid, the fd from handle_get, the fd from handle_open, and
the pid_fd. When a cache entry is evicted then the handle's open fds
will be closed.

These changes re-enable the level-zero ipc protocol.

Signed-off-by: Zach Dworkin <[email protected]>
  • Loading branch information
zachdworkin committed Aug 2, 2024
1 parent 99fa453 commit cfcee28
Show file tree
Hide file tree
Showing 16 changed files with 209 additions and 452 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ common_srcs = \
prov/util/src/ze_mem_monitor.c \
prov/util/src/cuda_ipc_monitor.c \
prov/util/src/rocr_ipc_monitor.c \
prov/util/src/ze_ipc_monitor.c \
prov/util/src/xpmem_monitor.c \
prov/util/src/util_profile.c \
prov/coll/src/coll_attr.c \
Expand Down
18 changes: 18 additions & 0 deletions include/linux/osd.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ size_t ofi_ifaddr_get_speed(struct ifaddrs *ifa);
# define __NR_process_vm_writev 311
#endif

/*
 * Fallback syscall numbers for pidfd_open and pidfd_getfd; each one is only
 * defined here when nothing included earlier has already provided it.
 */
#if !defined(__NR_pidfd_open)
# define __NR_pidfd_open 434
#endif

#if !defined(__NR_pidfd_getfd)
# define __NR_pidfd_getfd 438
#endif

static inline ssize_t ofi_process_vm_readv(pid_t pid,
const struct iovec *local_iov,
unsigned long liovcnt,
Expand All @@ -122,6 +130,16 @@ static inline size_t ofi_process_vm_writev(pid_t pid,
remote_iov, riovcnt, flags);
}

/*
 * Thin wrapper that issues the raw pidfd_open system call through syscall().
 *
 * pid:   process id handed to the kernel
 * flags: forwarded to the kernel unchanged
 *
 * Returns the syscall() result narrowed to int.
 */
static inline int ofi_pidfd_open(pid_t pid, unsigned int flags)
{
	long ret = syscall(__NR_pidfd_open, pid, flags);

	return (int) ret;
}

/*
 * Thin wrapper that issues the raw pidfd_getfd system call through syscall().
 *
 * pidfd:    descriptor identifying the target process
 * targetfd: descriptor number inside that process
 * flags:    forwarded to the kernel unchanged
 *
 * Returns the syscall() result narrowed to int.
 */
static inline int ofi_pidfd_getfd(int pidfd, int targetfd, unsigned int flags)
{
	long ret = syscall(__NR_pidfd_getfd, pidfd, targetfd, flags);

	return (int) ret;
}

static inline ssize_t ofi_read_socket(SOCKET fd, void *buf, size_t count)
{
return read(fd, buf, count);
Expand Down
19 changes: 14 additions & 5 deletions include/ofi_hmem.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,24 +204,33 @@ int cuda_gdrcopy_dev_register(const void *buf, size_t len, uint64_t *handle);
int cuda_gdrcopy_dev_unregister(uint64_t handle);
int cuda_set_sync_memops(void *ptr);

/*
 * A ze handle is just an fd, so ipc copies carry it inside this wrapper
 * together with the extra descriptors they need.
 */
struct ze_pid_handle {
	int get_fd;	/* fd from handle_get */
	int open_fd;	/* fd from handle_open, needed for closing extra fd */
	int pid_fd;	/* fd that references a pid, used to open an fd across processes */
};

int ze_hmem_copy(uint64_t device, void *dst, const void *src, size_t size);
int ze_hmem_init(void);
int ze_hmem_cleanup(void);
bool ze_hmem_is_addr_valid(const void *addr, uint64_t *device, uint64_t *flags);
int ze_hmem_get_handle(void *dev_buf, size_t size, void **handle);
void ze_set_pid_fd(void **handle, int pid_fd);
int ze_hmem_open_handle(void **handle, size_t size, uint64_t device,
void **ipc_ptr);
int ze_hmem_get_shared_handle(uint64_t device, void *dev_buf, int *ze_fd,
void **handle);
int ze_hmem_open_shared_handle(uint64_t device, int *peer_fds, void **handle,
int *ze_fd, void **ipc_ptr);
int ze_hmem_close_handle(void *ipc_ptr, void **handle);
bool ze_hmem_p2p_enabled(void);
int ze_hmem_get_ipc_handle_size(size_t *size);
int ze_hmem_get_base_addr(const void *ptr, size_t len, void **base,
size_t *size);
int ze_hmem_get_id(const void *ptr, uint64_t *id);
int *ze_hmem_get_dev_fds(int *nfds);
int ze_hmem_host_register(void *ptr, size_t size);
int ze_hmem_host_unregister(void *ptr);
int ze_dev_register(const void *addr, size_t size, uint64_t *handle);
Expand Down
1 change: 1 addition & 0 deletions include/ofi_mr.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ extern struct ofi_mem_monitor *cuda_ipc_monitor;
extern struct ofi_mem_monitor *rocr_monitor;
extern struct ofi_mem_monitor *rocr_ipc_monitor;
extern struct ofi_mem_monitor *ze_monitor;
extern struct ofi_mem_monitor *ze_ipc_monitor;
extern struct ofi_mem_monitor *import_monitor;
extern struct ofi_mem_monitor *xpmem_monitor;

Expand Down
1 change: 1 addition & 0 deletions libfabric.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@
<ClCompile Include="prov\util\src\rocr_mem_monitor.c" />
<ClCompile Include="prov\util\src\ze_mem_monitor.c" />
<ClCompile Include="prov\util\src\cuda_ipc_monitor.c" />
<ClCompile Include="prov\util\src\ze_ipc_monitor.c" />
<ClCompile Include="prov\util\src\rocr_ipc_monitor.c" />
<ClCompile Include="prov\util\src\xpmem_monitor.c" />
<ClCompile Include="prov\coll\src\coll_attr.c" />
Expand Down
17 changes: 15 additions & 2 deletions prov/shm/src/smr.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ struct smr_tx_entry {
void *map_ptr;
struct smr_ep_name *map_name;
struct ofi_mr *mr[SMR_IOV_LIMIT];
int fd;
};

struct smr_pend_entry {
Expand Down Expand Up @@ -282,7 +281,8 @@ size_t smr_copy_from_sar(struct smr_freestack *sar_pool, struct smr_resp *resp,
const struct iovec *iov, size_t count,
size_t *bytes_done);
int smr_select_proto(void **desc, size_t iov_count, bool cma_avail,
uint32_t op, uint64_t total_len, uint64_t op_flags);
bool ipc_valid, uint32_t op, uint64_t total_len,
uint64_t op_flags);
typedef ssize_t (*smr_proto_func)(struct smr_ep *ep, struct smr_region *peer_smr,
int64_t id, int64_t peer_id, uint32_t op, uint64_t tag,
uint64_t data, uint64_t op_flags, struct ofi_mr **desc,
Expand Down Expand Up @@ -320,6 +320,19 @@ static inline bool smr_vma_enabled(struct smr_ep *ep,
peer_smr->xpmem_cap_self == SMR_VMA_CAP_ON);
}

/*
 * Mark peer `id` as ipc_invalid in `region`'s peer data when the mapped
 * peer's pid_fd equals -1 (presumably meaning no pidfd was opened for that
 * peer -- confirm against the code that sets pid_fd). The flag is left
 * untouched for any other pid_fd value.
 */
static inline void smr_set_ipc_invalid(struct smr_region *region, uint64_t id)
{
	if (region->map->peers[id].pid_fd != -1)
		return;

	smr_peer_data(region)[id].ipc_invalid = 1;
}

/*
 * Report whether ipc may be used between this endpoint and a peer.
 *
 * Returns true only when neither side has flagged the other: the local
 * region's peer data entry `id` and the peer region's entry `peer_id` must
 * both have ipc_invalid clear. The local entry is checked first; the peer
 * entry is only read when the local one is clear.
 */
static inline bool smr_ipc_valid(struct smr_ep *ep, struct smr_region *peer_smr,
				 int64_t id, int64_t peer_id)
{
	if (smr_peer_data(ep->region)[id].ipc_invalid)
		return false;

	return !smr_peer_data(peer_smr)[peer_id].ipc_invalid;
}

static inline bool smr_ze_ipc_enabled(struct smr_region *smr,
struct smr_region *peer_smr)
{
Expand Down
Loading

0 comments on commit cfcee28

Please sign in to comment.