Source code for oumi.launcher.clients.local_client
# Copyright 2025 - Oumi## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.importosimporttimefromdataclassesimportdataclassfromdatetimeimportdatetimefromenumimportEnumfrompathlibimportPathfromsubprocessimportPIPE,PopenfromthreadingimportLock,ThreadfromtypingimportOptionalfromoumi.core.configsimportJobConfigfromoumi.core.launcherimportJobStatus@dataclassclass_LocalJob:"""A class representing a job running locally."""status:JobStatusconfig:JobConfigstdout:Optional[str]=Nonestderr:Optional[str]=Noneclass_JobState(Enum):"""An enumeration of the possible states of a job."""QUEUED="QUEUED"RUNNING="RUNNING"COMPLETED="COMPLETED"FAILED="FAILED"CANCELED="CANCELED"
class LocalClient:
    """A client for running jobs locally in a subprocess.

    Jobs are queued by `submit_job` and executed one at a time, in
    submission order, by a daemon worker thread started in `__init__`.
    All access to the job table and to the running process handle is
    guarded by a single mutex.
    """

    # The maximum number of characters to read from the subprocess's stdout and stderr.
    _MAX_BUFFER_SIZE = 1024

    # The environment variable used to specify the logging directory.
    _OUMI_LOGGING_DIR = "OUMI_LOGGING_DIR"

    def __init__(self):
        """Initializes a new instance of the LocalClient class.

        Starts the background worker thread immediately.
        """
        self._mutex = Lock()
        self._next_job_id = 0
        # A mapping of job IDs to their respective `_LocalJob` records.
        self._jobs = {}
        # The Popen handle of the job currently executing, if any.
        self._running_process = None
        self._worker = Thread(target=self._worker_loop, daemon=True)
        self._worker.start()

    def _update_job_status(self, job_id: str, status: _JobState) -> None:
        """Updates the status of the job. Assumes the mutex is already acquired.

        Unknown job IDs are ignored. The job's `done` flag is set when the
        new state is COMPLETED, FAILED or CANCELED.
        """
        if job_id not in self._jobs:
            return
        job_status = self._jobs[job_id].status
        job_status.status = status.value
        job_status.done = status in (
            _JobState.COMPLETED,
            _JobState.FAILED,
            _JobState.CANCELED,
        )

    def _worker_run_job(self) -> Optional[_LocalJob]:
        """Kicks off and returns a new job. Assumes the mutex is already acquired.

        Returns:
            The job that was started, or None if no job is queued.
        """
        job = self._get_next_job()
        if job is None:
            return None
        env_copy = os.environ.copy()
        env_copy.update(job.config.envs)
        # Check if the user has specified a logging directory.
        if self._OUMI_LOGGING_DIR in env_copy:
            logging_dir = Path(env_copy[self._OUMI_LOGGING_DIR])
            logging_dir.mkdir(parents=True, exist_ok=True)
            dt = datetime.now()
            log_format = f"{dt:%Y_%m_%d_%H_%M_%S}_{dt.microsecond // 1000:03d}"
            job.stderr = str(logging_dir / f"{log_format}_{job.status.id}.stderr")
            job.stdout = str(logging_dir / f"{log_format}_{job.status.id}.stdout")
        # Always change to the working directory before running the job.
        working_dir_cmd = f"cd {job.config.working_dir}"
        setup_cmds = job.config.setup or ""
        cmds = "\n".join([working_dir_cmd, setup_cmds, job.config.run])
        # Start the job but don't block.
        # The child process receives its own copies of the log file
        # descriptors, so the parent's handles are closed as soon as Popen
        # returns (or raises) instead of being leaked once per job.
        opened_logs = []
        try:
            stderr_logs = PIPE
            if job.stderr:
                stderr_logs = open(job.stderr, "w")
                opened_logs.append(stderr_logs)
            stdout_logs = PIPE
            if job.stdout:
                stdout_logs = open(job.stdout, "w")
                opened_logs.append(stdout_logs)
            self._running_process = Popen(
                cmds,
                shell=True,
                env=env_copy,
                stdout=stdout_logs,
                stderr=stderr_logs,
            )
        finally:
            for log_file in opened_logs:
                log_file.close()
        self._update_job_status(job.status.id, _JobState.RUNNING)
        return job

    def _worker_handle_running_job(self, job: _LocalJob) -> None:
        """Waits for the specified job and records its outcome. Acquires the mutex.

        A job that was canceled while running keeps its CANCELED status; it
        is not overwritten by the exit code of the terminated process.
        """
        process = self._running_process
        # Return immediately if no job is running.
        if process is None:
            return
        # Wait for the job to finish. No need to grab the mutex here.
        # NOTE(review): when no logging directory is configured, stdout and
        # stderr are pipes that nobody drains while the child runs. The
        # subprocess documentation warns that wait() can deadlock if the
        # child fills a pipe buffer; consider communicate() if that matters.
        exit_code = process.wait()
        finish_time = datetime.fromtimestamp(time.time()).isoformat()
        job_id = job.status.id
        with self._mutex:
            job_status = self._jobs[job_id].status
            if job_status.status == _JobState.CANCELED.value:
                # cancel() already reported a terminal state to the caller.
                return
            if exit_code == 0:
                # Job was successful.
                job_status.metadata = f"Job finished at {finish_time} ."
                self._update_job_status(job_id, _JobState.COMPLETED)
                if job.stdout is not None:
                    job_status.metadata += f" Logs available at: {job.stdout}"
                return
            # Job failed.
            self._update_job_status(job_id, _JobState.FAILED)
            if job.stderr is not None:
                job_status.metadata = f"Error logs available at: {job.stderr}"
                return
            error_metadata = ""
            if process.stderr is not None:
                for line in process.stderr:
                    # Pipe output arrives as bytes; decode it rather than
                    # storing the "b'...'" repr in the metadata.
                    if isinstance(line, bytes):
                        error_metadata += line.decode("utf-8", errors="replace")
                    else:
                        error_metadata += str(line)
                    # Only keep the last _MAX_BUFFER_SIZE characters.
                    error_metadata = error_metadata[-self._MAX_BUFFER_SIZE :]
            job_status.metadata = error_metadata

    def _worker_loop(self):
        """The main worker loop that runs jobs.

        Runs forever: starts the next queued job, blocks until it finishes,
        then clears the running-process handle. Sleeps for five seconds
        when the queue is empty.
        """
        while True:
            with self._mutex:
                # Run the next job if it exists.
                job = self._worker_run_job()
            # No job to run, sleep for a bit.
            if job is None:
                time.sleep(5)
                continue
            # Wait for the job to finish.
            self._worker_handle_running_job(job)
            # Clear the running process.
            with self._mutex:
                self._running_process = None

    def _generate_next_job_id(self) -> str:
        """Gets the next job ID. Assumes the mutex is already acquired."""
        job_id = self._next_job_id
        self._next_job_id += 1
        return str(job_id)

    def _get_next_job(self) -> Optional[_LocalJob]:
        """Gets the next QUEUED job from the queue.

        Returns:
            The queued job with the numerically smallest ID, or None if no
            job is queued.
        """
        queued_jobs = [
            job
            for job in self._jobs.values()
            if job.status.status == _JobState.QUEUED.value
        ]
        if not queued_jobs:
            return None
        # IDs are decimal strings, so compare them as integers.
        return min(queued_jobs, key=lambda job: int(job.status.id))

    def submit_job(self, job: JobConfig) -> JobStatus:
        """Queues the specified job to run locally.

        Args:
            job: The configuration of the job to run.

        Returns:
            The status of the newly queued job. The job's name defaults to
            its ID when the configuration does not provide one.
        """
        with self._mutex:
            job_id = self._generate_next_job_id()
            name = job.name if job.name else job_id
            status = JobStatus(
                name=name,
                id=job_id,
                status=_JobState.QUEUED.value,
                cluster="",
                metadata="",
                done=False,
            )
            self._jobs[job_id] = _LocalJob(status=status, config=job)
            return status

    def list_jobs(self) -> list[JobStatus]:
        """Returns a list of job statuses."""
        with self._mutex:
            return [job.status for job in self._jobs.values()]

    def get_job(self, job_id: str) -> Optional[JobStatus]:
        """Gets the specified job's status.

        Args:
            job_id: The ID of the job to get.

        Returns:
            The job status if found, None otherwise.
        """
        return next(
            (status for status in self.list_jobs() if status.id == job_id),
            None,
        )

    def cancel(self, job_id: str) -> Optional[JobStatus]:
        """Cancels the specified job.

        A running job has its process terminated; a queued job is simply
        marked as canceled. Jobs in any other state are left unchanged.

        Args:
            job_id: The ID of the job to cancel.

        Returns:
            The job status if found, None otherwise.
        """
        with self._mutex:
            if job_id not in self._jobs:
                return None
            job = self._jobs[job_id]
            if job.status.status == _JobState.RUNNING.value:
                if self._running_process is not None:
                    self._running_process.terminate()
                self._update_job_status(job_id, _JobState.CANCELED)
            elif job.status.status == _JobState.QUEUED.value:
                self._update_job_status(job_id, _JobState.CANCELED)
            return job.status