[hotfix] revert o_direct flag #53

Merged 2 commits on Nov 25, 2024.
Changes from all commits
114 changes: 70 additions & 44 deletions csrc/pthread_backend.cpp
@@ -1,124 +1,150 @@
#include "pthread_backend.h"

void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) {
void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback)
{
auto fut = this->pool.submit_task(
[fd, buffer, n_bytes, offset] {
[fd, buffer, n_bytes, offset]
{
return pwrite(fd, buffer, n_bytes, offset);
}
);
});
this->write_fut.push_back(std::make_tuple(std::move(fut), callback));
}

void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) {
void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback)
{
auto fut = this->pool.submit_task(
[fd, iov, iovcnt, offset] {
[fd, iov, iovcnt, offset]
{
return pwritev(fd, iov, iovcnt, offset);
}
);
});
this->write_fut.push_back(std::make_tuple(std::move(fut), callback));
}

void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) {
void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback)
{
auto fut = this->pool.submit_task(
[fd, buffer, n_bytes, offset] {
[fd, buffer, n_bytes, offset]
{
return pread(fd, buffer, n_bytes, offset);
}
);
});
this->read_fut.push_back(std::make_tuple(std::move(fut), callback));
}

void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) {
auto fut = this->pool.submit_task(
[fd, iov, iovcnt, offset] {
void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback)
{
auto fut = this->pool.submit_task(
[fd, iov, iovcnt, offset]
{
return preadv(fd, iov, iovcnt, offset);
}
);
});
this->read_fut.push_back(std::make_tuple(std::move(fut), callback));
}

void PthreadAsyncIO::get_event(WaitType wt) {
if (wt == NOWAIT) return;
void PthreadAsyncIO::get_event(WaitType wt)
{
if (wt == NOWAIT)
return;
this->sync_write_events();
this->sync_read_events();
}

void PthreadAsyncIO::sync_write_events() {
while (this->write_fut.size() > 0) {
void PthreadAsyncIO::sync_write_events()
{
while (this->write_fut.size() > 0)
{
auto front = std::move(this->write_fut.front());
this->write_fut.pop_front();

auto fut(std::move(std::get<0>(front)));
fut.wait();

auto callback = std::get<1>(front);
if (callback != nullptr) {
if (callback != nullptr)
{
callback();
}
}
}

void PthreadAsyncIO::sync_read_events() {
while (this->read_fut.size() > 0) {
void PthreadAsyncIO::sync_read_events()
{
while (this->read_fut.size() > 0)
{
auto front = std::move(this->read_fut.front());
this->read_fut.pop_front();

auto fut(std::move(std::get<0>(front)));
fut.wait();

auto callback = std::get<1>(front);
if (callback != nullptr) {
if (callback != nullptr)
{
callback();
}
}
}

void PthreadAsyncIO::synchronize() {
void PthreadAsyncIO::synchronize()
{
this->get_event(WAIT);
}

void PthreadAsyncIO::register_file(int fd) {}

void PthreadAsyncIO::register_h2d(unsigned int num_tensors) {
this->h2d_in_progress.store(num_tensors); // register tensors to write for this run
void PthreadAsyncIO::register_h2d(unsigned int num_tensors)
{
this->h2d_in_progress.store(num_tensors); // register tensors to write for this run
}

void PthreadAsyncIO::sync_h2d() {
void PthreadAsyncIO::sync_h2d()
{
std::unique_lock<std::mutex> lock(this->mtx);
this->cv.wait(lock, [this] { return this->h2d_in_progress == 0; }); // block until all in-progress h2d are completed
this->cv.wait(lock, [this]
{ return this->h2d_in_progress == 0; }); // block until all in-progress h2d are completed
}

void PthreadAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned) {
void PthreadAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned)
{
auto stream = c10::cuda::getCurrentCUDAStream();
if (!t.is_cuda()) {
this->h2d_in_progress.fetch_sub(1); // already moved to cpu
if (this->h2d_in_progress.load() == 0) { // notify when all h2d are completed and safe to optimizer.step()
if (!t.is_cuda())
{
this->h2d_in_progress.fetch_sub(1); // already moved to cpu
if (this->h2d_in_progress.load() == 0)
{ // notify when all h2d are completed and safe to optimizer.step()
std::lock_guard<std::mutex> lock(this->mtx);
cv.notify_one();
}
}
auto fut = this->pool.submit_task(
[this, fd, t, offset, pinned, stream] {
[this, fd, t, offset, pinned, stream]
{
torch::Tensor cpu_tensor;
if (t.is_cuda()) {
at::cuda::CUDAStreamGuard guard(stream); // https://pytorch.org/cppdocs/notes/tensor_cuda_stream.html
if (pinned.has_value()) {
if (t.is_cuda())
{
at::cuda::CUDAStreamGuard guard(stream); // https://pytorch.org/cppdocs/notes/tensor_cuda_stream.html
if (pinned.has_value())
{
pinned.value().copy_(t, /*non_blocking*/ false);
cpu_tensor = pinned.value();
} else {
cpu_tensor = t.to(t.options().device(c10::DeviceType::CPU), /*non_blocking*/ false, /*copy*/ false); // modified from torch::Tensor::cpu()
}
else
{
cpu_tensor = t.to(t.options().device(c10::DeviceType::CPU), /*non_blocking*/ false, /*copy*/ false); // modified from torch::Tensor::cpu()
}
this->h2d_in_progress.fetch_sub(1);
if (this->h2d_in_progress.load() == 0) { // notify when all h2d are completed and safe to optimizer.step()
if (this->h2d_in_progress.load() == 0)
{ // notify when all h2d are completed and safe to optimizer.step()
std::lock_guard<std::mutex> lock(this->mtx);
cv.notify_one();
}
} else {
}
else
{
cpu_tensor = t;
}
void *buf = cpu_tensor.data_ptr();
size_t n_bytes = cpu_tensor.numel() * cpu_tensor.element_size();
return pwrite(fd, buf, n_bytes, offset);
}
);
});
this->write_fut.push_back(std::make_tuple(std::move(fut), callback));
}
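
Aside from the brace-style reformat, the logic worth calling out in this file is the host-to-device bookkeeping: register_h2d stores the number of pending copies in an atomic counter, write_tensor decrements it as each copy lands in host memory, and sync_h2d blocks on a condition variable until the counter reaches zero. Below is a minimal standalone sketch of that handshake; the thread setup and names are illustrative only, not TensorNVMe API.

// Minimal sketch of the counter + condition-variable handshake used by
// register_h2d / write_tensor / sync_h2d (illustrative, not library code).
#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

int main()
{
    std::atomic<unsigned int> h2d_in_progress{0};
    std::mutex mtx;
    std::condition_variable cv;

    const unsigned int num_tensors = 4;
    h2d_in_progress.store(num_tensors); // register_h2d(num_tensors)

    std::vector<std::thread> workers;
    for (unsigned int i = 0; i < num_tensors; ++i)
    {
        workers.emplace_back([&]
        {
            // ... copy one tensor to host memory here ...
            h2d_in_progress.fetch_sub(1);
            if (h2d_in_progress.load() == 0)
            { // last finished copy wakes the waiter
                std::lock_guard<std::mutex> lock(mtx);
                cv.notify_one();
            }
        });
    }

    { // sync_h2d(): block until every registered copy has finished
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&] { return h2d_in_progress.load() == 0; });
    }
    std::printf("all host copies finished; safe to continue\n");

    for (auto &t : workers)
        t.join();
    return 0;
}

Notifying while holding the mutex, as the backend does, keeps the wakeup from racing with the predicate check in the waiting thread.
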
2 changes: 1 addition & 1 deletion tensornvme/async_file_io.py
@@ -13,7 +13,7 @@ class AsyncFileWriter:
     def __init__(self, path: str, n_entries: int = 16, backend=None) -> None:
         # this still takes ram buffer, which may lead to OOM
         # self.f = open(path, "wb", buffering=0)
-        self.fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_DIRECT, mode=0o664)
+        self.fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_SYNC, mode=0o664)
         if backend is not None:
             self.io = AsyncFileWriterC(self.fd, n_entries, backend=backend)
         else:
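
For context on the hotfix itself: on Linux, O_DIRECT bypasses the page cache but generally requires the user buffer address, file offset, and transfer length to be aligned to the device's logical block size, and unaligned pwrite calls typically fail with EINVAL. O_SYNC only forces each write to be durable before returning and imposes no alignment requirement, which is presumably why this PR falls back to it. A small illustrative program (not part of this repository; the file name and 4096-byte block size are assumptions) showing the difference:

// Illustrative only: why O_DIRECT is fragile for arbitrary user buffers,
// while O_SYNC works with any buffer but simply makes each write durable.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE // for O_DIRECT
#endif
#include <fcntl.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include <cstring>

int main()
{
    // Note: some filesystems (e.g. tmpfs) reject O_DIRECT at open() time.
    int fd = open("odirect_demo.bin", O_WRONLY | O_CREAT | O_DIRECT, 0664);
    if (fd < 0)
    {
        perror("open with O_DIRECT");
        return 1;
    }

    // Unaligned buffer and length: likely to fail with EINVAL under O_DIRECT.
    char small[100] = {0};
    if (pwrite(fd, small, sizeof(small), 0) < 0)
        perror("unaligned pwrite under O_DIRECT");

    // Aligned buffer, length, and offset: the pattern O_DIRECT expects.
    void *aligned = nullptr;
    if (posix_memalign(&aligned, 4096, 4096) == 0)
    {
        std::memset(aligned, 0, 4096);
        if (pwrite(fd, aligned, 4096, 0) < 0)
            perror("aligned pwrite under O_DIRECT");
        std::free(aligned);
    }

    close(fd);
    return 0;
}

Since AsyncFileWriter hands pwrite whatever buffer the tensor happens to occupy, dropping the alignment requirement is the conservative choice for a hotfix.
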