Source code for oumi.launcher.clusters.local_cluster

# Copyright 2025 - Oumi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid
from copy import deepcopy
from typing import Any, Optional

from oumi.core.configs import JobConfig
from oumi.core.launcher import BaseCluster, JobStatus
from oumi.launcher.clients.local_client import LocalClient
from oumi.utils.logging import logger


def _validate_job_config(job: JobConfig) -> None:
    """Validates the provided job configuration.

    Args:
        job: The job to validate.
    """
    if not job.working_dir:
        raise ValueError("Working directory must be provided for local jobs.")
    if not job.run:
        raise ValueError("Run script must be provided for local jobs.")
    if job.resources.cloud != "local":
        raise ValueError(
            f"`Resources.cloud` must be `local`. "
            f"Unsupported cloud: {job.resources.cloud}"
        )
    # Warn that other resource parameters are unused for local jobs.
    if job.resources.region:
        logger.warning("Region is unused for local jobs.")
    if job.resources.zone:
        logger.warning("Zone is unused for local jobs.")
    if job.resources.accelerators:
        logger.warning("Accelerators are unused for local jobs.")
    if job.resources.cpus:
        logger.warning("CPUs are unused for local jobs.")
    if job.resources.memory:
        logger.warning("Memory is unused for local jobs.")
    if job.resources.instance_type:
        logger.warning("Instance type is unused for local jobs.")
    if job.resources.disk_size:
        logger.warning("Disk size is unused for local jobs.")
    if job.resources.instance_type:
        logger.warning("Instance type is unused for local jobs.")
    # Warn that storage mounts are currently unsupported.
    if len(job.storage_mounts.items()) > 0:
        logger.warning("Storage mounts are currently unsupported for local jobs.")
    if len(job.file_mounts.items()) > 0:
        logger.warning("File mounts are currently unsupported for local jobs.")


[docs] class LocalCluster(BaseCluster): """A cluster implementation for running jobs locally.""" def __init__(self, name: str, client: LocalClient) -> None: """Initializes a new instance of the LocalCluster class.""" self._name = name self._client = client
[docs] def __eq__(self, other: Any) -> bool: """Checks if two LocalClusters are equal.""" if not isinstance(other, LocalCluster): return False return self.name() == other.name()
[docs] def name(self) -> str: """Gets the name of the cluster.""" return self._name
[docs] def get_job(self, job_id: str) -> Optional[JobStatus]: """Gets the jobs on this cluster if it exists, else returns None.""" for job in self.get_jobs(): if job.id == job_id: return job return None
[docs] def get_jobs(self) -> list[JobStatus]: """Lists the jobs on this cluster.""" jobs = self._client.list_jobs() for job in jobs: job.cluster = self._name return jobs
[docs] def cancel_job(self, job_id: str) -> JobStatus: """Cancels the specified job on this cluster.""" self._client.cancel(job_id) job = self.get_job(job_id) if job is None: raise RuntimeError(f"Job {job_id} not found.") return job
[docs] def run_job(self, job: JobConfig) -> JobStatus: """Runs the specified job on this cluster. Args: job: The job to run. Returns: The job status. """ job_copy = deepcopy(job) _validate_job_config(job_copy) if not job_copy.name: job_copy.name = uuid.uuid1().hex status = self._client.submit_job(job_copy) status.cluster = self._name return status
[docs] def stop(self) -> None: """Cancels all jobs, running or queued.""" for job in self.get_jobs(): self.cancel_job(job.id)
[docs] def down(self) -> None: """Cancels all jobs, running or queued.""" for job in self.get_jobs(): self.cancel_job(job.id)