Source code for oumi.inference.remote_inference_engine
# Copyright 2025 - Oumi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import copy
import json
import os
import tempfile
import urllib.parse
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Optional

import aiofiles
import aiohttp
import jsonlines
import pydantic
from tqdm.asyncio import tqdm
from typing_extensions import override

from oumi.core.async_utils import safe_asyncio_run
from oumi.core.configs import (
    GenerationParams,
    InferenceConfig,
    ModelParams,
    RemoteParams,
)
from oumi.core.inference import BaseInferenceEngine
from oumi.core.types.conversation import (
    Conversation,
    Message,
    Role,
)
from oumi.utils.conversation_utils import (
    convert_message_to_json_content_list,
    create_list_of_message_json_dicts,
)

_AUTHORIZATION_KEY: str = "Authorization"
_BATCH_PURPOSE = "batch"
_BATCH_ENDPOINT = "/v1/chat/completions"


class BatchStatus(Enum):
    """Status of a batch inference job."""

    VALIDATING = "validating"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    EXPIRED = "expired"
    CANCELLED = "cancelled"


@dataclass
class BatchInfo:
    """Information about a batch job."""

    id: str
    status: BatchStatus
    total_requests: int = 0
    completed_requests: int = 0
    failed_requests: int = 0
    endpoint: Optional[str] = None
    input_file_id: Optional[str] = None
    batch_completion_window: Optional[str] = None
    output_file_id: Optional[str] = None
    error_file_id: Optional[str] = None
    error: Optional[str] = None
    created_at: Optional[datetime] = None
    in_progress_at: Optional[datetime] = None
    expires_at: Optional[datetime] = None
    finalizing_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    failed_at: Optional[datetime] = None
    expired_at: Optional[datetime] = None
    canceling_at: Optional[datetime] = None
    canceled_at: Optional[datetime] = None
    metadata: Optional[dict[str, Any]] = None

    @staticmethod
    def _convert_timestamp(timestamp: Optional[int]) -> Optional[datetime]:
        """Converts a Unix timestamp to a datetime.

        Args:
            timestamp: Unix timestamp in seconds.

        Returns:
            datetime: Converted datetime, or None if timestamp is None.
        """
        return datetime.fromtimestamp(timestamp) if timestamp is not None else None

    @classmethod
    def from_api_response(cls, response: dict[str, Any]) -> "BatchInfo":
        """Creates a BatchInfo from an API response dictionary.

        Args:
            response: Raw API response dictionary.

        Returns:
            BatchInfo: Parsed batch information.
        """
        return cls(
            id=response["id"],
            status=BatchStatus(response["status"]),
            endpoint=response.get("endpoint"),
            input_file_id=response.get("input_file_id"),
            batch_completion_window=response.get("batch_completion_window"),
            output_file_id=response.get("output_file_id"),
            error_file_id=response.get("error_file_id"),
            error=response.get("error"),
            created_at=cls._convert_timestamp(response.get("created_at")),
            in_progress_at=cls._convert_timestamp(response.get("in_progress_at")),
            expires_at=cls._convert_timestamp(response.get("expires_at")),
            finalizing_at=cls._convert_timestamp(response.get("finalizing_at")),
            completed_at=cls._convert_timestamp(response.get("completed_at")),
            failed_at=cls._convert_timestamp(response.get("failed_at")),
            expired_at=cls._convert_timestamp(response.get("expired_at")),
            canceling_at=cls._convert_timestamp(response.get("cancelling_at")),
            canceled_at=cls._convert_timestamp(response.get("cancelled_at")),
            total_requests=response.get("request_counts", {}).get("total", 0),
            completed_requests=response.get("request_counts", {}).get("completed", 0),
            failed_requests=response.get("request_counts", {}).get("failed", 0),
            metadata=response.get("metadata"),
        )

    @property
    def is_terminal(self) -> bool:
        """Returns True if the batch is in a terminal state."""
        return self.status in (
            BatchStatus.COMPLETED,
            BatchStatus.FAILED,
            BatchStatus.EXPIRED,
            BatchStatus.CANCELLED,
        )

    @property
    def completion_percentage(self) -> float:
        """Returns the percentage of completed requests."""
        return (
            (100 * self.completed_requests / self.total_requests)
            if self.total_requests > 0
            else 0.0
        )

    @property
    def has_errors(self) -> bool:
        """Returns True if the batch has any errors."""
        return bool(self.error) or self.failed_requests > 0


@dataclass
class BatchListResponse:
    """Response from listing batch jobs."""

    batches: list[BatchInfo]
    first_id: Optional[str] = None
    last_id: Optional[str] = None
    has_more: bool = False


@dataclass
class FileInfo:
    """Information about a file."""

    id: str
    filename: str
    bytes: int
    created_at: int
    purpose: str


@dataclass
class FileListResponse:
    """Response from listing files."""

    files: list[FileInfo]
    has_more: bool = False
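# A minimal sketch of how `BatchInfo.from_api_response` consumes an
# OpenAI-style batch object; the payload values below are illustrative
# assumptions, not a recorded API response.
#
#     info = BatchInfo.from_api_response(
#         {
#             "id": "batch_abc123",  # hypothetical batch ID
#             "status": "in_progress",
#             "created_at": 1714000000,  # Unix seconds -> datetime
#             "request_counts": {"total": 10, "completed": 4, "failed": 0},
#         }
#     )
#     assert info.status == BatchStatus.IN_PROGRESS
#     assert info.completion_percentage == 40.0
#     assert not info.is_terminal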
class RemoteInferenceEngine(BaseInferenceEngine):
    """Engine for running inference against a server implementing the OpenAI API."""

    base_url: Optional[str] = None
    """The base URL for the remote API."""

    api_key_env_varname: Optional[str] = None
    """The environment variable name for the API key."""

    _remote_params: RemoteParams
    """Parameters for running inference against a remote API."""

    def __init__(
        self,
        model_params: ModelParams,
        *,
        generation_params: Optional[GenerationParams] = None,
        remote_params: Optional[RemoteParams] = None,
    ):
        """Initializes the inference engine.

        Args:
            model_params: The model parameters to use for inference.
            generation_params: Generation parameters to use for inference.
            remote_params: Remote server parameters.
        """
        super().__init__(
            model_params=model_params, generation_params=generation_params
        )
        if remote_params:
            remote_params = copy.deepcopy(remote_params)
        else:
            remote_params = self._default_remote_params()
        if not remote_params.api_url:
            remote_params.api_url = self.base_url
        if not remote_params.api_key_env_varname:
            remote_params.api_key_env_varname = self.api_key_env_varname
        self._remote_params = remote_params
        self._remote_params.finalize_and_validate()

    def _default_remote_params(self) -> RemoteParams:
        """Returns the default remote parameters."""
        return RemoteParams()

    @staticmethod
    def _get_list_of_message_json_dicts(
        messages: list[Message],
        *,
        group_adjacent_same_role_turns: bool,
    ) -> list[dict[str, Any]]:
        return create_list_of_message_json_dicts(
            messages, group_adjacent_same_role_turns=group_adjacent_same_role_turns
        )

    def _convert_conversation_to_api_input(
        self,
        conversation: Conversation,
        generation_params: GenerationParams,
        model_params: ModelParams,
    ) -> dict[str, Any]:
        """Converts a conversation to an OpenAI input.

        Documentation: https://platform.openai.com/docs/api-reference/chat/create

        Args:
            conversation: The conversation to convert.
            generation_params: Parameters for generation during inference.
            model_params: Model parameters to use during inference.

        Returns:
            Dict[str, Any]: A dictionary representing the OpenAI input.
        """
        # Mandatory generation parameters.
        generation_params_dict = {
            "max_completion_tokens": generation_params.max_new_tokens,
            "seed": generation_params.seed,
            "temperature": generation_params.temperature,
            "top_p": generation_params.top_p,
            "frequency_penalty": generation_params.frequency_penalty,
            "presence_penalty": generation_params.presence_penalty,
        }

        # Optional generation parameters.
        if generation_params.logit_bias:
            generation_params_dict["logit_bias"] = generation_params.logit_bias
        if generation_params.stop_strings:
            generation_params_dict["stop"] = generation_params.stop_strings
        if generation_params.stop_token_ids:
            generation_params_dict["stop_token_ids"] = generation_params.stop_token_ids
        if generation_params.min_p:
            generation_params_dict["min_p"] = generation_params.min_p

        api_input = {
            "model": model_params.model_name,
            "messages": [
                {
                    "content": convert_message_to_json_content_list(message),
                    "role": message.role.value,
                }
                for message in conversation.messages
            ],
            "n": 1,  # Number of completions to generate for each prompt.
            **generation_params_dict,
        }

        if generation_params.guided_decoding:
            json_schema = generation_params.guided_decoding.json
            if json_schema is None:
                raise ValueError(
                    "Only JSON schema guided decoding is supported, got "
                    f"'{generation_params.guided_decoding}'."
                )

            if isinstance(json_schema, type) and issubclass(
                json_schema, pydantic.BaseModel
            ):
                schema_name = json_schema.__name__
                schema_value = json_schema.model_json_schema()
            elif isinstance(json_schema, dict):
                # Use a generic name if no schema name is provided.
                schema_name = "Response"
                schema_value = json_schema
            elif isinstance(json_schema, str):
                # Use a generic name if no schema name is provided.
                schema_name = "Response"
                # Try to parse as a JSON string.
                schema_value = json.loads(json_schema)
            else:
                raise ValueError(
                    f"Got unsupported JSON schema type: {type(json_schema)}. "
                    "Please provide a Pydantic model or a JSON schema as a "
                    "string or dict."
                )

            api_input["response_format"] = {
                "type": "json_schema",
                "json_schema": {
                    "name": schema_name,
                    "schema": schema_value,
                },
            }

        return api_input

    def _convert_api_output_to_conversation(
        self, response: dict[str, Any], original_conversation: Conversation
    ) -> Conversation:
        """Converts an API response to a conversation.

        Args:
            response: The API response to convert.
            original_conversation: The original conversation.

        Returns:
            Conversation: The conversation including the generated response.
        """
        if "error" in response:
            raise RuntimeError(
                f"API error: {response['error'].get('message', response['error'])}"
            )
        if "choices" not in response or not response["choices"]:
            raise RuntimeError(f"No choices found in API response: {response}")
        message = response["choices"][0].get("message")
        if not message:
            raise RuntimeError(f"No message found in API response: {response}")
        return Conversation(
            messages=[
                *original_conversation.messages,
                Message(
                    content=message["content"],
                    role=Role(message["role"]),
                ),
            ],
            metadata=original_conversation.metadata,
            conversation_id=original_conversation.conversation_id,
        )

    def _get_api_key(self, remote_params: RemoteParams) -> Optional[str]:
        if not remote_params:
            return None
        if remote_params.api_key:
            return remote_params.api_key
        if remote_params.api_key_env_varname:
            return os.environ.get(remote_params.api_key_env_varname)
        return None

    def _get_request_headers(
        self, remote_params: Optional[RemoteParams]
    ) -> dict[str, str]:
        headers = {}
        if not remote_params:
            return headers
        api_key = self._get_api_key(remote_params)
        if api_key:
            headers[_AUTHORIZATION_KEY] = f"Bearer {api_key}"
        return headers

    def _set_required_fields_for_inference(self, remote_params: RemoteParams):
        """Sets required fields for inference."""
        if not remote_params.api_url:
            remote_params.api_url = self._remote_params.api_url or self.base_url
        if not remote_params.api_key_env_varname:
            remote_params.api_key_env_varname = (
                self._remote_params.api_key_env_varname or self.api_key_env_varname
            )
        if not remote_params.api_key:
            remote_params.api_key = self._remote_params.api_key

    async def _query_api(
        self,
        conversation: Conversation,
        semaphore: asyncio.Semaphore,
        session: aiohttp.ClientSession,
        inference_config: Optional[InferenceConfig] = None,
    ) -> Conversation:
        """Queries the API with the provided input.

        Args:
            conversation: The conversation to run inference on.
            semaphore: Semaphore to limit concurrent requests.
            session: The aiohttp session to use for the request.
            inference_config: Parameters for inference.

        Returns:
            Conversation: Inference output.
        """
        if inference_config is None:
            remote_params = self._remote_params
            generation_params = self._generation_params
            model_params = self._model_params
            output_path = None
        else:
            remote_params = inference_config.remote_params or self._remote_params
            generation_params = inference_config.generation or self._generation_params
            model_params = inference_config.model or self._model_params
            output_path = inference_config.output_path

        self._set_required_fields_for_inference(remote_params)
        if not remote_params.api_url:
            raise ValueError("API URL is required for remote inference.")
        if not self._get_api_key(remote_params):
            if remote_params.api_key_env_varname:
                raise ValueError(
                    "An API key is required for remote inference with the "
                    f"`{self.__class__.__name__}` inference engine. "
                    "Please set the environment variable "
                    f"`{remote_params.api_key_env_varname}`."
                )

        async with semaphore:
            api_input = self._convert_conversation_to_api_input(
                conversation, generation_params, model_params
            )
            headers = self._get_request_headers(remote_params)
            failure_reason = None

            # Retry the request if it fails.
            for attempt in range(remote_params.max_retries + 1):
                try:
                    # Calculate the exponential backoff delay.
                    if attempt > 0:
                        delay = min(
                            remote_params.retry_backoff_base * (2 ** (attempt - 1)),
                            remote_params.retry_backoff_max,
                        )
                        await asyncio.sleep(delay)

                    async with session.post(
                        remote_params.api_url,
                        json=api_input,
                        headers=headers,
                        timeout=remote_params.connection_timeout,
                    ) as response:
                        try:
                            response_json = await response.json()
                        except aiohttp.ContentTypeError:
                            try:
                                text_response = await response.text()
                                response_json = json.loads(text_response)
                            except (json.JSONDecodeError, ValueError) as e:
                                if attempt == remote_params.max_retries:
                                    raise RuntimeError(
                                        "Failed to parse response as JSON after "
                                        f"{attempt + 1} attempts. "
                                        "Response content type: "
                                        f"{response.content_type}. "
                                        f"Error: {str(e)}"
                                    ) from e
                                continue

                        if response.status == 200:
                            result = self._convert_api_output_to_conversation(
                                response_json, conversation
                            )
                            if output_path:
                                # Write what we have so far to our scratch directory.
                                self._save_conversation(
                                    result,
                                    self._get_scratch_filepath(output_path),
                                )
                            return result
                        else:
                            if isinstance(response_json, list):
                                # If the response is a list, it is likely an error
                                # message.
                                response_json = response_json[0]
                            failure_reason = (
                                response_json.get("error", {}).get("message")
                                if response_json
                                else f"HTTP {response.status}"
                            )
                            if attempt < remote_params.max_retries:
                                continue
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    if attempt == remote_params.max_retries:
                        raise RuntimeError(
                            f"Failed to query API after {attempt} retries due to "
                            f"connection error: {str(e)}"
                        ) from e
                    continue
                finally:
                    await asyncio.sleep(remote_params.politeness_policy)

            raise RuntimeError(
                f"Failed to query API after {remote_params.max_retries} retries. "
                + (f"Reason: {failure_reason}" if failure_reason else "")
            )

    async def _infer(
        self,
        input: list[Conversation],
        inference_config: Optional[InferenceConfig] = None,
    ) -> list[Conversation]:
        """Runs model inference on the provided input.

        Args:
            input: A list of conversations to run inference on.
            inference_config: Parameters for inference.

        Returns:
            List[Conversation]: Inference output.
        """
        # Limit the number of HTTP connections to the number of workers.
        connector = aiohttp.TCPConnector(limit=self._remote_params.num_workers)
        # Control the number of concurrent tasks via a semaphore.
        semaphore = asyncio.BoundedSemaphore(self._remote_params.num_workers)
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [
                self._query_api(
                    conversation,
                    semaphore,
                    session,
                    inference_config=inference_config,
                )
                for conversation in input
            ]
            disable_tqdm = len(tasks) < 2
            return await tqdm.gather(*tasks, disable=disable_tqdm)
    @override
    def infer_online(
        self,
        input: list[Conversation],
        inference_config: Optional[InferenceConfig] = None,
    ) -> list[Conversation]:
        """Runs model inference online.

        Args:
            input: A list of conversations to run inference on.
            inference_config: Parameters for inference.

        Returns:
            List[Conversation]: Inference output.
        """
        conversations = safe_asyncio_run(self._infer(input, inference_config))
        if inference_config and inference_config.output_path:
            self._save_conversations(conversations, inference_config.output_path)
        return conversations
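    # Example usage (a sketch, assuming an OpenAI-compatible endpoint and an
    # API key in the environment; the URL and variable name are illustrative):
    #
    #     engine = RemoteInferenceEngine(
    #         model_params=ModelParams(model_name="my-model"),
    #         remote_params=RemoteParams(
    #             api_url="https://api.example.com/v1/chat/completions",
    #             api_key_env_varname="EXAMPLE_API_KEY",
    #         ),
    #     )
    #     results = engine.infer_online(
    #         [Conversation(messages=[Message(role=Role.USER, content="Hi!")])]
    #     )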
    @override
    def infer_from_file(
        self, input_filepath: str, inference_config: Optional[InferenceConfig] = None
    ) -> list[Conversation]:
        """Runs model inference on inputs in the provided file.

        This is a convenience method to prevent boilerplate from asserting the
        existence of input_filepath in the generation_params.

        Args:
            input_filepath: Path to the input file containing prompts for generation.
            inference_config: Parameters for inference.

        Returns:
            List[Conversation]: Inference output.
        """
        input = self._read_conversations(input_filepath)
        conversations = safe_asyncio_run(self._infer(input, inference_config))
        if inference_config and inference_config.output_path:
            self._save_conversations(conversations, inference_config.output_path)
        return conversations
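    # Example usage (sketch): the input file must contain serialized
    # conversations that `_read_conversations` can load; both paths below are
    # illustrative.
    #
    #     results = engine.infer_from_file(
    #         "prompts.jsonl",
    #         inference_config=InferenceConfig(output_path="results.jsonl"),
    #     )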
    @override
    def get_supported_params(self) -> set[str]:
        """Returns a set of supported generation parameters for this engine."""
        return {
            "frequency_penalty",
            "guided_decoding",
            "logit_bias",
            "max_new_tokens",
            "min_p",
            "presence_penalty",
            "seed",
            "stop_strings",
            "stop_token_ids",
            "temperature",
            "top_p",
        }
    #
    # Batch inference
    #
    def get_file_api_url(self) -> str:
        """Returns the URL for the file API."""
        return str(
            urllib.parse.urlparse(self._remote_params.api_url)
            ._replace(path="/v1/files")
            .geturl()
        )
    def get_batch_api_url(self) -> str:
        """Returns the URL for the batch API."""
        return str(
            urllib.parse.urlparse(self._remote_params.api_url)
            ._replace(path="/v1/batches")
            .geturl()
        )
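    # Both helpers only swap the path component of the configured API URL,
    # for example (illustrative values):
    #
    #     api_url = "https://api.example.com/v1/chat/completions"
    #     get_file_api_url()   # -> "https://api.example.com/v1/files"
    #     get_batch_api_url()  # -> "https://api.example.com/v1/batches"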
    def infer_batch(
        self,
        conversations: list[Conversation],
        inference_config: Optional[InferenceConfig] = None,
    ) -> str:
        """Creates a new batch inference job.

        Args:
            conversations: List of conversations to process in the batch.
            inference_config: Parameters for inference.

        Returns:
            str: The batch job ID.
        """
        if inference_config:
            generation_params = inference_config.generation or self._generation_params
            model_params = inference_config.model or self._model_params
        else:
            generation_params = self._generation_params
            model_params = self._model_params
        return safe_asyncio_run(
            self._create_batch(conversations, generation_params, model_params)
        )
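    # Example (sketch): submitting a batch returns immediately with a job ID;
    # the conversations are processed asynchronously on the server side.
    #
    #     batch_id = engine.infer_batch(conversations)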
    def get_batch_status(
        self,
        batch_id: str,
    ) -> BatchInfo:
        """Gets the status of a batch inference job.

        Args:
            batch_id: The batch job ID.

        Returns:
            BatchInfo: Current status of the batch job.
        """
        return safe_asyncio_run(self._get_batch_status(batch_id))
    def list_batches(
        self,
        after: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> BatchListResponse:
        """Lists batch jobs.

        Args:
            after: Cursor for pagination (batch ID to start after).
            limit: Maximum number of batches to return (1-100).

        Returns:
            BatchListResponse: List of batch jobs.
        """
        return safe_asyncio_run(
            self._list_batches(
                after=after,
                limit=limit,
            )
        )
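    # Example (sketch): paging through batch jobs with the `after` cursor,
    # using the `last_id` field returned by the previous page.
    #
    #     page = engine.list_batches(limit=20)
    #     while page.has_more and page.last_id:
    #         page = engine.list_batches(after=page.last_id, limit=20)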
    def get_batch_results(
        self,
        batch_id: str,
        conversations: list[Conversation],
    ) -> list[Conversation]:
        """Gets the results of a completed batch job.

        Args:
            batch_id: The batch job ID.
            conversations: Original conversations used to create the batch.

        Returns:
            List[Conversation]: The processed conversations with responses.

        Raises:
            RuntimeError: If the batch failed or has not completed.
        """
        return safe_asyncio_run(
            self._get_batch_results_with_mapping(batch_id, conversations)
        )
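    # A typical polling loop (sketch; the 60-second interval is an arbitrary
    # choice, not a library default):
    #
    #     import time
    #
    #     batch_id = engine.infer_batch(conversations)
    #     while not (info := engine.get_batch_status(batch_id)).is_terminal:
    #         time.sleep(60)
    #     if info.status == BatchStatus.COMPLETED:
    #         results = engine.get_batch_results(batch_id, conversations)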
    async def _upload_batch_file(
        self,
        batch_requests: list[dict],
    ) -> str:
        """Uploads a JSONL file containing batch requests.

        Args:
            batch_requests: List of request objects to include in the batch.

        Returns:
            str: The uploaded file ID.
        """
        # Create a temporary JSONL file.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".jsonl", delete=False
        ) as tmp:
            with jsonlines.Writer(tmp) as writer:
                for request in batch_requests:
                    writer.write(request)
            tmp_path = tmp.name

        try:
            # Upload the file.
            connector = aiohttp.TCPConnector(limit=self._remote_params.num_workers)
            async with aiohttp.ClientSession(connector=connector) as session:
                headers = self._get_request_headers(self._remote_params)

                # Create form data with the file.
                form = aiohttp.FormData()
                async with aiofiles.open(tmp_path, "rb") as f:
                    file_data = await f.read()
                    form.add_field(
                        "file", file_data, filename="batch_requests.jsonl"
                    )
                form.add_field("purpose", _BATCH_PURPOSE)

                async with session.post(
                    self.get_file_api_url(),
                    data=form,
                    headers=headers,
                ) as response:
                    if response.status != 200:
                        raise RuntimeError(
                            f"Failed to upload batch file: {await response.text()}"
                        )
                    data = await response.json()
                    return data["id"]
        finally:
            # Clean up the temporary file.
            Path(tmp_path).unlink()

    async def _create_batch(
        self,
        conversations: list[Conversation],
        generation_params: GenerationParams,
        model_params: ModelParams,
    ) -> str:
        """Creates a new batch job.

        Args:
            conversations: List of conversations to process in the batch.
            generation_params: Generation parameters.
            model_params: Model parameters.

        Returns:
            str: The batch job ID.
        """
        # Prepare the batch requests.
        batch_requests = []
        for i, conv in enumerate(conversations):
            api_input = self._convert_conversation_to_api_input(
                conv, generation_params, model_params
            )
            batch_requests.append(
                {
                    "custom_id": f"request-{i}",
                    "method": "POST",
                    "url": _BATCH_ENDPOINT,
                    "body": api_input,
                }
            )

        # Upload the batch file.
        file_id = await self._upload_batch_file(batch_requests)

        # Create the batch.
        connector = aiohttp.TCPConnector(limit=self._remote_params.num_workers)
        async with aiohttp.ClientSession(connector=connector) as session:
            headers = self._get_request_headers(self._remote_params)
            async with session.post(
                self.get_batch_api_url(),
                json={
                    "input_file_id": file_id,
                    "endpoint": _BATCH_ENDPOINT,
                    "completion_window": self._remote_params.batch_completion_window,
                },
                headers=headers,
            ) as response:
                if response.status != 200:
                    raise RuntimeError(
                        f"Failed to create batch: {await response.text()}"
                    )
                data = await response.json()
                return data["id"]

    async def _get_batch_status(
        self,
        batch_id: str,
    ) -> BatchInfo:
        """Gets the status of a batch job.

        Args:
            batch_id: ID of the batch job.

        Returns:
            BatchInfo: Current status of the batch job.
        """
        connector = aiohttp.TCPConnector(limit=self._remote_params.num_workers)
        async with aiohttp.ClientSession(connector=connector) as session:
            headers = self._get_request_headers(self._remote_params)
            async with session.get(
                f"{self.get_batch_api_url()}/{batch_id}",
                headers=headers,
            ) as response:
                if response.status != 200:
                    raise RuntimeError(
                        f"Failed to get batch status: {await response.text()}"
                    )
                data = await response.json()
                return BatchInfo.from_api_response(data)

    async def _list_batches(
        self,
        after: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> BatchListResponse:
        """Lists batch jobs.

        Args:
            after: Cursor for pagination (batch ID to start after).
            limit: Maximum number of batches to return (1-100).

        Returns:
            BatchListResponse: List of batch jobs.
        """
        connector = aiohttp.TCPConnector(limit=self._remote_params.num_workers)
        async with aiohttp.ClientSession(connector=connector) as session:
            headers = self._get_request_headers(self._remote_params)
            params = {}
            if after:
                params["after"] = after
            if limit:
                params["limit"] = str(limit)
            async with session.get(
                self.get_batch_api_url(),
                headers=headers,
                params=params,
            ) as response:
                if response.status != 200:
                    raise RuntimeError(
                        f"Failed to list batches: {await response.text()}"
                    )
                data = await response.json()
                batches = [
                    BatchInfo.from_api_response(batch_data)
                    for batch_data in data["data"]
                ]
                return BatchListResponse(
                    batches=batches,
                    first_id=data.get("first_id"),
                    last_id=data.get("last_id"),
                    has_more=data.get("has_more", False),
                )

    async def _get_batch_results_with_mapping(
        self,
        batch_id: str,
        conversations: list[Conversation],
    ) -> list[Conversation]:
        """Gets the results of a completed batch job and maps them to conversations.

        Args:
            batch_id: ID of the batch job.
            conversations: Original conversations used to create the batch.

        Returns:
            List[Conversation]: The processed conversations with responses.

        Raises:
            RuntimeError: If the batch status is not completed or if there are errors.
        """
        # Get the batch status first.
        batch_info = await self._get_batch_status(batch_id)
        if not batch_info.is_terminal:
            raise RuntimeError(
                f"Batch is not in a terminal state. Status: {batch_info.status}"
            )
        if batch_info.has_errors:
            # Download the error file if there are failed requests.
            if batch_info.error_file_id:
                error_content = await self._download_file(batch_info.error_file_id)
                raise RuntimeError(f"Batch has failed requests: {error_content}")
            raise RuntimeError(f"Batch failed with error: {batch_info.error}")

        # Download the results file.
        if not batch_info.output_file_id:
            raise RuntimeError("No output file available")
        results_content = await self._download_file(batch_info.output_file_id)

        # Parse the results.
        processed_conversations = []
        for line, conv in zip(results_content.splitlines(), conversations):
            result = json.loads(line)
            if result.get("error"):
                raise RuntimeError(f"Batch request failed: {result['error']}")
            processed_conv = self._convert_api_output_to_conversation(
                result["response"]["body"], conv
            )
            processed_conversations.append(processed_conv)
        return processed_conversations
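    # Each line of the uploaded JSONL file is one request in the OpenAI batch
    # format; a sketch of a single line (the body is whatever
    # `_convert_conversation_to_api_input` produced for that conversation):
    #
    #     {"custom_id": "request-0", "method": "POST",
    #      "url": "/v1/chat/completions", "body": {...}}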
    #
    # File operations
    #

    def get_file(
        self,
        file_id: str,
    ) -> FileInfo:
        """Gets information about a file."""
        return safe_asyncio_run(self._get_file(file_id))
    def delete_file(
        self,
        file_id: str,
    ) -> bool:
        """Deletes a file."""
        return safe_asyncio_run(self._delete_file(file_id))
    def get_file_content(
        self,
        file_id: str,
    ) -> str:
        """Gets a file's content."""
        return safe_asyncio_run(self._download_file(file_id))
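    # Example (sketch): inspecting and cleaning up an uploaded file by ID;
    # "file-abc123" is an illustrative ID, not a real one.
    #
    #     info = engine.get_file("file-abc123")
    #     content = engine.get_file_content(info.id)
    #     engine.delete_file(info.id)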
    async def _list_files(
        self,
        purpose: Optional[str] = None,
        limit: Optional[int] = None,
        order: str = "desc",
        after: Optional[str] = None,
    ) -> FileListResponse:
        """Lists files.

        Args:
            purpose: Only return files with this purpose.
            limit: Maximum number of files to return (1-10000).
            order: Sort order (asc or desc).
            after: Cursor for pagination.

        Returns:
            FileListResponse: List of files.
        """
        connector = aiohttp.TCPConnector(limit=self._remote_params.num_workers)
        async with aiohttp.ClientSession(connector=connector) as session:
            headers = self._get_request_headers(self._remote_params)
            params = {"order": order}
            if purpose:
                params["purpose"] = purpose
            if limit:
                params["limit"] = str(limit)
            if after:
                params["after"] = after
            async with session.get(
                self.get_file_api_url(),
                headers=headers,
                params=params,
            ) as response:
                if response.status != 200:
                    raise RuntimeError(
                        f"Failed to list files: {await response.text()}"
                    )
                data = await response.json()
                files = [
                    FileInfo(
                        id=file["id"],
                        filename=file["filename"],
                        bytes=file["bytes"],
                        created_at=file["created_at"],
                        purpose=file["purpose"],
                    )
                    for file in data["data"]
                ]
                return FileListResponse(
                    files=files,
                    has_more=len(files) == limit if limit else False,
                )

    async def _get_file(
        self,
        file_id: str,
    ) -> FileInfo:
        """Gets information about a file.

        Args:
            file_id: ID of the file.

        Returns:
            FileInfo: File information.
        """
        connector = aiohttp.TCPConnector(limit=self._remote_params.num_workers)
        async with aiohttp.ClientSession(connector=connector) as session:
            headers = self._get_request_headers(self._remote_params)
            async with session.get(
                f"{self.get_file_api_url()}/{file_id}",
                headers=headers,
            ) as response:
                if response.status != 200:
                    raise RuntimeError(
                        f"Failed to get file: {await response.text()}"
                    )
                data = await response.json()
                return FileInfo(
                    id=data["id"],
                    filename=data["filename"],
                    bytes=data["bytes"],
                    created_at=data["created_at"],
                    purpose=data["purpose"],
                )

    async def _delete_file(
        self,
        file_id: str,
    ) -> bool:
        """Deletes a file.

        Args:
            file_id: ID of the file to delete.

        Returns:
            bool: True if the deletion was successful.
        """
        connector = aiohttp.TCPConnector(limit=self._remote_params.num_workers)
        async with aiohttp.ClientSession(connector=connector) as session:
            headers = self._get_request_headers(self._remote_params)
            async with session.delete(
                f"{self.get_file_api_url()}/{file_id}",
                headers=headers,
            ) as response:
                if response.status != 200:
                    raise RuntimeError(
                        f"Failed to delete file: {await response.text()}"
                    )
                data = await response.json()
                return data.get("deleted", False)

    async def _download_file(
        self,
        file_id: str,
    ) -> str:
        """Downloads a file's content.

        Args:
            file_id: ID of the file to download.

        Returns:
            str: The file content.
        """
        connector = aiohttp.TCPConnector(limit=self._remote_params.num_workers)
        async with aiohttp.ClientSession(connector=connector) as session:
            headers = self._get_request_headers(self._remote_params)
            async with session.get(
                f"{self.get_file_api_url()}/{file_id}/content",
                headers=headers,
            ) as response:
                if response.status != 200:
                    raise RuntimeError(
                        f"Failed to download file: {await response.text()}"
                    )
                return await response.text()