Source code for oumi.launcher.clusters.local_cluster
# Copyright 2025 - Oumi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid
from copy import deepcopy
from typing import Any, Optional

from oumi.core.configs import JobConfig
from oumi.core.launcher import BaseCluster, JobStatus
from oumi.launcher.clients.local_client import LocalClient
from oumi.utils.logging import logger


def _validate_job_config(job: JobConfig) -> None:
    """Validates the provided job configuration for local execution.

    Required fields are enforced with exceptions; resource, file-mount and
    storage-mount settings that the local launcher ignores only produce
    warnings.

    Args:
        job: The job to validate.

    Raises:
        ValueError: If `job.working_dir` or `job.run` is empty, or if
            `job.resources.cloud` is not "local".
    """
    if not job.working_dir:
        raise ValueError("Working directory must be provided for local jobs.")
    if not job.run:
        raise ValueError("Run script must be provided for local jobs.")
    if job.resources.cloud != "local":
        raise ValueError(
            f"`Resources.cloud` must be `local`. "
            f"Unsupported cloud: {job.resources.cloud}"
        )

    # Warn that other resource parameters are unused for local jobs.
    # Each field is listed exactly once; previously `instance_type` was
    # checked twice, so its warning was emitted twice.
    unused_resources = (
        (job.resources.region, "Region"),
        (job.resources.zone, "Zone"),
        (job.resources.accelerators, "Accelerators"),
        (job.resources.cpus, "CPUs"),
        (job.resources.memory, "Memory"),
        (job.resources.instance_type, "Instance type"),
        (job.resources.disk_size, "Disk size"),
    )
    for value, label in unused_resources:
        if value:
            logger.warning(f"{label} is unused for local jobs.")

    # Warn that storage and file mounts are currently unsupported.
    if job.storage_mounts:
        logger.warning("Storage mounts are currently unsupported for local jobs.")
    if job.file_mounts:
        logger.warning("File mounts are currently unsupported for local jobs.")
class LocalCluster(BaseCluster):
    """A cluster implementation for running jobs locally."""

    def __init__(self, name: str, client: LocalClient) -> None:
        """Initializes a new instance of the LocalCluster class.

        Args:
            name: The name of this cluster.
            client: The client used to list, submit, and cancel jobs.
        """
        self._name = name
        self._client = client

    def __eq__(self, other: Any) -> bool:
        """Checks if two LocalClusters are equal.

        Two LocalClusters are equal when they have the same name; any other
        object compares unequal.
        """
        if not isinstance(other, LocalCluster):
            return False
        return self.name() == other.name()

    def __hash__(self) -> int:
        """Hashes the cluster by name, consistent with `__eq__`."""
        # Defining `__eq__` alone implicitly sets `__hash__` to None, which
        # makes instances unusable in sets or as dict keys. Hash on the same
        # key that `__eq__` compares.
        return hash(self.name())

    def name(self) -> str:
        """Gets the name of the cluster."""
        return self._name

    def get_job(self, job_id: str) -> Optional[JobStatus]:
        """Gets the job with the given ID on this cluster.

        Args:
            job_id: The ID of the job to look up.

        Returns:
            The job's status if it exists on this cluster, else None.
        """
        return next((job for job in self.get_jobs() if job.id == job_id), None)

    def get_jobs(self) -> list[JobStatus]:
        """Lists the jobs on this cluster.

        Returns:
            The statuses reported by the client, each with `cluster` set to
            this cluster's name.
        """
        jobs = self._client.list_jobs()
        for job in jobs:
            job.cluster = self._name
        return jobs

    def cancel_job(self, job_id: str) -> JobStatus:
        """Cancels the specified job on this cluster.

        Args:
            job_id: The ID of the job to cancel.

        Returns:
            The job's status after the cancel request.

        Raises:
            RuntimeError: If the job cannot be found after cancellation.
        """
        self._client.cancel(job_id)
        job = self.get_job(job_id)
        if job is None:
            raise RuntimeError(f"Job {job_id} not found.")
        return job

    def run_job(self, job: JobConfig) -> JobStatus:
        """Runs the specified job on this cluster.

        The caller's config is deep-copied, so it is never mutated. The copy
        is validated, and if it has no name it is given a generated one.

        Args:
            job: The job to run.

        Returns:
            The job status, with `cluster` set to this cluster's name.

        Raises:
            ValueError: If the job config is not valid for local execution.
        """
        job_copy = deepcopy(job)
        _validate_job_config(job_copy)
        if not job_copy.name:
            job_copy.name = uuid.uuid1().hex
        status = self._client.submit_job(job_copy)
        status.cluster = self._name
        return status

    def _cancel_all_jobs(self) -> None:
        """Issues a cancel for every job currently listed on this cluster."""
        for job in self.get_jobs():
            self.cancel_job(job.id)

    def stop(self) -> None:
        """Cancels all jobs, running or queued."""
        self._cancel_all_jobs()

    def down(self) -> None:
        """Cancels all jobs, running or queued."""
        self._cancel_all_jobs()