distributed training only on CPUs #5787

Open · zmasih opened this issue Jan 17, 2025 · 9 comments
Labels: question (Further information is requested)

zmasih commented Jan 17, 2025

Describe the question.

Hello.
I need to use DALI for distributed training only on CPUs. The system where I'm running my benchmark does not have any GPUs. I've tried the EfficientDet example from the DALI repo, but it supports distributed strategies only on GPUs; on CPU it runs only on a single device. Could you guide me?

Check for duplicates

  • I have searched the open bugs/issues and have found no duplicates for this bug report

JanuszL (Contributor) commented Jan 17, 2025

Hi @zmasih,

Can you tell me how you run the example and what kind of error you observe?

I cannot rule out that the code itself is not adjusted to run only on the CPU; we added a CPU variant of the pipeline but not of the model itself.

zmasih (Author) commented Jan 17, 2025

I'm running:

python train.py --pipeline dali_cpu --epochs 1 --input_type tfrecord --train_file_pattern './tfrecords/train/*.tfrecord' --batch_size 16 --train_steps 1

For num_devices = 1 it works well (no strategy is used), but for more than one device I get:

ValueError: dali_cpu pipeline is not compatible with multi_gpu mode :<

I understand the problem, but I don't know how to change the multi_gpu mode.

JanuszL (Contributor) commented Jan 17, 2025

Hi @zmasih,

Indeed, the example was not written with more than one CPU device in mind.
In your case, if you want to run the training on a single server, I think that one device is more than enough, as DALI and TF will use multiple cores under the hood.
If you want to use multiple nodes, you can check the RN50 example using Horovod, which should scale better across nodes.
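
For a multi-node CPU run, the Horovod launcher would be invoked along these lines (the host names, slot counts, and script name here are placeholders, not the exact RN50 example invocation):

horovodrun -np 2 -H node1:1,node2:1 python train_script.py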

zmasih (Author) commented Jan 17, 2025

@JanuszL Thank you for your answer.

So you are saying that, with no explicit request when running the code, DALI will use multiple CPU cores to load, decode, and preprocess data concurrently?

And as a quick check before starting with RN50: do you confirm that on my system, which has no GPU, I can use distributed training on multiple nodes for this use case?

JanuszL (Contributor) commented Jan 17, 2025

> So you are saying that, with no explicit request when running the code, DALI will use multiple CPU cores to load, decode, and preprocess data concurrently?

Yes, the only thing you can adjust is the number of CPU threads that DALI uses (num_threads argument).
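
For illustration, a minimal sketch of where that argument goes, assuming a pipeline defined with the pipeline_def decorator (the values here are placeholders):

from nvidia.dali import pipeline_def

# num_threads sets how many CPU worker threads DALI uses for loading,
# decoding, and preprocessing; tune it to the cores available per node.
@pipeline_def(batch_size=16, num_threads=8)
def my_pipeline():
    ...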

> You confirm that on my system, which has no GPU, I can use distributed training on multiple nodes for this use case.

I believe that the Horovod approach should work in general with DALI on the CPU; however, I cannot say whether the examples we have will work, especially EfficientDet, which uses the native TF distributed strategy.

zmasih (Author) commented Jan 20, 2025

I've tried:

python train.py --pipeline dali_cpu --epochs 1 --input_type tfrecord --train_file_pattern './tfrecords/train/*.tfrecord' --batch_size 16 --train_steps 1

on a system with no GPU. I set device_id=None and cpu_only=True in docs/examples/use_cases/tensorflow/efficientdet/pipeline/dali/efficientdet_pipeline.py, but I still get the following error:

dlopen libcuda.so failed. Please install GPU driver.DALI daliCreatePipeline3( pipeline_handle, pipeline_def_.pipeline.c_str(), pipeline_def_.pipeline.length(), pipeline_def_.batch_size, pipeline_def_.num_threads, pipeline_def_.device_id, flags, pipeline_def_.prefetch_queue_depth, pipeline_def_.cpu_prefetch_queue_depth, pipeline_def_.gpu_prefetch_queue_depth, pipeline_def_.enable_memory_stats) failed: Assert on "device_id == CPU_ONLY_DEVICE_ID || cuInitChecked()" failed: You are trying to create a GPU DALI pipeline, while CUDA is not available. Please install CUDA or set `device_id = None` in Pipeline constructor. If running inside Docker container, you may need to use  `--gpus` option.
2025-01-20 16:27:34.398126: F dali_dataset_op.cc:1019] Non-OK-status: InitPipeline(&pipeline_handle)
Status: INTERNAL: DALI daliCreatePipeline3( pipeline_handle, pipeline_def_.pipeline.c_str(), pipeline_def_.pipeline.length(), pipeline_def_.batch_size, pipeline_def_.num_threads, pipeline_def_.device_id, flags, pipeline_def_.prefetch_queue_depth, pipeline_def_.cpu_prefetch_queue_depth, pipeline_def_.gpu_prefetch_queue_depth, pipeline_def_.enable_memory_stats) failed: Assert on "device_id == CPU_ONLY_DEVICE_ID || cuInitChecked()" failed: You are trying to create a GPU DALI pipeline, while CUDA is not available. Please install CUDA or set `device_id = None` in Pipeline constructor. If running inside Docker container, you may need to use  `--gpus` option.
Aborted (core dumped)

Can you please guide me? It worked for me on a system with CUDA, even with --pipeline dali_cpu.

JanuszL (Contributor) commented Jan 20, 2025

Hi @zmasih,

As I mentioned, the example is not prepared to run without a GPU, even if the pipeline itself can run on the CPU. Each DALI pipeline is assigned to a device (GPU) based on the TF distributed strategy, and once a device id is provided, DALI tries to initialize CUDA.

What you can do is check whether providing None for the device id in L74 of docs/examples/use_cases/tensorflow/efficientdet/pipeline/dali/efficientdet_pipeline.py helps.
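
As a generic illustration (a sketch, not the actual EfficientDet code), a CPU-only pipeline is instantiated like this, so DALI never attempts to initialize CUDA:

from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn

@pipeline_def
def toy_pipeline():
    # "./images" is a placeholder directory of JPEGs, for illustration only
    jpegs, labels = fn.readers.file(file_root="./images", name="Reader")
    return fn.decoders.image(jpegs, device="cpu"), labels

# device_id=None marks the pipeline as CPU-only
pipe = toy_pipeline(batch_size=16, num_threads=4, device_id=None)
pipe.build()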

zmasih (Author) commented Jan 20, 2025

Thank you @JanuszL
That also didn't help. Since DALI is primarily designed for GPUs, I haven’t found much relevant guidance online.
Would you have any additional resources or suggestions that might help in implementing distributed training with DALI on CPU systems?

JanuszL (Contributor) commented Jan 20, 2025

Hi @zmasih,

You can start with this toy example:

import os.path
import time

import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.compat.v1 import Session

import nvidia.dali.fn as fn
import nvidia.dali.plugin.tf as dali_tf
import nvidia.dali.types as types
from nvidia.dali import pipeline_def

tf.compat.v1.disable_eager_execution()

%matplotlib inline

# Test data shipped with DALI_extra, in MXNet RecordIO format
test_data_root = os.environ["DALI_EXTRA_PATH"]
base = os.path.join(test_data_root, "db", "recordio")

idx_files = [base + "/train.idx"]
rec_files = [base + "/train.rec"]

BATCH_SIZE = 32
ITERATIONS = 32
BURNIN_STEPS = 16


def show_images(image_batch, nb_images):
    columns = 4
    rows = (nb_images + 1) // columns
    fig = plt.figure(figsize=(32, (32 // columns) * rows))
    gs = gridspec.GridSpec(rows, columns)
    for j in range(nb_images):
        plt.subplot(gs[j])
        plt.axis("off")
        # Undo the mean subtraction from the pipeline and move the channel
        # axis last for plotting (CHW -> HWC)
        img = image_batch[0][j].transpose((1, 2, 0)) + 128
        plt.imshow(img.astype("uint8"))


@pipeline_def(batch_size=BATCH_SIZE, num_threads=4)
def rn50_pipeline(device):
    jpegs, labels = fn.readers.mxnet(
        path=rec_files, index_path=idx_files, name="Reader"
    )
    # "mixed" decodes on the GPU; "cpu" keeps everything on the host
    images = fn.decoders.image(
        jpegs, device="mixed" if device == "gpu" else "cpu"
    )
    images = fn.resize(
        images,
        resize_shorter=fn.random.uniform(range=(256.0, 480.0)),
        interp_type=types.INTERP_LINEAR,
    )
    images = fn.crop_mirror_normalize(
        images,
        crop_pos_x=fn.random.uniform(range=(0.0, 1.0)),
        crop_pos_y=fn.random.uniform(range=(0.0, 1.0)),
        dtype=types.FLOAT,
        crop=(224, 224),
        mean=[128.0, 128.0, 128.0],
        std=[1.0, 1.0, 1.0],
    )
    images = fn.cast(images, dtype=types.INT32)

    if device == "gpu":
        labels = labels.gpu()

    return images, labels


# device_id=None makes the pipeline CPU-only, so no CUDA initialization
# is attempted
cpu_pipe = rn50_pipeline(device="cpu", device_id=None)

daliop = dali_tf.DALIIterator()

images_cpu = []
labels_cpu = []
with tf.device("/cpu"):
    image_cpu, label_cpu = daliop(
        pipeline=cpu_pipe,
        shapes=[(BATCH_SIZE, 3, 224, 224), ()],
        dtypes=[tf.int32, tf.float32],
    )

    images_cpu.append(image_cpu)
    labels_cpu.append(label_cpu)

with Session() as sess:
    all_img_per_sec = []
    total_batch_size = BATCH_SIZE

    for i in range(ITERATIONS):
        start_time = time.time()

        # The actual run with our dali_tf tensors
        res_cpu = sess.run([images_cpu, labels_cpu])

        elapsed_time = time.time() - start_time
        img_per_sec = total_batch_size / elapsed_time
        if i > BURNIN_STEPS:
            all_img_per_sec.append(img_per_sec)
            print("\t%7.1f img/s" % img_per_sec)

    print(
        "Total average %7.1f img/s"
        % (sum(all_img_per_sec) / len(all_img_per_sec))
    )

show_images(res_cpu[0], 8)
to run DALI on the CPU (tested inside a Jupyter notebook).
For distributed training, you need to make sure that the DALI reader's shard_id is set appropriately for each node and that num_shards equals the world size.
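
For example, sharding the reader from the toy example above would look roughly like this with Horovod (a sketch reusing rec_files, idx_files, BATCH_SIZE, and the imports from the example; hvd.rank() and hvd.size() supply the per-worker shard id and the world size):

import horovod.tensorflow as hvd

hvd.init()

@pipeline_def(batch_size=BATCH_SIZE, num_threads=4, device_id=None)
def sharded_pipeline():
    # Each worker reads a disjoint shard; num_shards must equal the
    # total number of workers participating in the training.
    jpegs, labels = fn.readers.mxnet(
        path=rec_files,
        index_path=idx_files,
        shard_id=hvd.rank(),
        num_shards=hvd.size(),
        name="Reader",
    )
    images = fn.decoders.image(jpegs, device="cpu")
    return images, labels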
