Custom Clusters#
Similar to custom dataset and model classes, you can register a class for your own cluster so you can launch Oumi jobs on custom clusters that are not supported out of the box.
This guide is specifically geared towards individuals who have access to a compute cluster that’s not hosted on a common cloud provider (e.g. University/personal compute clusters).
The Oumi Launcher Hierarchy#
Preface#
Before diving into this tutorial, lets discuss the hierarchy of the Oumi Launcher. At this point, it’s worth reading through our tutorial on Deploying a Job to better understand the end-to-end flow of the launcher. Already read it? Great!
Overview#
At a high level, the Oumi Launcher is composed of 3 tiers of objects: Clouds
, Clusters
, and Clients
. The Launcher holds an instance of each unique Cloud
. These Clouds
, in turn, are responsible for creating compute Clusters
. And Clusters
coordinate running jobs. All communication with remote APIs happens via the Client
.
Clouds#
A Cloud class must implement the BaseCloud
abstract class. The Launcher will only create one instance of each Cloud, so it’s important that a single Cloud object is capable of turning up and down multiple clusters.
You can find several implementations of Clouds here.
Clusters#
A Cluster class must implement the BaseCluster
abstract class. A cluster represents a single instance of hardware. For custom clusters (such as having access to a super computer that can only run one job at a time), it may be the case that you only need one cluster to represent your hardware setup.
You can find several implementations of Clusters here.
Clients#
Clients are a completely optional but highly encouraged class. Clients should encapsulate all logic that calls remote APIs related to your cloud. While this logic could be encapsulated with your Cluster and Cloud classes, having a dedicated class for this purpose greatly simplifies your Cloud and Cluster logic.
You can find several implementations of Clients here.
Creating a CustomClient Class#
Let’s get started by creating a client for our new cloud, CustomCloud
. Let’s create a simple client that randomly sets the state of the job on submission. It also supports canceling jobs, and turning down clusters:
import random
from enum import Enum
from typing import Optional
from oumi.core.configs import JobConfig
from oumi.core.launcher import JobStatus
class _JobState(Enum):
"""An enumeration of the possible states of a job."""
QUEUED = "QUEUED"
RUNNING = "RUNNING"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
CANCELED = "CANCELED"
class CustomClient:
"""A client for running jobs locally in a subprocess."""
def __init__(self):
"""Initializes a new instance of the CustomClient class."""
self._jobs = []
def submit_job(self, job: JobConfig) -> JobStatus:
"""Pretends to run the specified job on this cluster."""
job_id = str(len(self._jobs))
name = job.name if job.name else job_id
# Pick a random status
status = random.choice([state for state in _JobState])
job_status = JobStatus(
name=name,
id=job_id,
status=status.value,
cluster="",
metadata="",
done=False,
)
self._jobs.append(job_status)
return job_status
def list_jobs(self) -> list[JobStatus]:
"""Returns a list of job statuses."""
return self._jobs
def get_job(self, job_id: str) -> Optional[JobStatus]:
"""Gets the specified job's status.
Args:
job_id: The ID of the job to get.
Returns:
The job status if found, None otherwise.
"""
job_list = self.list_jobs()
for job in job_list:
if job.id == job_id:
return job
return None
def cancel(self, job_id) -> Optional[JobStatus]:
"""Cancels the specified job.
Args:
job_id: The ID of the job to cancel.
Returns:
The job status if found, None otherwise.
"""
int_id = int(job_id)
if int_id > len(self._jobs):
return None
job_status = self._jobs[int_id]
job_status.status = _JobState.CANCELED.value
return job_status
def turndown_cluster(self, cluster_name: str):
"""Turns down the cluster."""
print(f"Turning down cluster {cluster_name}...")
pass
Creating a CustomCluster Class#
Now that we have a client that talks to our API, we can use the Client to build a Cluster!
from typing import Any, Optional
from oumi.core.launcher import BaseCluster
class CustomCluster(BaseCluster):
"""A custom cluster implementation."""
def __init__(self, name: str, client: CustomClient) -> None:
"""Initializes a new instance of the CustomCluster class."""
self._name = name
self._client = client
def __eq__(self, other: Any) -> bool:
"""Checks if two LocalClusters are equal."""
if not isinstance(other, CustomCluster):
return False
return self.name() == other.name()
def name(self) -> str:
"""Gets the name of the cluster."""
return self._name
def get_job(self, job_id: str) -> Optional[JobStatus]:
"""Gets the jobs on this cluster if it exists, else returns None."""
for job in self.get_jobs():
if job.id == job_id:
return job
return None
def get_jobs(self) -> list[JobStatus]:
"""Lists the jobs on this cluster."""
jobs = self._client.list_jobs()
for job in jobs:
job.cluster = self._name
return jobs
def cancel_job(self, job_id: str) -> JobStatus:
"""Cancels the specified job on this cluster."""
self._client.cancel(job_id)
job = self.get_job(job_id)
if job is None:
raise RuntimeError(f"Job {job_id} not found.")
return job
def run_job(self, job: JobConfig) -> JobStatus:
"""Runs the specified job on this cluster.
Args:
job: The job to run.
Returns:
The job status.
"""
job_status = self._client.submit_job(job)
job_status.cluster = self._name
return job_status
def down(self) -> None:
"""Cancel all jobs and turn down the cluster."""
for job in self.get_jobs():
self.cancel_job(job.id)
self._client.turndown_cluster(self._name)
def stop(self) -> None:
"""Cancel all jobs and turn down the cluster."""
self.down()
Creating a CustomCloud Class#
Let’s create a CustomCloud to manage our clusters:
from oumi.core.launcher import BaseCloud
class CustomCloud(BaseCloud):
"""A resource pool for managing Local jobs."""
# The default cluster name. Used when no cluster name is provided.
_DEFAULT_CLUSTER = "custom"
def __init__(self):
"""Initializes a new instance of the LocalCloud class."""
# A mapping from cluster names to Local Cluster instances.
self._clusters = {}
def _get_or_create_cluster(self, name: str) -> CustomCluster:
"""Gets the cluster with the specified name, or creates one if it doesn't exist.
Args:
name: The name of the cluster.
Returns:
LocalCluster: The cluster instance.
"""
if name not in self._clusters:
self._clusters[name] = CustomCluster(name, CustomClient())
return self._clusters[name]
def up_cluster(self, job: JobConfig, name: Optional[str]) -> JobStatus:
"""Creates a cluster and starts the provided Job."""
# The default cluster.
cluster_name = name or self._DEFAULT_CLUSTER
cluster = self._get_or_create_cluster(cluster_name)
job_status = cluster.run_job(job)
if not job_status:
raise RuntimeError("Failed to start job.")
return job_status
def get_cluster(self, name) -> Optional[BaseCluster]:
"""Gets the cluster with the specified name, or None if not found."""
clusters = self.list_clusters()
for cluster in clusters:
if cluster.name() == name:
return cluster
return None
def list_clusters(self) -> list[BaseCluster]:
"""Lists the active clusters on this cloud."""
return list(self._clusters.values())
Now all that’s left to do is register your CustomCloud!
Registering Your CustomCloud#
By implementing the BaseCloud class, you are now ready to register your cloud with Oumi. First, let’s take a look at the clouds that are already registered:
import oumi.launcher as launcher
print(launcher.which_clouds())
You can register your cloud by implementing a builder method. This method must take no arguments and must return a new instance of your CustomCloud:
from oumi.core.registry import register_cloud_builder
@register_cloud_builder("custom")
def Local_cloud_builder() -> CustomCloud:
"""Builds a LocalCloud instance."""
return CustomCloud()
Let’s take another look at our registered clouds now:
print(launcher.which_clouds())
Great, our CustomCloud is there!
Using Your CustomCloud via the CLI#
‼️ Important ‼️ A few extra steps are needed to use your cloud from the CLI.
First, you need to create a requirements.txt
file.
This is a simple text file where each line contains the absolute filepath to any python
that interface with the Oumi registry. In this guide, that means any files we wrote that
contain a @register_cloud_builder
decorator.
Let’s say you created your CustomCloud class in a file saved at
/path/to/custom_cloud.py
, and your requirements
file at
/another/path/requirements.txt
.
Your requirements.txt
file should look like:
/path/to/custom_cloud.py
Now that you’ve created your requirements.txt
file, you simply need to set the
OUMI_EXTRA_DEPS_FILE
environment variable with the path of your requirements.txt
file and the Oumi CLI will automatically pick up your changes!
export OUMI_EXTRA_DEPS_FILE=/another/path/requirements.txt
You can verify that your cloud is now installed by running:
oumi launch which
Running a Job on Your Cloud#
Let’s take our new Cloud for a spin:
We can kick off a job:
oumi launch up --cluster first_cluster -c configs/recipes/smollm/sft/135m/quickstart_gcp_job.yaml --resources.cloud custom
oumi launch status
And now let’s turn down our cluster:
oumi launch down --cluster first_cluster
job = launcher.JobConfig(name="test")
job.resources.cloud = "custom"
first_cluster, job_status = launcher.up(job, "first_cluster")
print(job_status)
second_cluster, second_job_status = launcher.up(job, "second_cluster")
print(second_job_status)
print("Canceling the first job...")
print(launcher.cancel(job_status.id, job.resources.cloud, job_status.cluster))
And now let’s turn down our clusters:
for cluster in launcher.get_cloud("custom").list_clusters():
cluster.down()
print(f"Cluster {cluster.name()} is down. Listing jobs...")
print(cluster.get_jobs())