Tutorial: Scalable and Distributed ML Workflows with DVC and Ray (Part 1)

Training models at scale requires advanced tools that manage complexity while ensuring efficiency. This tutorial introduces you to integrating DVC with Ray, turning them into your go-to toolkit for creating automated, scalable, and distributed ML pipelines.

  • Mikhail Rozhkov, Dave Berenbaum
  • March 12, 2024 · 15 min read

Ray + DVC integration example

Training models at the scale of the Gemini or GPT-4 models requires advanced tools that manage complexity while ensuring efficiency. This tutorial explores how Data Version Control (DVC) can be a game-changer for ambitious projects. DVC simplifies AI development by automating pipelines, managing versions, and tracking experiments while embracing GitOps for reproducibility. It excels in both local and cloud environments for traditional ML workflows. However, the rise of Generative AI and complex deep learning projects demands scalable, distributed training solutions.

This tutorial is divided into two parts. Part 1 sets the foundation for scalable and efficient machine learning workflows by leveraging Ray’s distributed computing capabilities and DVC’s data version control.

In Part 2, we extend the solution to a Ray Cluster on AWS, demonstrating how to adapt the setup for cloud-based distributed computing. This involves configuring AWS resources, deploying Ray clusters in the cloud, and running DVC-managed pipelines at scale.

This guide is tailored for ML Engineers and Team Leads in AI projects who aim to speed up training, optimize resources, and ensure reproducibility across distributed environments. We look forward to hearing your feedback and ideas for improvement! 🙌

We would like to express our gratitude to Andreas Schuh from HeartFlow for his contribution to this solution and for providing ideas and feedback for the blog posts. 🤝

Why DVC and Ray?

DVC is an open-source tool that brings GitOps and reproducibility to data management, ML experiments, and model development. It connects versioned data sources and code with pipelines, tracks experiments, and registers models — all based on GitOps principles.

Ray is an open-source unified computing framework that makes scaling AI and Python workloads easy — from reinforcement learning to deep learning to tuning and model serving. Ray makes it a breeze to scale your compute-intensive tasks from a single machine to a massive cluster without losing your mind.

DVC + Ray for distributed ML

DVC and Ray make your ML projects more manageable and prepare them to tackle the challenges of tomorrow’s AI-driven landscape. Let’s explore this dynamic duo and unlock new potentials in your MLOps journey!

💡 Want to learn more about DVC?

Join our online course about DVC: Iterative Tools for Data Scientists & Analysts!

Tutorial Scope

This tutorial will guide users through creating automated, scalable, and distributed ML pipelines using DVC (Data Version Control) and Ray. We start by configuring the Ray Cluster for local and cloud environments, discuss the challenges of running DVC in distributed environments, and then walk through a few examples of using DVC and Ray together. By the end of the tutorial, you will be able to design, run, and manage ML pipelines that are distributed over multiple nodes and trackable through version control.

For DVC users, this tutorial offers several advantages:

  • Bring Distributed Computing Efficiency to DVC projects
  • Use AWS Cloud easily for Development and Production workflows
  • Enable automated pipelines and data versioning in ML projects with Ray

For Ray users, this tutorial aims to highlight the benefits of integrating DVC:

  • Enhance Model Training Reproducibility with DVC’s data versioning capabilities
  • Streamline ML Pipeline Management through DVC’s structured approach
  • Facilitate Efficient Collaboration among teams by leveraging DVC for shared data and model management

High-level solution design

Let’s overview the high-level design of our target solution with DVC and Ray.

  1. Users can manage Ray Cluster and run DVC pipelines from a “local” environment.
  2. Ray distributes workloads across multiple workers and can auto-scale cluster nodes.
  3. During the training, DVCLive logs live updates of metrics and parameters to DVC Studio.
  4. DVC utilizes S3 to sync state between Worker and Head nodes.
  5. DVC uses remote storage (AWS S3) to manage data and model artifacts.
  6. Users commit the results of the experiment to Git and DVC Remote Storage.

High-Level Solution Design
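
For reference, the remote storage in items 4 and 5 above is a one-time configuration in the project; the bucket path below is a placeholder:

dvc remote add -d storage s3://<your-bucket>/dvc-storage
dvc remote list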

Prerequisites

We expect that you:

  • Have some experience with Machine Learning or Data Engineering pipelines
  • Are familiar with DVC

To follow this tutorial, you’ll need the following tools:

  • Git
  • Python 3.11 or above
  • AWS CLI (if you want to run pipelines in AWS)

👩‍💻 Installation

Creating an ML pipeline that runs distributed tasks is a powerful way to manage and scale your machine learning workflows. With DVC, we can efficiently orchestrate our pipeline stages and handle experiment outputs.

To clone the example repository, you can follow these steps:

git clone https://github.com/iterative/tutorial-mnist-dvc-ray.git
cd tutorial-mnist-dvc-ray

Install Python dependencies:

python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
export PYTHONPATH=$PWD

⭐ Get Started with Ray

1 - Overview of the Ray Framework

Ray is a framework for scaling AI and Python applications. For AI and ML applications, Ray helps to scale jobs without needing infrastructure expertise:

  • Efficiently parallelize and distribute ML workloads across multiple nodes and GPUs.
  • Leverage the ML ecosystem with native and extensible integrations.

Stack of Ray libraries - a unified toolkit for ML workloads (Ray Docs)

In this tutorial, we work with Ray Clusters and Ray AI Libraries (Ray Tune and Ray Train). A Ray Cluster is a set of Worker nodes connected to a common Ray Head node.

  • The Head node serves as the central coordination point for the Ray cluster. It manages the cluster’s metadata, maintains the cluster state, and handles task scheduling and management.
  • Worker nodes are the computational workhorses of the Ray cluster. They are responsible for executing tasks and running computations for applications.

A Ray cluster with two worker nodes. Each node runs Ray helper processes to facilitate distributed scheduling and memory management. The head node runs additional control processes (highlighted in blue). Source: Ray Docs

Ray clusters can be fixed-size or autoscale up and down according to the resources requested by applications running on the cluster.

Ray Tune is a Python Library that automates the hyperparameter tuning process across distributed resources. By integrating Ray Tune into the experiment workflow, we can evaluate numerous hyperparameter combinations in parallel, speeding up the search for optimal model configurations.

Distributed tuning with distributed training per trial. Source: Ray Docs
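
The snippet below is a minimal sketch of the Ray Tune API (not this tutorial's tune.py): a toy objective function is evaluated over a small search space, and the best configuration is retrieved from the results.

from ray import tune

def objective(config):
    # A real trainable would train a model here; we just compute a toy score
    return {"mean_accuracy": 1.0 / (1.0 + config["lr"] * config["batch_size"])}

tuner = tune.Tuner(
    objective,
    param_space={
        "lr": tune.loguniform(1e-4, 1e-1),
        "batch_size": tune.choice([32, 64, 128]),
    },
    tune_config=tune.TuneConfig(num_samples=8, metric="mean_accuracy", mode="max"),
)
results = tuner.fit()
print(results.get_best_result().config)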

Ray Train creates a setup to scale model training code from a single machine to a cluster of machines in the cloud and abstracts away the complexities of distributed computing. At a high level of abstraction, it distributes and runs training jobs among worker nodes.

Ray Train Overview. Source: Ray Docs
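
And here is a minimal sketch of the Ray Train API with the PyTorch-oriented trainer; the per-worker function only reports a dummy metric where a real project would run its training loop.

import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    # Each worker runs this function; a real loop would train the model here
    for epoch in range(config["epochs"]):
        ray.train.report({"epoch": epoch, "loss": 1.0 / (epoch + 1)})

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={"epochs": 2},
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
)
result = trainer.fit()
print(result.metrics)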

2 - Start a Ray Cluster

💡 Navigate to the main branch in the repository

To start a Ray Cluster, first start the Ray head node. The head node is the primary node in the Ray cluster that manages the worker nodes. Since this is a local setup, your machine will act as both the Head and the Worker node. Use the following command:

ray start --head

This command starts the Ray cluster with your machine acting as the head node.

To monitor and debug Ray, view the dashboard at http://127.0.0.1:8265/.

Ray Dashboard - Cluster Nodes
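
Besides the dashboard, you can inspect the cluster programmatically. Here is a minimal sketch; it assumes the cluster started above is still running:

import ray

ray.init(address="auto")          # attach to the already running cluster
print(ray.cluster_resources())    # total CPUs, GPUs, and memory across all nodes
print(len(ray.nodes()))           # number of nodes currently in the cluster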

💡 Multi-node Ray clusters are only supported on Linux. You may deploy Windows and OSX clusters for development by setting the environment variable RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER=1. Source: Ray Clusters Overview.
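
For reference, on Linux an additional machine could join this cluster by pointing ray start at the head node's address; the IP below is just the example address that appears in the logs later in this tutorial, and this step is not needed for the local, single-node setup:

ray start --address=192.168.100.19:6379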

3 - Run a test script on the Ray Cluster

You can run a simple test script to ensure your local Ray cluster works correctly. In your project directory, create a file named hello_cluster.py inside the src/test_scripts directory. Add a script to connect to the Ray cluster and print a message. Here’s an example script:

import ray

@ray.remote
def hello_world():
    return "Hello Ray cluster"

# Automatically connect to the running Ray cluster.
ray.init()
print(ray.get(hello_world.remote()))

Execute the script using Python. Open your terminal and run:

python src/test_scripts/hello_cluster.py

You should see an output similar to this:

2023-11-14 12:11:17,363 INFO worker.py:1489 -- Connecting to existing Ray cluster at address: 192.168.100.19:6379...
2023-11-14 12:11:17,370 INFO worker.py:1664 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265
Hello Ray cluster

This output indicates that your script has successfully connected to the local Ray cluster and executed the print statement.

🏃‍♂️ Run DVC Pipeline on a Ray Cluster

At this point, you have a single-node Ray Cluster running on your local machine. Let’s start with the DVC pipeline setup.

Goals for this section:

  • Design a Solution for DVC + Ray.
  • Create a DVC pipeline with two stages: tune and train.
  • Modify DVCLive to sync metrics and parameters with DVC Studio.

1 - Design Solution for DVC + Ray

The technical design calls for a structure where ML experiment scripts, managed by DVC, invoke Ray for their computation needs. DVC is the orchestrator, invoking the appropriate Ray functions for distributed processing.

Design POC Solution for DVC + Ray (local)

This diagram outlines the integration of DVC (Data Version Control) with a Ray cluster for running ML experiments in a distributed manner:

  1. DVC initiates the process by running a stage script. The dvc.yaml pipeline definition is the blueprint for the ML workflow, defining stages that utilize Ray for hyperparameter tuning and subsequent training stages.
  2. Ray Job Submission: The stage script (e.g., src/stages/tune.py) starts a Ray application that submits computation jobs to Ray. The src/stages/tune.py script utilizes Ray Tune’s Tuner class to define and run the hyperparameter tuning trials.
  3. Ray Cluster: in this local setup, it contains a single Head node where the actual computation occurs. (Note: in a production cluster, Ray distributes jobs across multiple Worker nodes.) Ray saves the results for each job (trial) to a local directory on a worker node (outside the DVC project repo).
  4. After all jobs complete, the stage script retrieves results from Ray’s trial directories to the DVC project repo (if needed).
  5. DVC manages the outputs of the pipeline, ensuring reproducibility and traceability.

The result is a robust framework for conducting and managing ML experiments that are scalable, reproducible, and efficiently optimized. This framework not only streamlines the experimentation process but also simplifies the transition of models from development to production.

2 - Create a DVC pipeline

In this tutorial, the dvc.yaml file contains only two stages in the ML pipeline: tune and train.

DVC pipeline configuration in dvc.yaml with tune and train stages, and plots sections

Tune Stage

This initial stage is responsible for hyperparameter tuning. It uses Ray to distribute the computation involved in this process. The stage executes a Python script, tune.py, that optimizes hyperparameters using Ray Tune. The output of this stage is best_params.yaml, which contains the best hyperparameters found during the tuning process.

tune:
  cmd: python src/stages/tune.py --config params.yaml
  params:
    - tune
  outs:
    - ${tune.results_dir}/best_params.yaml:
        cache: false
        persist: true

Use two specific configuration parameters for the best_params.yaml output:

  • Set cache: false to instruct DVC not to cache the file but version it with Git.
  • Set persist: true to instruct DVC not to remove the file before reproducing the stage. This is useful when you work in an unstable environment (or are debugging) and the stage script may fail for any reason. In this example, even if the tune stage fails, you can run the train stage using best_params.yaml from the previous run.
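
For orientation, here is a minimal sketch of what a tune stage script could look like. The actual src/stages/tune.py in the tutorial repository is more involved, and the params.yaml keys (num_samples, results_dir) are assumptions used for illustration:

# Minimal sketch of a tune stage script (not the repository's exact code)
import argparse
import os

import yaml
from ray import tune


def objective(config):
    # A real objective would train the model with the sampled hyperparameters
    return {"mean_accuracy": 1.0 - config["lr"]}


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", default="params.yaml")
    args = parser.parse_args()
    with open(args.config) as f:
        params = yaml.safe_load(f)["tune"]

    tuner = tune.Tuner(
        objective,
        param_space={"lr": tune.loguniform(1e-4, 1e-1)},
        tune_config=tune.TuneConfig(
            num_samples=params["num_samples"],
            metric="mean_accuracy",
            mode="max",
        ),
    )
    best_config = tuner.fit().get_best_result().config

    # Write the winning hyperparameters to the path declared in dvc.yaml
    os.makedirs(params["results_dir"], exist_ok=True)
    with open(os.path.join(params["results_dir"], "best_params.yaml"), "w") as f:
        yaml.safe_dump(best_config, f)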

Train Stage

The Train Stage runs distributed computation via Ray. This stage depends on best_params.yaml, generated by the tune stage, to access the optimal hyperparameters for training the model. The train stage invokes the train.py script, which trains the model based on the tuned parameters.

train:
  cmd: python src/stages/train.py --config params.yaml
  params:
    - train
  deps:
    - ${tune.results_dir}/best_params.yaml
  outs:
    - ${train.results_dir}/model.pth

The trained model is saved as model.pth, with the path again parameterized to allow flexibility in the output location. The output model is automatically cached and versioned with DVC.

3 - Run DVC pipelines on Ray Cluster

To execute your automated and distributed ML pipeline with DVC, perform the following steps:

  • Set the PYTHONPATH environment variable so that Python scripts can access modules within your project’s directory.
  • Run the DVC pipeline with the dvc exp run command:
export PYTHONPATH=$PWD
dvc exp run

This will start the pipeline, running the tune and train stages as defined in your dvc.yaml file, utilizing distributed computation with Ray.

You can see live updates of metrics and plots in DVC Studio and the DVC Extension for VS Code. DVC generates and renders plots based on your project’s data, and everything logged with DVCLive can be visualized in both tools.

A few benefits of tracking and visualizing metrics and plots with DVC (see docs):

  • Enhanced Experiment Tracking: Compare metrics, parameters, data versions, and plots between experiments in live mode (docs: Visualize and Compare experiments).
  • Customize Visualization: Define visualization templates and interactively select the data and titles to be visualized, before or after the experiment is complete (docs: Defining plots).
  • Share & Version Control for Metrics: Send live metrics and plots to DVC Studio, push completed experiments (including data, models, and code), and convert an experiment into a persistent branch or commit in your Git repo (docs: Sharing Experiments).

Experiment tracking with DVC Studio and DVC Extension for VS Code

💡 Note: Sometimes, when you run dvc exp run with a local Ray Cluster, the process may get stuck with Connecting to existing Ray cluster at address: 192.168.100.19:6379... message due to a ConnectionError in Ray. In this case, open a new terminal session, export PYTHONPATH, and run the dvc exp run command there.

💬 Discuss the Solution Design

The section above walks through a simple example of running DVC and Ray together. It’s not a production setup, but it’s a good starting point for developing and debugging a DVC pipeline with Ray.

Let’s think about what decisions we made and discuss some details:

  1. Use DVC to run scripts calling Ray API.
  2. Persist DVC stage outputs to keep them available for downstream stages in case of failure.
  3. Use DVCLive to track metrics only on a worker with a rank of 0.
  4. Propagate DVC environment variables to a worker node using TorchTrainer train_loop_config.
  5. Copy the model.pth file from the Ray Trial folder to the DVC project repository.

☝️ Use DVC to run scripts calling Ray API

The Ray framework provides a rich Python API for distributed data processing, model tuning, and training. Wrapping Ray scripts into callable Python modules makes them easy to run as DVC stages. This way, you get two benefits:

  • Get scalability and distributed training with Ray
  • Get reproducibility and versioning with DVC

A template of the dvc.yaml for DVC + Ray:

stages:

  first_stage:
    cmd: python first_script_with_ray.py
    ...

  next_stage:
    cmd: python second_script_with_ray.py
    ...

☝️ Persist DVC stage outputs to keep them available for downstream stages in case of failure

Set persist: true to instruct DVC not to remove the file before reproducing the stage. This is useful when you work in an unstable environment (or are debugging) and the stage script might fail.

stages:
  first_stage:
    cmd: python first_script_with_ray.py
    outs:
      - stage_output.file:
          persist: true

☝️ Use DVCLive to track live metrics updates with DVC Studio and DVC Extension for VS Code

Ray Train lets you use native experiment tracking libraries inside the train_func function. DVCLive is a highly flexible and lightweight library that simplifies experiment tracking in DVC projects.

from dvclive import Live

with Live() as live:
    live.log_metric(metric_name, value)

This solution logs metrics with Live() inside the train_func_per_worker() function.

One significant distinction between distributed and non-distributed training is that multiple processes run in parallel and, under certain configurations, may produce identical results. When all processes report results to the tracking backend, there is a risk of duplicate entries (check the Ray docs for details).

Therefore, a few adjustments should be made to DVCLive.

  1. Use DVCLive to track metrics only on a worker with a rank of 0.
  2. Use the DVC_ROOT variable to create the Live(dir=...) object. DVC automatically sets the value for the DVC_ROOT variable to the directory of your DVC repository and ensures Ray writes metrics inside the repo (docs).

As a result, the DVCLive usage code inside the train_func_per_worker() function looks like the example below.

# train.py

import os
from typing import Dict

import ray.train
from dvclive import Live


def train_func_per_worker(config: Dict):

    # Initialize DVCLive
    live = None
    rank = ray.train.get_context().get_world_rank()

    # Create a Live object only on the rank 0 worker to avoid duplicate entries
    if rank == 0:
        live = Live(
            dir=os.path.join(os.environ.get("DVC_ROOT", ""), "results/dvclive"),
        )

    for epoch in range(epochs):
        # ...epoch training (produces test_loss and accuracy)

        # Log metrics with DVCLive on the rank 0 worker
        if live:
            live.log_metric("loss", test_loss)
            live.log_metric("accuracy", accuracy)
            live.next_step()

Utilizing DVCLive in Python code for logging metrics and plots automatically generates the necessary configurations for plots within the dvc.yaml file. Below is an example configuration for metrics and plots:

metrics:
  - results/dvclive/metrics.json
plots:
  - accuracy:
      x: step
      y:
        results/dvclive/plots/metrics/accuracy.tsv: accuracy
      title: Accuracy
      x_label: Step
      y_label: Accuracy
  - loss:
      template: simple
      x: step
      y:
        results/dvclive/plots/metrics/loss.tsv: loss
      title: Loss
      x_label: Step
      y_label: Loss
  - results/tune/plots/images

The train stage logs metrics and plots to results/dvclive. Datapoints for metrics and plots are saved in files and visualized later in DVC Studio and VS Code.
Metrics plot generated by the train stage

The tune stage logs a mean_accuracy_plot.png file to visualize metrics for tuning trials.

Metrics plot generated by the tune stage

☝️ Propagate DVC environment variables to Worker nodes

DVC environment variables are necessary for every Ray worker because they provide essential information and configurations for DVCLive, facilitating experiment tracking. These variables include:

  1. DVC_STUDIO_REPO_URL: URL of the Git repository connected to DVC Studio.
  2. DVC_STUDIO_TOKEN: Authentication token for secure access to DVC Studio.
  3. DVC_STUDIO_URL: Web interface URL for managing DVC projects.
  4. DVC_EXP_BASELINE_REV: Baseline revision for comparing experiment results.
  5. DVC_EXP_NAME: Descriptive identifier for the experiment.
  6. DVC_ROOT: Root directory of the DVC project on the filesystem.

💡 Note: All environment variables above are set by DVC automatically when running a pipeline.

You don’t need to care about DVC environment variables when running DVC in a non-distributed environment. However, running on a Ray Cluster requires setting them on every worker. In this solution, DVC environment variables are passed via RuntimeEnv, which specifies a runtime environment for the whole job.

Set up DVC Environment Variables

The code snippet below demonstrates an approach to managing DVC environment variables within a TorchTrainer setup.

import os
from typing import Dict

import ray
from ray.runtime_env import RuntimeEnv
from ray.train.torch import TorchTrainer
from dvclive import Live


def train_func_per_worker(config: Dict):
    #...
    if rank == 0:
        live = Live(
            dir=os.path.join(os.environ.get("DVC_ROOT",""), "results/dvclive"),
        )

def train(params: dict) -> None:
    #...
    trainer = TorchTrainer(
        train_loop_per_worker=train_func_per_worker,
        train_loop_config=train_config,
        ...
    )

if __name__ == "__main__":
    #...
    # [1] Propagate DVC environment variables from the Head Node to Workers
    # =============================================
    DVC_ENV_VARS = {k: v for k, v in os.environ.items() if k.startswith("DVC")}
    ray.init(runtime_env=RuntimeEnv(env_vars=DVC_ENV_VARS))

    train(params)

To ensure that DVC environment variables are accessible within the training loop across all worker nodes, RuntimeEnv propagates these variables from the head node to the workers.

☝️ Copy the model.pth file from the Ray Trial folder to the DVC project repository

Upon completing the training process, the model.pth file is saved in the Ray Trial folder. Therefore, the stage script copies it into the DVC project repository; a sketch of this step is shown below.

This ensures that the trained model file is appropriately stored within the DVC-managed project structure, facilitating version control and reproducibility.
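
Here is a minimal sketch of that copy step, assuming the per-worker training loop saved a model.pth file into the Ray checkpoint; the file name and the results_dir value are assumptions for illustration:

import os
import shutil

from ray.train import Result


def export_model(result: Result, results_dir: str) -> None:
    """Copy model.pth from the Ray checkpoint into the DVC-tracked output path."""
    os.makedirs(results_dir, exist_ok=True)
    # Materialize the checkpoint locally and copy the model file out of it
    with result.checkpoint.as_directory() as ckpt_dir:
        shutil.copy(
            os.path.join(ckpt_dir, "model.pth"),
            os.path.join(results_dir, "model.pth"),
        )


# Usage after training: export_model(trainer.fit(), params["train"]["results_dir"])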

🎨 Summing Up: DVC + Ray Integration

The DVC + Ray integration presents a comprehensive solution to the challenges of running machine learning experiments at scale. By addressing specific issues related to auto-scaling, execution optimization, live metrics tracking, and data synchronization, this setup ensures that machine learning teams can focus on innovation and experimentation backed by a robust, scalable, and efficient infrastructure.

In Part 1 of the tutorial, we explored the basics of setting up and integrating DVC with Ray for distributed machine learning workflows. We covered the following key topics:

  • Introduction to Ray: We discussed Ray’s capabilities for scaling AI and Python applications, focusing on its ability to parallelize and distribute ML workloads across multiple nodes easily.
  • Ray Clusters: The architecture of Ray clusters was explained, highlighting the roles of head and worker nodes in managing and executing tasks.
  • Ray Tune and Ray Train: We introduced Ray Tune for hyperparameter optimization and Ray Train for scaling model training code, emphasizing their integration into ML workflows.
  • Local Ray Cluster Setup: Step-by-step instructions were provided for starting a Ray Cluster locally, showcasing how to test the setup with a simple script.

Key Takeaways

The key takeaway from Part 1 is the foundation it sets for scalable and efficient machine learning workflows. By leveraging Ray’s distributed computing capabilities and DVC’s data version control, we establish a robust framework for managing complex ML experiments. This combination enhances scalability, reproducibility, and collaboration in ML projects.

Looking Ahead to Part 2

In Part 2 of the tutorial, we will extend the solution to a Ray Cluster on AWS, demonstrating how to adapt the setup for cloud-based distributed computing. This will involve configuring AWS resources, deploying Ray clusters in the cloud, and running DVC-managed pipelines at scale. The focus will shift towards managing the increased complexity and leveraging cloud infrastructure to maximize the efficiency and performance of ML experiments.

Stay tuned for detailed instructions on deploying and managing cloud-based Ray clusters with DVC as we take the scalability and efficiency of ML workflows to the next level.

💡 Did you find this tutorial interesting? Please leave your comments and share your experience with DVC and Ray! Join us on Discord 🙌
