Skip to content

Commit

Permalink
Untested: delimited events
Browse files Browse the repository at this point in the history
  • Loading branch information
lukstafi committed Sep 29, 2024
1 parent 833b7e0 commit 9690071
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Added

- CUDA events.
- Delimited events: they are owned by a stream they record, and are automatically destroyed after synchronization.

### Changed

Expand Down
70 changes: 61 additions & 9 deletions cudajit.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1040,9 +1040,27 @@ end

type bigstring = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
type lifetime = Remember : 'a -> lifetime
type stream = { mutable args_lifetimes : lifetime list; stream : cu_stream }
type delimited_event = { event : cu_event; mutable is_destroyed : bool }

let no_stream = { args_lifetimes = []; stream = Ctypes.(coerce (ptr void) cu_stream null) }
let destroy_event event = check "cu_event_destroy" @@ Cuda.cu_event_destroy event

type stream = {
mutable args_lifetimes : lifetime list;
mutable owned_events : delimited_event list;
stream : cu_stream;
}

let release_stream stream =
stream.args_lifetimes <- [];
List.iter
(fun event ->
if not event.is_destroyed then destroy_event event.event;
event.is_destroyed <- false)
stream.owned_events;
stream.owned_events <- []

let no_stream =
{ args_lifetimes = []; owned_events = []; stream = Ctypes.(coerce (ptr void) cu_stream null) }

module Context = struct
type t = cu_context
Expand Down Expand Up @@ -1151,7 +1169,7 @@ module Context = struct

let synchronize () =
check "cu_ctx_synchronize" @@ Cuda.cu_ctx_synchronize ();
no_stream.args_lifetimes <- []
release_stream no_stream

let disable_peer_access ctx =
check "cu_ctx_disable_peer_access" @@ Cuda.cu_ctx_disable_peer_access ctx
Expand Down Expand Up @@ -1521,7 +1539,8 @@ module Stream = struct
| Double of float
[@@deriving sexp_of]

let no_stream = { args_lifetimes = []; stream = Ctypes.(coerce (ptr void) cu_stream null) }
let no_stream =
{ args_lifetimes = []; owned_events = []; stream = Ctypes.(coerce (ptr void) cu_stream null) }

let launch_kernel func ~grid_dim_x ?(grid_dim_y = 1) ?(grid_dim_z = 1) ~block_dim_x
?(block_dim_y = 1) ?(block_dim_z = 1) ~shared_mem_bytes stream kernel_params =
Expand Down Expand Up @@ -1566,8 +1585,8 @@ module Stream = struct
| true -> Unsigned.UInt.of_int64 cu_stream_non_blocking

let destroy stream =
check "cu_stream_destroy" @@ Cuda.cu_stream_destroy stream.stream;
stream.args_lifetimes <- []
release_stream stream;
check "cu_stream_destroy" @@ Cuda.cu_stream_destroy stream.stream

let create ?(non_blocking = false) ?(lower_priority = 0) () =
let open Ctypes in
Expand All @@ -1576,7 +1595,7 @@ module Stream = struct
@@ Cuda.cu_stream_create_with_priority stream
(uint_of_cu_stream_flags ~non_blocking)
lower_priority;
let stream = { args_lifetimes = []; stream = !@stream } in
let stream = { args_lifetimes = []; owned_events = []; stream = !@stream } in
Stdlib.Gc.finalise destroy stream;
stream

Expand All @@ -1597,12 +1616,13 @@ module Stream = struct
| CUDA_ERROR_NOT_READY -> false
| e ->
check "cu_stream_query" e;
(* We do not destroy delimited events, but any kernel arguments no longer needed. *)
stream.args_lifetimes <- [];
true

let synchronize stream =
check "cu_stream_synchronize" @@ Cuda.cu_stream_synchronize stream.stream;
stream.args_lifetimes <- []
release_stream stream

