Source code for oumi.launcher.clusters.local_cluster
# Copyright 2025 - Oumi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import uuid
from copy import deepcopy
from typing import Any, Optional

from oumi.core.configs import JobConfig
from oumi.core.launcher import BaseCluster, JobStatus
from oumi.launcher.clients.local_client import LocalClient
from oumi.utils.logging import logger


def _validate_job_config(job: JobConfig) -> None:
    """Validates the provided job configuration.
    Args:
        job: The job to validate.
    """
    if not job.working_dir:
        raise ValueError("Working directory must be provided for local jobs.")
    if not job.run:
        raise ValueError("Run script must be provided for local jobs.")
    if job.resources.cloud != "local":
        raise ValueError(
            f"`Resources.cloud` must be `local`. "
            f"Unsupported cloud: {job.resources.cloud}"
        )
    # Warn that other resource parameters are unused for local jobs.
    if job.resources.region:
        logger.warning("Region is unused for local jobs.")
    if job.resources.zone:
        logger.warning("Zone is unused for local jobs.")
    if job.resources.accelerators:
        logger.warning("Accelerators are unused for local jobs.")
    if job.resources.cpus:
        logger.warning("CPUs are unused for local jobs.")
    if job.resources.memory:
        logger.warning("Memory is unused for local jobs.")
    if job.resources.instance_type:
        logger.warning("Instance type is unused for local jobs.")
    if job.resources.disk_size:
        logger.warning("Disk size is unused for local jobs.")
    # Warn that storage mounts are currently unsupported.
    if job.storage_mounts:
        logger.warning("Storage mounts are currently unsupported for local jobs.")
    if job.file_mounts:
        logger.warning("File mounts are currently unsupported for local jobs.")
class LocalCluster(BaseCluster):
    """A cluster implementation for running jobs locally."""
    def __init__(self, name: str, client: LocalClient) -> None:
        """Initializes a new instance of the LocalCluster class."""
        self._name = name
        self._client = client
    def __eq__(self, other: Any) -> bool:
        """Checks if two LocalClusters are equal."""
        if not isinstance(other, LocalCluster):
            return False
        return self.name() == other.name() 
    def name(self) -> str:
        """Gets the name of the cluster."""
        return self._name 
    def get_job(self, job_id: str) -> Optional[JobStatus]:
        """Gets the jobs on this cluster if it exists, else returns None."""
        for job in self.get_jobs():
            if job.id == job_id:
                return job
        return None 
    def get_jobs(self) -> list[JobStatus]:
        """Lists the jobs on this cluster."""
        jobs = self._client.list_jobs()
        for job in jobs:
            job.cluster = self._name
        return jobs 
    def cancel_job(self, job_id: str) -> JobStatus:
        """Cancels the specified job on this cluster."""
        self._client.cancel(job_id)
        job = self.get_job(job_id)
        if job is None:
            raise RuntimeError(f"Job {job_id} not found.")
        return job 
    def run_job(self, job: JobConfig) -> JobStatus:
        """Runs the specified job on this cluster.
        Args:
            job: The job to run.
        Returns:
            The job status.
        """
        job_copy = deepcopy(job)
        _validate_job_config(job_copy)
        if not job_copy.name:
            job_copy.name = uuid.uuid1().hex
        status = self._client.submit_job(job_copy)
        status.cluster = self._name
        return status 
    def stop(self) -> None:
        """Cancels all jobs, running or queued."""
        for job in self.get_jobs():
            self.cancel_job(job.id) 
    def down(self) -> None:
        """Cancels all jobs, running or queued."""
        for job in self.get_jobs():
            self.cancel_job(job.id) 
    def get_logs_stream(
        self, cluster_name: str, job_id: Optional[str] = None
    ) -> io.TextIOBase:
        """Gets a stream that tails the logs of the target job.
        Args:
            cluster_name: The name of the cluster the job was run in.
            job_id: The ID of the job to tail the logs of.
        """
        raise NotImplementedError
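# Usage sketch (illustrative only, not part of the oumi API surface): drive a
# LocalCluster end to end by submitting a trivial shell job, listing jobs, and
# cancelling everything. Assumptions: `JobResources` is importable from
# `oumi.core.configs`, and `LocalClient` takes no constructor arguments.
if __name__ == "__main__":
    from oumi.core.configs import JobResources

    cluster = LocalCluster(name="local", client=LocalClient())
    job = JobConfig(
        name="hello-local",
        working_dir=".",
        run="echo 'hello from a local job'",
        resources=JobResources(cloud="local"),
    )
    status = cluster.run_job(job)
    logger.info(f"Submitted job {status.id}: {status.status}")
    logger.info(f"Jobs on cluster: {cluster.get_jobs()}")
    cluster.down()  # Cancels any jobs still running or queued.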