let memcpy_D_to_H_unsafe ~(dst : unit Ctypes.ptr) ~src:(Deviceptr src) ~size_in_bytes stream =
check "cu_memcpy_D_to_H_async"
Expand Down Expand Up @@ -1653,7 +1673,7 @@ module Event = struct
default
[ blocking_sync; disable_timing; interprocess ]

let destroy event = check "cu_event_destroy" @@ Cuda.cu_event_destroy event
let destroy event = destroy_event event

let create ?(blocking_sync = false) ?(enable_timing = false) ?(interprocess = false) () =
let open Ctypes in
Expand Down Expand Up @@ -1696,3 +1716,35 @@ module Event = struct
in
check "cu_stream_wait_event" @@ Cuda.cu_stream_wait_event stream.stream event flags
end

module Delimited_event = struct
type t = delimited_event

let elapsed_time ~start ~end_ =
if start.is_destroyed || end_.is_destroyed then
invalid_arg "Delimited_event.elapsed_time: one of the events is already destroyed";
Event.elapsed_time ~start:start.event ~end_:end_.event

let query event =
if event.is_destroyed then invalid_arg "Delimited_event.query: the event is already destroyed";
Event.query event.event

let record ?blocking_sync ?enable_timing ?interprocess ?external_ stream =
let event = Event.create ?blocking_sync ?enable_timing ?interprocess () in
Event.record ?external_ event stream;
let result = { event; is_destroyed = false } in
stream.owned_events <- result :: stream.owned_events;
result

let synchronize event =
if not event.is_destroyed then (
Event.synchronize event.event;
destroy_event event.event;
event.is_destroyed <- true)

let wait ?external_ stream event =
if event.is_destroyed then invalid_arg "Delimited_event.wait: the event is already destroyed";
Event.wait ?external_ stream event.event

let is_destroyed event = event.is_destroyed
end
40 changes: 40 additions & 0 deletions cudajit.mli
Original file line number Diff line number Diff line change
Expand Up @@ -790,3 +790,43 @@ module Event : sig
{{:https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__STREAM.html#group__CUDA__STREAM_1g6a898b652dfc6aa1d5c8d97062618b2f}
cuStreamWaitEvent}. *)
end

(** This module builds on top of functionality more directly exposed by {!Event}. It optimizes
resource management for use-cases where events are not reused: there's only one call to
[record], and it's immediately after [create]. *)
module Delimited_event : sig
type t
(** An delimited event encapsulates {!Event.t} and is owned by a stream. It records its owner at
creation, and gets destroyed when either it or its owner are synchronized (or if neither
happens, when it is garbage-collected). *)

val record :
?blocking_sync:bool ->
?enable_timing:bool ->
?interprocess:bool ->
?external_:bool ->
Stream.t ->
t
(** Combines {!Event.create} and {!Event.record} to create an event owned by the given stream. *)

val is_destroyed : t -> bool
(** Returns true if the delimited event is already released via a call to
{{:https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__EVENT.html#group__CUDA__EVENT_1g593ec73a8ec5a5fc031311d3e4dca1ef}
cuEventDestroy}. The event can be released via either {!synchronize}, or
{!Stream.synchronize}. *)

val elapsed_time : start:t -> end_:t -> float
(** See {!Event.elapsed_time}. [elapsed_time ~start ~end_] raises [Invalid_argument] if either
[start] or [end_] is already destroyed. *)

val query : t -> bool
(** See {!Event.query}. [query event] raises [Invalid_argument] if [event] is already
destroyed. *)

val synchronize : t -> unit
(** See {!Event.synchronize}. [synchronize event] is a no-op if [is_destroyed event] is true. *)

val wait : ?external_:bool -> Stream.t -> t -> unit
(** See {!Event.wait}. [wait stream event] raises [Invalid_argument] if [event] is already
destroyed. *)
end

0 comments on commit 9690071

Please sign in to comment.