diff --git a/README.md b/README.md
index 085838f..f2ffa69 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@ export OUTPOST_API_TOKEN=
 import outpostkit
 print(outpostkit.__version__)
 ```
-`0.0.39`
+`0.0.69`
 
 ## Create a client
 
@@ -60,29 +60,9 @@ client = Client(api_token=os.environ.get('OUTPOST_API_TOKEN'))
 endpoints_client = Endpoints(client=client, entity=client.user.name)
 
 template = EndpointAutogeneratedTemplateConfig(modelSource="huggingface",huggingfaceModel=EndpointAutogeneratedHFModelDetails(id="Falconsai/text_summarization"))
-endpoint = endpoints_client.create(templateConfig=template)
-```
-To import gated/private models from Hugging Face, you can add your Hugging Face key as a third party token and provide its `id` in the config.
-```py
-EndpointAutogeneratedHFModelDetails(id="Falconsai/text_summarization",keyId="")
-```
-
-If you have a specific revision of the model that you want to deploy, provide the revision in the `revision` field.
-
-```py
-template = EndpointAutogeneratedTemplateConfig(modelSource="outpost", revision="df5ef1a0e2d2579726d74b5d617b17c7049c5a89",outpostModel=EndpointAutogeneratedOutpostModelDetails(id="aj-ya/text-gen"))
-```
-The horizontal scaling configurations of the endpoint are based on the number of requests to the prediction request path.
-You can tweak the settings at creation too.
-```py
-from outpostkit._types.endpoint import ReplicaScalingConfig
-scaling_config = ReplicaScalingConfig(min=0,max=1,scaledownPeriod=900,targetPendingRequests=100) # Defaults
-endpoint = endpoints_client.create(templateConfig=template,replica_scaling_config = scaling_config)
+endpoint = endpoints_client.create(template=template)
 ```
-`scaledownPeriod`: The period to wait after the last reported active before scaling the resource back to 0.
-`targetPendingRequests`: This is the number of pending (or in-progress) requests that your application needs to have before it is scaled up. Conversely, if your application has below this number of pending requests, it will scaled down.
-
 ## Deploy the endpoint
 once you create the endpoint, you need to deploy it.
 ```py
@@ -119,3 +99,175 @@ print(resp.json())
 
 ## Development
 
 See [CONTRIBUTING.md](CONTRIBUTING.md)
+
+
+## Endpoint Creation Payload
+```
+    def create(
+        self,
+        template: Union[
+            EndpointAutogeneratedTemplateConfig, EndpointCustomTemplateConfig
+        ],
+        container: Optional[EndpointPrebuiltContainerDetails] = None,
+        hardware_instance: str = "e2-standard-2",
+        task_type: Optional[str] = None,
+        name: Optional[str] = None,
+        secrets: Optional[List[OutpostSecret]] = None,
+        visibility: ServiceVisibility = ServiceVisibility.public,
+        replica_scaling_config: Optional[ReplicaScalingConfig] = None,
+    ) -> Endpoint: ...
+```
+An endpoint server needs to know how to load the model, handle prediction requests, handle exceptions, and so on, and these details vary with each use case.
+Thus the server follows a template that tells it how to behave.
+
+### Template configuration
+You can either let us autogenerate templates based on the model information, or create a custom template yourself.
+
+#### Configs to Autogenerate Template
+Currently, we can autogenerate templates for models stored at 'Outpost' or 'Hugging Face'.
+
+Task types supported
+--TODO: List all task types supported--
+
+To import a model stored at Outpost, you can directly use:
+
+```py
+from outpostkit._types.endpoint import EndpointAutogeneratedTemplateConfig, EndpointAutogeneratedOutpostModelDetails
+
+template = EndpointAutogeneratedTemplateConfig(modelSource="outpost",outpostModel=EndpointAutogeneratedOutpostModelDetails(id="aj-ya/text-gen"))
+```
+
+If you have a specific revision of the model that you want to deploy, provide the revision in the `revision` field.
+
+```py
+template = EndpointAutogeneratedTemplateConfig(modelSource="outpost", revision="df5ef1a0e2d2579726d74b5d617b17c7049c5a89",outpostModel=EndpointAutogeneratedOutpostModelDetails(id="aj-ya/text-gen"))
+```
+
+To import gated/private models from Hugging Face, you can add your Hugging Face key as a third-party token and provide its `id` in the config.
+```py
+from outpostkit._types.endpoint import EndpointAutogeneratedTemplateConfig, EndpointAutogeneratedHFModelDetails
+hf_model = EndpointAutogeneratedHFModelDetails(id="Falconsai/text_summarization",keyId="")
+template = EndpointAutogeneratedTemplateConfig(modelSource="huggingface",huggingfaceModel=hf_model)
+```
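+
+Putting these pieces together, creating and deploying an endpoint from an autogenerated template is a single `create`/`deploy` pair. A minimal sketch, mirroring `examples/endpoints/sentence-similarity.py` from this repo (the hardware instance name and the way the Hugging Face token is passed as a secret are taken from that example; adjust them for your account):
+
+```py
+import os
+
+from outpostkit import Client, Endpoints
+from outpostkit._types.endpoint import (
+    EndpointAutogeneratedHFModelDetails,
+    EndpointAutogeneratedTemplateConfig,
+)
+from outpostkit._utils.constants import OutpostSecret
+
+client = Client(api_token=os.environ["OUTPOST_API_TOKEN"])
+template = EndpointAutogeneratedTemplateConfig(
+    modelSource="huggingface",
+    huggingfaceModel=EndpointAutogeneratedHFModelDetails(id="nomic-ai/nomic-embed-text-v1"),
+)
+
+endpoint = Endpoints(client=client, entity=client.user.name).create(
+    template=template,
+    hardware_instance="1xnvidia-tesla-t4",  # illustrative; pick an instance available to you
+    secrets=[OutpostSecret(name="HUGGING_FACE_HUB_TOKEN", value=os.environ["HF_TOKEN"])],
+)
+endpoint.deploy()
+```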
+
+#### Create a custom template
+A Template class mainly needs to define model initialization and prediction request handling.
+For demonstration purposes, let's create a template file for the `openai/shap-e` model.
+
+First of all, we need to create a class that loads the model during the initialization phase.
+Then, we need to define the request handler for the `/predict` route. This is done by defining
+the `predict` member function.
+This function itself acts as the handler, so you can give it any parameters that FastAPI supports. Here we will use a Pydantic class to validate the request JSON body and read the generation arguments. (ref: https://fastapi.tiangolo.com/tutorial/body/)
+Finally, we would also like to stream the output GIF; for this we will use FastAPI's `StreamingResponse`.
+
+```py
+from io import BytesIO
+from typing import List
+
+from diffusers import ShapEPipeline
+from fastapi.responses import StreamingResponse
+from pydantic import BaseModel
+from PIL.Image import Image
+
+
+class Item(BaseModel):
+    prompt: str
+    frame_size: int = 256
+    num_inference_steps: int = 64
+    guidance_scale: float = 15
+
+
+def pil_gif_resp(frames: List[Image]) -> StreamingResponse:
+    temp = BytesIO()
+    # Save the frames as an animated GIF; PIL needs an explicit format
+    # when writing to a file object instead of a path.
+    frames[0].save(
+        temp,
+        format="GIF",
+        save_all=True,
+        append_images=frames[1:],
+        optimize=False,
+        duration=100,
+        loop=0,
+    )
+    # Rewind the buffer so StreamingResponse reads from the beginning.
+    temp.seek(0)
+    return StreamingResponse(temp, media_type="image/gif")
+
+
+class ShapEHandler:
+    pipeline: ShapEPipeline
+
+    def __init__(self) -> None:
+        ckpt_id = "openai/shap-e"
+        self.pipeline = ShapEPipeline.from_pretrained(ckpt_id).to("cuda")
+
+    def predict(self, item: Item):
+        images = self.pipeline(
+            item.prompt,
+            guidance_scale=item.guidance_scale,
+            num_inference_steps=item.num_inference_steps,
+            frame_size=item.frame_size,
+        ).images
+        return pil_gif_resp(images[0])
+```
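+
+To wire this class into an endpoint, point the creation call at the file. A minimal sketch, assuming the class above is saved in a local `template.py` (the file name is hypothetical; `type="url"` with a reachable URL in `path` works the same way):
+
+```py
+from outpostkit._types.endpoint import EndpointCustomTemplateConfig
+
+custom_template = EndpointCustomTemplateConfig(
+    type="file",
+    path="./template.py",      # hypothetical local file containing ShapEHandler
+    className="ShapEHandler",  # class the server should instantiate
+)
+endpoint = endpoints_client.create(template=custom_template)
+```
+
+The `scaffold` helper in `outpostkit.endpoints` writes a starter template file with all of the members described below.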
+
+##### Installing extra packages and modules
+If your application needs a specific Python package or system dependency that is not already installed in the container image (--TODO-- docs showing the list of prebuilt container images and the packages installed there.), you can define these members in the Template class:
+
+```py
+class Template:
+    # extra system dependencies required
+    system_dependencies: List[str] = ['curl']
+
+    # extra python packages required
+    python_requirements: List[str] = ['gif==23.0']
+    ...
+```
+
+##### Exception Handling
+
+To define exception handling outside the prediction handler, you can extend the default exception handling done by the server like this:
+```py
+from typing import Callable, Dict, Type, Union
+
+from fastapi.responses import JSONResponse
+
+
+async def generic_exception_handler(_, exc: Exception):
+    return JSONResponse(
+        {"error": str(exc), "type": "unhandled_error"},
+        status_code=500,
+    )
+
+
+class Template:
+    # define custom exception handlers for the fastapi app
+    exception_handlers: Dict[Union[int, Type[Exception]], Callable] = dict({
+        Exception: generic_exception_handler
+    })
+    ...
+```
+
+
+### Container configuration
+
+If you're already using a prebuilt template, you usually won't need to define this; the container is selected automatically based on the library and task type.
+
+But you can configure it manually as well.
+Currently, you can only use one of the prebuilt containers provided by Outpost.
+
+Namely,
+--TODO-- list all container images here.
+
+To use the Transformers PyTorch image with some extra configs, use:
+```py
+from outpostkit._types.endpoint import EndpointPrebuiltContainerDetails
+container = EndpointPrebuiltContainerDetails(name="transformers-pt", config={"torch_dtype": "float32"})
+```
+
+
+### Scaling Configuration
+
+The horizontal scaling configuration of the endpoint is based on the number of requests to the prediction request path.
+You can tweak the settings at creation too.
+```py
+from outpostkit._types.endpoint import ReplicaScalingConfig
+scaling_config = ReplicaScalingConfig(min=0,max=1,scaledownPeriod=900,targetPendingRequests=100) # Defaults
+endpoint = endpoints_client.create(template=template,replica_scaling_config=scaling_config)
+```
+
+`scaledownPeriod`: The period to wait after the last reported activity before scaling the resource back to 0.
+`targetPendingRequests`: The number of pending (or in-progress) requests that your application needs to have before it is scaled up. Conversely, if your application has fewer than this number of pending requests, it will be scaled down.
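+
+A freshly deployed endpoint takes a little while to become healthy before it can serve predictions. A small polling sketch, mirroring the commented-out snippet in `examples/endpoints/sentence-similarity.py` (the payload shape depends on your task type):
+
+```py
+import time
+
+# Poll the endpoint status until the deployment reports healthy.
+while endpoint.status() != "healthy":
+    time.sleep(10)
+
+predictor = endpoint.create_predictor()
+resp = predictor.infer(json={"sentences": "hello."})
+print(resp.json())
+```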
diff --git a/examples/endpoints/Readme.md b/examples/endpoints/Readme.md
new file mode 100644
index 0000000..20d06c3
--- /dev/null
+++ b/examples/endpoints/Readme.md
@@ -0,0 +1 @@
+# Example usage of the SDK to work with Outpost Endpoints
diff --git a/examples/endpoints/sentence-similarity.py b/examples/endpoints/sentence-similarity.py
new file mode 100644
index 0000000..f1075c6
--- /dev/null
+++ b/examples/endpoints/sentence-similarity.py
@@ -0,0 +1,42 @@
+import os
+from typing import Optional
+
+from outpostkit._types.endpoint import (
+    EndpointAutogeneratedHFModelDetails,
+    EndpointAutogeneratedTemplateConfig,
+)
+from outpostkit._utils.constants import OutpostSecret
+from outpostkit.client import Client
+from outpostkit.endpoints import Endpoints
+
+API_TOKEN = os.getenv("OUTPOST_API_TOKEN")
+HF_TOKEN: Optional[str] = None
+ENTITY: str = "aj-ya"
+template = EndpointAutogeneratedTemplateConfig(
+    modelSource="huggingface",
+    huggingfaceModel=EndpointAutogeneratedHFModelDetails(
+        id="nomic-ai/nomic-embed-text-v1",
+    ),
+)
+
+endpt = Endpoints(client=Client(api_token=API_TOKEN), entity=ENTITY).create(
+    template=template,
+    name="text-embedder-2",
+    hardware_instance="1xnvidia-tesla-t4",
+    secrets=(
+        [OutpostSecret(name="HUGGING_FACE_HUB_TOKEN", value=HF_TOKEN)]
+        if HF_TOKEN
+        else None
+    ),
+)
+endpt.deploy()
+
+print(f"name: {endpt.name}")
+print(f"home: https://outpost.run/{ENTITY}/inference-endpoints/{endpt.name}/overview")
+
+
+# wait for endpoint to start.
+
+# if endpt.get().status == "healthy":
+#     predictor = endpt.create_predictor()
+#     predictor.infer(json={"sentences": "hello."})
diff --git a/examples/finetunings/Readme.md b/examples/finetunings/Readme.md
new file mode 100644
index 0000000..20d06c3
--- /dev/null
+++ b/examples/finetunings/Readme.md
@@ -0,0 +1 @@
+# Example usage of the SDK to work with Outpost Finetunings
diff --git a/examples/finetunings/clm.py b/examples/finetunings/clm.py
new file mode 100644
index 0000000..7b76c4c
--- /dev/null
+++ b/examples/finetunings/clm.py
@@ -0,0 +1,87 @@
+import os
+from typing import Optional
+
+from outpostkit._types.finetuning import FinetuningHFSourceModel, FinetuningModelRepo
+from outpostkit._utils.constants import OutpostSecret
+from outpostkit._utils.finetuning import FinetuningTask
+from outpostkit.client import Client
+from outpostkit.finetuning import Finetunings
+
+API_TOKEN = os.getenv("OUTPOST_API_TOKEN")
+HF_TOKEN: Optional[str] = None
+ENTITY: str = "aj-ya"
+
+CONFIGS = {
+    "lr": 3e-5,
+    "epochs": 1,
+    "batch_size": 2,
+    "warmup_ratio": 0.1,
+    "gradient_accumulation": 1,
+    "optimizer": "adamw_torch",
+    "scheduler": "linear",
+    "weight_decay": 0.0,
+    "max_grad_norm": 1.0,
+    "seed": 26,
+    "block_size": -1,
+    "disable_tqdm": True,
+    "mixed_precision": None,  # fp16 or bf16
+    "logging_steps": -1,
+    "evaluation_strategy": "epoch",
+    "save_total_limit": 1,
+    "save_strategy": "epoch",
+    "add_eos_token": True,
+    "auto_find_batch_size": False,
+    "model_max_length": 2048,
+    "target_modules": None,
+    "merge_adapter": False,
+    "use_flash_attention_2": False,
+    "disable_gradient_checkpointing": False,
+    # "model_ref": None, check
+    "early_stopping": True,
+    "early_stopping_configs": {
+        "patience": 3,  # None
+        "threshold": 0.01,  # None
+    },  # None
+    "padding": None,
+    "peft": True,
+    "peft_configs": {
+        "lora": {"r": 16, "alpha": 32, "dropout": 0.05},  # or None
+        "quantization": None,  # int4 or int8
+    },  # or None
+}
+client = Client(api_token=API_TOKEN)
+fntun = Finetunings(client=client, entity=ENTITY).create(
+    name="clm-example",
+    task_type=FinetuningTask.clm_default,
+    dataset="aj-ya/copper_bonobo",
+    train_path="train.csv",
+    validation_path="valid.csv",
+    secrets=(
+        [OutpostSecret(name="HUGGING_FACE_HUB_TOKEN", value=HF_TOKEN)]
+        if HF_TOKEN
+        else None
+    ),
+)
+job = fntun.create_job(
+    hardware_instance="e2-standard-8",
+    finetuned_model_repo=FinetuningModelRepo(
+        full_name="aj-ya/finetuned-clm", branch="main"
+    ),
+    column_configs={"text": "text"},
+    configs=CONFIGS,
+    model_source="huggingface",
+    source_model=FinetuningHFSourceModel(id="openai-community/gpt2"),
+    enqueue=True,
+)
+
+
+print(f"name: {fntun.name}")
+print(f"home: https://outpost.run/{ENTITY}/fine-tuning/{fntun.name}/overview")
+print(f"job id: {job.id}")
+
+
+# wait for the job to finish.
+
+# job.info()
+# job.trainer_logs()
diff --git a/outpostkit/__init__.py b/outpostkit/__init__.py
index 70a8366..72feaab 100644
--- a/outpostkit/__init__.py
+++ b/outpostkit/__init__.py
@@ -1,6 +1,10 @@
-__version__ = "0.0.38"
+__version__ = "0.0.69"
 
 from outpostkit.client import Client as Client
 from outpostkit.endpoints import Endpoint as Endpoint
 from outpostkit.endpoints import Endpoints as Endpoints
+from outpostkit.finetuning import Finetunings as Finetunings
+from outpostkit.finetuning import FinetuningService as FinetuningService
+from outpostkit.repository import Repository as Repository
+from outpostkit.repository import RepositoryAtRef as RepositoryAtRef
 from outpostkit.team import Team as Team
 from outpostkit.user import User as User
diff --git a/outpostkit/_types/endpoint.py b/outpostkit/_types/endpoint.py
index 664df7e..0d8ff69 100644
--- a/outpostkit/_types/endpoint.py
+++ b/outpostkit/_types/endpoint.py
@@ -1,22 +1,18 @@
 from dataclasses import dataclass, field
 from typing import Any, Dict, List, Literal, Mapping, Optional
 
+from outpostkit._types.entity import HardwareInstanceDetails
+
 from .user import UserShortDetails
 
 
 @dataclass
-class DomainInEndpoint:
+class EndpointDomainDetails:
     protocol: str
     name: str
     id: str
 
 
-@dataclass
-class EndpointHardwareInstanceDetails:
-    id: str
-    name: str
-
-
 @dataclass
 class EndpointAutogeneratedHFModelDetails:
     id: str
@@ -58,12 +54,19 @@ def __init__(self, *args, **kwargs) -> None:
 @dataclass
 class EndpointAutogeneratedTemplateConfig:
     modelSource: Literal["huggingface", "outpost"]
-    config: Optional[Dict[str, Any]] = None
     revision: Optional[str] = None
     huggingfaceModel: Optional[EndpointAutogeneratedHFModelDetails] = None
    outpostModel: Optional[EndpointAutogeneratedOutpostModelDetails] = None
 
 
+@dataclass
+class EndpointPrebuiltContainerDetails:
+    name: str
+    image: Optional[str] = None
+    config: Optional[Any] = None
+    serverArgs: Optional[Any] = None
+
+
 @dataclass
 class EndpointCustomTemplateConfig:
     type: Literal["file", "url"]
@@ -153,7 +156,7 @@ class EndpointResource:
     healthcheckPath: str
     """Relative path used for healthcheck and readiness probes"""
 
-    primaryDomain: Optional[DomainInEndpoint]
+    primaryDomain: Optional[EndpointDomainDetails]
 
     createdAt: str
 
@@ -161,7 +164,7 @@
 
     status: str
 
-    hardwareInstance: EndpointHardwareInstanceDetails
+    hardwareInstance: HardwareInstanceDetails
 
     port: int
 
@@ -169,7 +172,9 @@
 
     # creatorId: Optional[str]=None
 
-    # currentDeploymentId: Optional[str]=None
+    prebuiltContainerDetails: Optional[EndpointPrebuiltContainerDetails] = None
+
+    currentDeploymentId: Optional[str] = None
 
     currentDeployment: Optional[EndpointDeployment] = None
@@ -198,9 +203,11 @@ def __init__(self, *args, **kwargs: Mapping[str, Any]) -> None:
                     **kwargs.get("customTemplateConfig")
                 )
             elif _field == "primaryDomain" and kwargs.get("primaryDomain") is not None:
-                self.primaryDomain = DomainInEndpoint(**kwargs.get("primaryDomain"))
+                self.primaryDomain = EndpointDomainDetails(
+                    **kwargs.get("primaryDomain")
+                )
             elif _field == "hardwareInstance":
-                self.hardwareInstance = EndpointHardwareInstanceDetails(
+                self.hardwareInstance = HardwareInstanceDetails(
                     **kwargs.get("hardwareInstance")
                 )
             elif (
@@ -217,6 +224,13 @@ def __init__(self, *args, **kwargs: Mapping[str, Any]) -> None:
                 self.replicaScalingConfig = ReplicaScalingConfig(
                     **kwargs.get("replicaScalingConfig")
                 )
+            elif (
+                _field == "prebuiltContainerDetails"
+                and kwargs.get("prebuiltContainerDetails") is not None
+            ):
+                self.prebuiltContainerDetails = EndpointPrebuiltContainerDetails(
+                    **kwargs.get("prebuiltContainerDetails")
+                )
             else:
                 setattr(self, _field, kwargs.get(_field))
@@ -260,3 +274,23 @@ def __init__(self, *args, **kwargs) -> None:
                 )
             else:
                 setattr(self, _field, kwargs.get(_field))
+
+
+@dataclass
+class EndpointLogData:
+    level_num: int
+    log_type: Literal["runtime", "dep", "event"]
+    level: str
+    logger_name: str
+    message: str
+    exc_info: Optional[str] = None
+    stack_info: Optional[str] = None
+    replica: Optional[str] = None
+    extra: Dict[str, Any] = field(default_factory=lambda: {})
+    # TODO extend for all the info
+
+
+@dataclass
+class EndpointLog:
+    timestamp: str
+    data: EndpointLogData
diff --git a/outpostkit/_types/entity.py b/outpostkit/_types/entity.py
index 62d7063..cf875bf 100644
--- a/outpostkit/_types/entity.py
+++ b/outpostkit/_types/entity.py
@@ -11,3 +11,9 @@ class FollowEntity:
     type: ENTITY_TYPES
     avatarUrl: str
     isFollowing: Optional[bool]
+
+
+@dataclass
+class HardwareInstanceDetails:
+    id: str
+    name: str
diff --git a/outpostkit/_types/finetuning.py b/outpostkit/_types/finetuning.py
new file mode 100644
index 0000000..cb5b5e1
--- /dev/null
+++ b/outpostkit/_types/finetuning.py
@@ -0,0 +1,124 @@
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Literal, Optional, Union
+
+from outpostkit._types.entity import HardwareInstanceDetails
+from outpostkit._utils.finetuning import FinetuningTask
+
+
+@dataclass
+class FinetuningServiceCreateResponse:
+    id: int
+    name: str
+
+
+@dataclass
+class FinetuningHFSourceModel:
+    id: str
+    key_id: Optional[str] = None
+    revision: Optional[str] = None
+
+
+@dataclass
+class FinetuningOutpostSourceModel:
+    full_name: str
+    revision: Optional[str] = None
+
+
+@dataclass
+class FinetuningModelRepo:
+    full_name: str
+    branch: Optional[str] = None
+
+
+@dataclass
+class FinetuningJobCreationResponse:
+    id: str
+
+
+@dataclass
+class FinetuningJobLogData:
+    level_num: int
+    log_type: Literal["runtime", "dep", "event"]
+    level: str
+    logger_name: str
+    message: str
+    exc_info: Optional[str] = None
+    stack_info: Optional[str] = None
+    extra: Dict[str, Any] = field(default_factory=lambda: {})
+    # TODO extend for all the info
+
+
+@dataclass
+class FinetuningJobLog:
+    timestamp: str
+    data: FinetuningJobLogData
+
+
+@dataclass
+class FinetuningResource:
+    name: str
+    full_name: str
+    id: str
+    dataset: str
+    task_type: str
+    created_at: str
+    updated_at: str
+    train_path: str
+    valid_path: Optional[str] = None
+
+    def __init__(self, *args, **kwargs) -> None:
+        # The API returns camelCase keys; map them onto the snake_case fields.
+        for _field in self.__annotations__:
+            if _field == "train_path":
+                self.train_path = kwargs.get("trainPath")  # type: ignore
+            elif _field == "valid_path":
+                self.valid_path = kwargs.get("validPath")  # type: ignore
+            elif _field == "task_type":
+                self.task_type = FinetuningTask[kwargs.get("taskType")]  # type: ignore
+            elif _field == "created_at":
+                self.created_at = kwargs.get("createdAt")  # type: ignore
+            elif _field == "updated_at":
+                self.updated_at = kwargs.get("updatedAt")  # type: ignore
+            elif _field == "full_name":
+                self.full_name = kwargs.get("fullName")  # type: ignore
+            else:
+                setattr(self, _field, kwargs.get(_field))
"validPath": + self.valid_path = kwargs.get("validPath") # type: ignore + elif _field == "taskType": + self.task_type = FinetuningTask[kwargs.get("taskType")] # type: ignore + elif _field == "createdAt": + self.created_at = kwargs.get("createdAt") # type: ignore + elif _field == "updatedAt": + self.updated_at = kwargs.get("updateAt") # type: ignore + elif _field == "fullName": + self.full_name = kwargs.get("fullName") # type: ignore + else: + setattr(self, _field, kwargs.get(_field)) + + +@dataclass +class FinetuningsListResponse: + total: int + finetunings: List[FinetuningResource] + + def __init__(self, total: int, finetunings: List[Dict]) -> None: + fntns: List[FinetuningResource] = [] + self.total = total + for inf in finetunings: + fntns.append(FinetuningResource(**inf)) + self.finetunings = fntns + + +@dataclass +class FinetunedModel: + full_name: str + commit: Optional[str] + branch: str + + +@dataclass +class FinetuningJobResource: + id: str + created_at: str + status: str + model_source: Literal["outpost", "huggingface", "none"] + hardware_instance: HardwareInstanceDetails + dataset_revision: str + finetuned_model: FinetunedModel + source_model: Optional[ + Union[FinetuningHFSourceModel, FinetuningOutpostSourceModel] + ] = None + + +@dataclass +class FinetuningJobTrainerLog: + id: str + log: Dict diff --git a/outpostkit/_types/repository.py b/outpostkit/_types/repository.py new file mode 100644 index 0000000..eadf4ee --- /dev/null +++ b/outpostkit/_types/repository.py @@ -0,0 +1,3 @@ +from typing import Literal + +REPOSITORY_TYPES = Literal["model", "dataset"] diff --git a/outpostkit/_utils/__init__.py b/outpostkit/_utils/__init__.py new file mode 100644 index 0000000..dbdea59 --- /dev/null +++ b/outpostkit/_utils/__init__.py @@ -0,0 +1,20 @@ +import os + +import httpx + + +def split_full_name(full_name: str): + try: + [entity, name] = full_name.split("/", 1) + return (entity, name) + except Exception: + raise ValueError(f"Could not parse fullname - {full_name}") from None + + +def save_file_at_path_from_response(response: httpx.Response, save_path: str): + os.makedirs(os.path.dirname(save_path), exist_ok=True) + + # Write the response content to the file + with open(save_path, "wb") as file: + for chunk in response.iter_bytes(): + file.write(chunk) diff --git a/outpostkit/_utils/constants.py b/outpostkit/_utils/constants.py index 77693d5..0364503 100644 --- a/outpostkit/_utils/constants.py +++ b/outpostkit/_utils/constants.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass from enum import Enum @@ -5,3 +6,42 @@ class ServiceVisibility(Enum): public = "public" internal = "internal" private = "private" + + +@dataclass +class OutpostSecret: + name: str + value: str + + +scaffolding_file = """ +from typing import Callable, Dict, List, Type, Union + +from fastapi import Request + + +class PredictionTemplate: + # define custom exception handlers for the fastapi app + exception_handlers: Dict[Union[int, Type[Exception]], Callable] = dict({}) + + # extra system dependencies required + system_dependencies: List[str] = [] + + # extra python packages required + python_requirements: List[str] = [] + + # define mandatory environment variables needed for the template to run + secrets: List[str] = [] + + def __init__(self, **kwargs): + \"\"\" + An init method to download prepare the model. 
+ \"\"\" + pass + + async def predict(self, Request: Request): + \"\"\" + prediction handler that can take paramaters like a FastAPI route handler + \"\"\" + return { "ping":"pong" } +""" diff --git a/outpostkit/_utils/finetuning.py b/outpostkit/_utils/finetuning.py new file mode 100644 index 0000000..827b690 --- /dev/null +++ b/outpostkit/_utils/finetuning.py @@ -0,0 +1,14 @@ +from enum import Enum + + +class FinetuningTask(Enum): + text_classification = "text_classification" + clm_sft = "clm_sft" + clm_dpo = "clm_dpo" + clm_default = "clm_default" + clm_reward = "clm_reward" + seq2seq = "seq2seq" + image_classification = "image_classification" + dreambooth = "dreambooth" + tabular_classification = "tabular_classification" + tabular_regression = "tabular_regression" diff --git a/outpostkit/_utils/git.py b/outpostkit/_utils/git.py new file mode 100644 index 0000000..fb684cd --- /dev/null +++ b/outpostkit/_utils/git.py @@ -0,0 +1,24 @@ +# ref: https://github.com/huggingface/huggingface_hub/blob/main/src/huggingface_hub/file_download.py +# ref: https://github.com/huggingface/transformers/blob/main/src/transformers/utils/hub.py + +import re +from pathlib import Path +from typing import Optional + +REGEX_COMMIT_HASH = re.compile(r"^[0-9a-f]{40}$") + + +def extract_commit_hash( + resolved_file: Optional[str], commit_hash: Optional[str] +) -> Optional[str]: + """ + Extracts the commit hash from a resolved filename toward a cache file. + """ + if resolved_file is None or commit_hash is not None: + return commit_hash + resolved_file = str(Path(resolved_file).as_posix()) + search = re.search(r"snapshots/([^/]+)/", resolved_file) + if search is None: + return None + commit_hash = search.groups()[0] + return commit_hash if REGEX_COMMIT_HASH.match(commit_hash) else None # type: ignore diff --git a/outpostkit/_utils/import_utils.py b/outpostkit/_utils/import_utils.py new file mode 100644 index 0000000..1a7116d --- /dev/null +++ b/outpostkit/_utils/import_utils.py @@ -0,0 +1,173 @@ +# ref: https://github.com/huggingface/transformers/blob/main/src/transformers/utils/import_utils.py +# TODO: This doesn't work for all packages (`bs4`, `faiss`, etc.) 
+import importlib +import importlib.metadata +import importlib.util +import os +from typing import Tuple, Union + +from packaging import version + +from outpostkit.logger import init_outpost_logger + +logger = init_outpost_logger(__name__) + + +def _is_package_available( + pkg_name: str, return_version: bool = False +) -> Union[Tuple[bool, str], bool]: + # Check if the package spec exists and grab its version to avoid importing a local directory + package_exists = importlib.util.find_spec(pkg_name) is not None + package_version = "N/A" + if package_exists: + try: + # Primary method to get the package version + package_version = importlib.metadata.version(pkg_name) + except importlib.metadata.PackageNotFoundError: + # Fallback method: Only for "torch" and versions containing "dev" + if pkg_name == "torch": + try: + package = importlib.import_module(pkg_name) + temp_version = getattr(package, "__version__", "N/A") + # Check if the version contains "dev" + if "dev" in temp_version: + package_version = temp_version + package_exists = True + else: + package_exists = False + except ImportError: + # If the package can't be imported, it's not available + package_exists = False + else: + # For packages other than "torch", don't attempt the fallback and set as not available + package_exists = False + logger.debug(f"Detected {pkg_name} version: {package_version}") + if return_version: + return package_exists, package_version + else: + return package_exists + + +ENV_VARS_TRUE_VALUES = {"1", "ON", "YES", "TRUE"} +ENV_VARS_TRUE_AND_AUTO_VALUES = ENV_VARS_TRUE_VALUES.union({"AUTO"}) + +USE_TF = os.environ.get("USE_TF", "AUTO").upper() +USE_TORCH = os.environ.get("USE_TORCH", "AUTO").upper() +USE_JAX = os.environ.get("USE_FLAX", "AUTO").upper() + +# Try to run a native pytorch job in an environment with TorchXLA installed by setting this value to 0. 
+USE_TORCH_XLA = os.environ.get("USE_TORCH_XLA", "1").upper()
+
+FORCE_TF_AVAILABLE = os.environ.get("FORCE_TF_AVAILABLE", "AUTO").upper()
+
+
+_torch_version = "N/A"
+_torch_available = False
+if USE_TORCH in ENV_VARS_TRUE_AND_AUTO_VALUES and USE_TF not in ENV_VARS_TRUE_VALUES:
+    _torch_available, _torch_version = _is_package_available(
+        "torch", return_version=True
+    )  # type: ignore
+else:
+    logger.info("Disabling PyTorch because USE_TF is set")
+    _torch_available = False
+
+_bitsandbytes_available = _is_package_available("bitsandbytes")
+
+_is_transformers_available = _is_package_available("transformers")
+
+_is_peft_available = _is_package_available("peft")
+
+
+def is_transformers_available():
+    return _is_transformers_available
+
+
+def is_peft_available():
+    return _is_peft_available
+
+
+def is_torch_available():
+    return _torch_available
+
+
+def is_bitsandbytes_available():
+    if not is_torch_available():
+        return False
+
+    # bitsandbytes throws an error if cuda is not available
+    # let's avoid that by adding a simple check
+    import torch
+
+    return _bitsandbytes_available and torch.cuda.is_available()
+
+
+def is_flash_attn_2_available() -> bool:
+    if not is_torch_available():
+        return False
+
+    if not _is_package_available("flash_attn"):
+        return False
+
+    # Let's add an extra check to see if cuda is available
+    import torch
+
+    if not torch.cuda.is_available():
+        return False
+
+    if torch.version.cuda:
+        return version.parse(
+            importlib.metadata.version("flash_attn")
+        ) >= version.parse("2.1.0")
+    elif torch.version.hip:
+        # TODO: Bump the requirement to 2.1.0 once released in https://github.com/ROCmSoftwarePlatform/flash-attention
+        return version.parse(
+            importlib.metadata.version("flash_attn")
+        ) >= version.parse("2.0.4")
+    else:
+        return False
+
+
+_tf_version = "N/A"
+_tf_available = False
+if FORCE_TF_AVAILABLE in ENV_VARS_TRUE_VALUES:
+    _tf_available = True
+else:
+    if (
+        USE_TF in ENV_VARS_TRUE_AND_AUTO_VALUES
+        and USE_TORCH not in ENV_VARS_TRUE_VALUES
+    ):
+        # Note: _is_package_available("tensorflow") fails for tensorflow-cpu. Please test any changes to the line below
+        # with tensorflow-cpu to make sure it still works!
+        _tf_available = importlib.util.find_spec("tensorflow") is not None
+        if _tf_available:
+            candidates = (
+                "tensorflow",
+                "tensorflow-cpu",
+                "tensorflow-gpu",
+                "tf-nightly",
+                "tf-nightly-cpu",
+                "tf-nightly-gpu",
+                "tf-nightly-rocm",
+                "intel-tensorflow",
+                "intel-tensorflow-avx512",
+                "tensorflow-rocm",
+                "tensorflow-macos",
+                "tensorflow-aarch64",
+            )
+            _tf_version = None
+            # For the metadata, we have to look for both tensorflow and tensorflow-cpu
+            for pkg in candidates:
+                try:
+                    _tf_version = importlib.metadata.version(pkg)
+                    break
+                except importlib.metadata.PackageNotFoundError:
+                    pass
+            _tf_available = _tf_version is not None
+        if _tf_available:
+            if version.parse(_tf_version) < version.parse("2"):  # type: ignore
+                logger.info(
+                    f"TensorFlow found but with version {_tf_version}. Transformers requires version 2 minimum."
+                )
+                _tf_available = False
+    else:
+        logger.info("Disabling Tensorflow because USE_TORCH is set")
diff --git a/outpostkit/client.py b/outpostkit/client.py
index c7825b9..87ae4ee 100644
--- a/outpostkit/client.py
+++ b/outpostkit/client.py
@@ -225,7 +225,7 @@ def _build_httpx_client(
     **kwargs,
 ) -> Union[httpx.Client, httpx.AsyncClient]:
     headers = {
-        "User-Agent": "outpost-python/0.0.38",
+        "User-Agent": "outpost-python/0.0.69",
     }
 
     if (
diff --git a/outpostkit/constants.py b/outpostkit/constants.py
index 5c6e871..5b2dd75 100644
--- a/outpostkit/constants.py
+++ b/outpostkit/constants.py
@@ -1 +1 @@
-V1_API_URL = "https://api.outpost.run/v1"
+V1_API_URL = "https://api-gateway.outpost.run/v1"
diff --git a/outpostkit/endpoints.py b/outpostkit/endpoints.py
index caa6dbb..5a18a05 100644
--- a/outpostkit/endpoints.py
+++ b/outpostkit/endpoints.py
@@ -1,8 +1,7 @@
 import json
 import os
 from dataclasses import asdict, dataclass
-from datetime import datetime
-from typing import Any, Dict, List, Literal, Optional, Tuple, Union
+from typing import Any, Dict, List, Literal, Optional, Union
 from urllib.parse import urlparse
 
 from httpx import Response
@@ -11,15 +10,22 @@
     EndpointAutogeneratedTemplateConfig,
     EndpointCustomTemplateConfig,
     EndpointDeployment,
+    EndpointLog,
+    EndpointPrebuiltContainerDetails,
     EndpointReplicaStatus,
     EndpointResource,
     ReplicaScalingConfig,
 )
-from outpostkit._utils.constants import ServiceVisibility
+from outpostkit._utils.constants import (
+    OutpostSecret,
+    ServiceVisibility,
+    scaffolding_file,
+)
 from outpostkit.client import Client
 from outpostkit.exceptions import OutpostError
 from outpostkit.predictor import Predictor
 from outpostkit.resource import Namespace
+from outpostkit.utils import parse_endpoint_log_data
 
 
 @dataclass
@@ -41,10 +47,26 @@ class EndpointDeployResponse:
 
 
 class Endpoint(Namespace):
-    def __init__(self, client: Client, entity: str, name: str) -> None:
-        self.entity = entity
-        self.name = name
-        self.fullName = f"{entity}/{name}"
+    def __init__(
+        self,
+        client: Client,
+        entity: Optional[str] = None,
+        name: Optional[str] = None,
+        full_name: Optional[str] = None,
+    ) -> None:
+        if name and entity:
+            self.entity = entity
+            self.name = name
+            self.fullName = f"{entity}/{name}"
+        elif full_name:
+            _split = full_name.split("/", 1)
+            assert len(_split) == 2, "Invalid Full Name"
+            self.entity = _split[0]
+            self.name = _split[1]
+            self.fullName = full_name
+        else:
+            raise OutpostError("Please provide identifiable information.")
+
         super().__init__(client)
 
     def get(self) -> EndpointResource:
@@ -154,7 +176,7 @@ def delete(self) -> None:
         """
         self._client._request("DELETE", f"/endpoints/{self.fullName}")
 
-    def get_replica_status(self) -> EndpointReplicaStatus:
+    def replica_status(self) -> EndpointReplicaStatus:
         """
         Get the current replica status of the endpoint
         Note: throws if there are no currently deployed runtimes of the endpoint.
@@ -165,14 +187,24 @@ def get_replica_status(self) -> EndpointReplicaStatus:
         )
         return EndpointReplicaStatus(**resp.json())
 
+    def status(self):
+        """
+        Get the current status of the endpoint
+        """
+        resp = self._client._request(
+            "GET",
+            f"/endpoints/{self.fullName}/status",
+        )
+        return resp.json().get("status")
+
     def get_logs(
         self,
         log_type: Optional[Literal["dep", "runtime", "event"]] = None,
         deployment_id: Optional[str] = None,
-        start: Optional[Union[int, str, datetime]] = None,
-        end: Optional[Union[int, str, datetime]] = None,
+        start: Optional[Union[int, str]] = None,
+        end: Optional[Union[int, str]] = None,
         limit: Optional[int] = 1000,
-    ) -> List[Tuple[str, str]]:
+    ) -> List[EndpointLog]:
         """
         Retrieve logs related to the endpoint
         Available log types: runtime, dep (deployment), and event.
@@ -190,7 +222,13 @@ def get_logs(
             },
         )
 
-        return [(str(log.time), str(log.message)) for log in resp.json()]
+        return [
+            EndpointLog(
+                timestamp=str(log.get("timestamp")),
+                data=parse_endpoint_log_data(log.get("data")),
+            )
+            for log in resp.json()
+        ]
 
     def get_custom_template(self) -> Union[bytes, Any]:  # noqa: ANN401
         """
@@ -221,6 +259,11 @@ def __init__(self, total: int, endpoints: List[Dict]) -> None:
         self.endpoints = infs
 
 
+def scaffold(name: str) -> None:
+    with open(name, "x") as f:
+        f.write(scaffolding_file)
+
+
 @dataclass
 class EndpointCreateResponse:
     id: int
@@ -249,69 +292,83 @@ def list(
 
     def create(
         self,
-        templateConfig: Union[
+        template: Union[
             EndpointAutogeneratedTemplateConfig, EndpointCustomTemplateConfig
         ],
-        hardware_instance: str = "cpu-sm",
+        container: Optional[EndpointPrebuiltContainerDetails] = None,
+        hardware_instance: str = "e2-standard-2",
         task_type: Optional[str] = None,
-        prebuilt_image_name: Optional[str] = None,
         name: Optional[str] = None,
+        secrets: Optional[List[OutpostSecret]] = None,
         visibility: ServiceVisibility = ServiceVisibility.public,
         replica_scaling_config: Optional[ReplicaScalingConfig] = None,
-        containerType: Literal["prebuilt"] = "prebuilt",  # will be extended soon
     ) -> Endpoint:
         """
         Create an endpoint by providing the model details or use a custom template file.
""" - if isinstance(templateConfig, EndpointAutogeneratedTemplateConfig): + if isinstance(template, EndpointAutogeneratedTemplateConfig): resp = self._client._request( "POST", f"/endpoints/{self.entity}", json={ "templateType": "autogenerated", - "hardwareInstance": hardware_instance, + "hardwareInstanceId": hardware_instance, "visibility": visibility.name, - "replicaScalingConfig": asdict(replica_scaling_config) - if replica_scaling_config - else None, + "replicaScalingConfig": ( + asdict(replica_scaling_config) + if replica_scaling_config + else None + ), "name": name, - "prebuiltImageName": prebuilt_image_name, - "containerType": containerType, + "secrets": [asdict(secret) for secret in secrets] + if secrets + else None, + "prebuiltContainerDetails": ( + asdict(container) if container else None + ), + "containerType": "prebuilt", "taskType": task_type, - "autogeneratedTemplateConfig": asdict(templateConfig), + "autogeneratedTemplateConfig": asdict(template), }, ) else: - if templateConfig.type == "file": - if not os.path.exists(templateConfig.path) or not os.path.isfile( - templateConfig.path + if template.type == "file": + if not os.path.exists(template.path) or not os.path.isfile( + template.path ): raise OutpostError("No template file found.") resp = self._client._request( "POST", f"/endpoints/{self.entity}", - files={"template": open(templateConfig.path)}, + files={"template": open(template.path, "b+r")}, data={ "metadata": json.dumps( { - "hardwareInstance": hardware_instance, + "hardwareInstanceId": hardware_instance, "visibility": visibility.name, - "replicaScalingConfig": asdict(replica_scaling_config) - if replica_scaling_config - else None, + "replicaScalingConfig": ( + asdict(replica_scaling_config) + if replica_scaling_config + else None + ), "name": name, - "prebuiltImageName": prebuilt_image_name, - "containerType": containerType, + "secrets": [asdict(secret) for secret in secrets] + if secrets + else None, + "prebuiltContainerDetails": ( + asdict(container) if container else None + ), + "containerType": "prebuilt", "taskType": task_type, "customTemplateConfig": { - "className": templateConfig.className + "className": template.className }, } ) }, ) else: - parsed = urlparse(templateConfig.path) + parsed = urlparse(template.path) if not all([parsed.scheme, parsed.netloc]): raise OutpostError("Invalid url specified in path.") resp = self._client._request( @@ -319,18 +376,25 @@ def create( f"/endpoints/{self.entity}", json={ "templateType": "custom", - "hardwareInstance": hardware_instance, + "hardwareInstanceId": hardware_instance, "visibility": visibility, - "replicaScalingConfig": asdict(replica_scaling_config) - if replica_scaling_config - else None, + "replicaScalingConfig": ( + asdict(replica_scaling_config) + if replica_scaling_config + else None + ), "name": name, - "prebuiltImageName": prebuilt_image_name, - "containerType": containerType, + "secrets": [asdict(secret) for secret in secrets] + if secrets + else None, + "prebuiltContainerDetails": ( + asdict(container) if container else None + ), + "containerType": "prebuilt", "taskType": task_type, "customTemplateConfig": { - "className": templateConfig.className, - "url": templateConfig.path, + "className": template.className, + "url": template.path, }, }, ) diff --git a/outpostkit/exceptions.py b/outpostkit/exceptions.py index 9827911..61a84c7 100644 --- a/outpostkit/exceptions.py +++ b/outpostkit/exceptions.py @@ -42,7 +42,7 @@ def __init__( self.data = data def __str__(self) -> str: - return f"status: 
{self.status_code}, message: {self.code + ' - '+ self.message if self.code else self.message}" + return f"status: {self.status_code}, data: {self.data}" class ModelError(Exception): diff --git a/outpostkit/finetuning.py b/outpostkit/finetuning.py new file mode 100644 index 0000000..becd029 --- /dev/null +++ b/outpostkit/finetuning.py @@ -0,0 +1,202 @@ +from dataclasses import asdict +from typing import Any, Dict, List, Literal, Optional, Union + +from outpostkit._types.finetuning import ( + FinetuningHFSourceModel, + FinetuningJobCreationResponse, + FinetuningJobLog, + FinetuningModelRepo, + FinetuningOutpostSourceModel, + FinetuningResource, + FinetuningServiceCreateResponse, + FinetuningsListResponse, +) +from outpostkit._utils.constants import OutpostSecret +from outpostkit._utils.finetuning import FinetuningTask +from outpostkit.client import Client +from outpostkit.resource import Namespace +from outpostkit.utils import parse_finetuning_job_log_data + + +class FinetuningJob(Namespace): + def __init__(self, client: Client, entity: str, name: str, job_id: str) -> None: + self.entity = entity + self.name = name + self.fullName = f"{entity}/{name}" + self.id = job_id + self._route_prefix = f"/finetunings/{self.fullName}/jobs/{self.id}" + super().__init__(client) + + def enqueue(self): + resp = self._client._request( + "POST", f"/finetunings/{self.entity}/jobs/enqueue", json={"jobs": [self.id]} + ) + return resp + + def info( + self, + with_config: Optional[bool] = None, + with_trainer_log: Optional[bool] = None, + ): + resp = self._client._request( + "GET", + self._route_prefix, + params={ + "cfg": with_config, + "trainer_log": with_trainer_log, + }, + ) + return resp + + def configs(self): + resp = self._client._request( + "GET", + f"{self._route_prefix}/configs", + ) + return resp + + def trainer_logs(self): + resp = self._client._request( + "GET", + f"{self._route_prefix}/logs/trainer", + ) + return resp + + def delete(self): + resp = self._client._request("DELETE", self._route_prefix) + return resp + + def get_logs( + self, + log_type: Optional[Literal["dep", "runtime", "event"]] = None, + start: Optional[Union[int, str]] = None, + end: Optional[Union[int, str]] = None, + limit: Optional[int] = 1000, + ) -> List[FinetuningJobLog]: + """ + Retrieve logs related to the finetuning job + Available log types:runtime, dep (deployment) and event. 
+ Note: the start time defaults to 15 mins ago + """ + resp = self._client._request( + "GET", + f"{self._route_prefix}/logs", + params={ + "logType": log_type, + "limit": limit, + "start": start, + "end": end, + }, + ) + + return [ + FinetuningJobLog( + timestamp=str(log.get("timestamp")), + data=parse_finetuning_job_log_data(log.get("data")), + ) + for log in resp.json() + ] + + +class FinetuningService(Namespace): + def __init__(self, client: Client, entity: str, name: str) -> None: + self.entity = entity + self.name = name + self.fullName = f"{entity}/{name}" + self._route_prefix = f"/finetunings/{self.fullName}" + super().__init__(client) + + def info(self): + resp = self._client._request("GET", f"{self._route_prefix}") + return FinetuningResource(**resp.json()) + + def list_jobs( + self, + status_in: Optional[List[str]] = None, + status_not_in: Optional[List[str]] = None, + with_config: Optional[bool] = None, + with_trainer_log: Optional[bool] = None, + ): + resp = self._client._request( + "GET", + f"{self._route_prefix}/jobs", + params={ + "statusIn": ",".join(status_in) if status_in else None, + "statusNotIn": ",".join(status_not_in) if status_not_in else None, + "cfg": with_config, + "trainer_log": with_trainer_log, + }, + ) + return resp.json() + + def create_job( + self, + hardware_instance: str, + finetuned_model_repo: FinetuningModelRepo, + configs: Dict[str, Any], + column_configs: Optional[Dict[str, str]] = None, + model_source: Literal["huggingface", "outpost", "none"] = "none", + source_model: Optional[ + Union[FinetuningHFSourceModel, FinetuningOutpostSourceModel] + ] = None, + dataset_revision: Optional[str] = "HEAD", + enqueue: Optional[bool] = None, + ) -> FinetuningJob: + resp = self._client._request( + "POST", + f"{self._route_prefix}/jobs", + json={ + "hardwareInstanceId": hardware_instance, + "configs": configs, + "columnConfigs": column_configs, + "modelSource": model_source, + "sourceHuggingfaceModel": asdict(source_model) + if isinstance(source_model, FinetuningHFSourceModel) + else None, + "sourceOutpostModel": asdict(source_model) + if isinstance(source_model, FinetuningOutpostSourceModel) + else None, + "finetunedModel": asdict(finetuned_model_repo), + "datasetCommitHash": dataset_revision, + }, + params={"enqueue": enqueue}, + ) + job_resp = FinetuningJobCreationResponse(**resp.json()) + return FinetuningJob( + client=self._client, entity=self.entity, name=self.name, job_id=job_resp.id + ) + + +class Finetunings(Namespace): + def __init__(self, client: Client, entity: str) -> None: + self.entity = entity + self._route_prefix = f"/finetunings/{self.entity}" + super().__init__(client) + + def list(self): + resp = self._client._request("GET", self._route_prefix) + return FinetuningsListResponse(**resp.json()) + + def create( + self, + name: str, + task_type: FinetuningTask, + dataset: str, + train_path: str, + validation_path: str, + secrets: Optional[List[OutpostSecret]] = None, + ) -> FinetuningService: + resp = self._client._request( + "POST", + self._route_prefix, + json={ + "name": name, + "taskType": task_type.value, + "dataset": dataset, + "trainPath": train_path, + "validPath": validation_path, + "secrets": [asdict(secret) for secret in secrets] if secrets else None, + }, + ) + obj = FinetuningServiceCreateResponse(**resp.json()) + return FinetuningService(client=self._client, entity=self.entity, name=obj.name) diff --git a/outpostkit/logger.py b/outpostkit/logger.py index ff0d65b..86de18f 100644 --- a/outpostkit/logger.py +++ b/outpostkit/logger.py @@ 
 import logging
+import os
 from typing import Union
 
 
-def init_outpost_logger(level: Union[str, int]) -> logging.Logger:
+def init_outpost_logger(
+    name, level: Union[str, int] = os.getenv("LOGLEVEL", "INFO").upper()
+) -> logging.Logger:
     # Use the same settings as above for root logger
     logging.basicConfig(format="%(asctime)s %(message)s")
-    outpost_logger = logging.getLogger("outpost_logger")
+    outpost_logger = logging.getLogger(name)
     outpost_logger.setLevel(logging.getLevelName(level))
     return outpost_logger
diff --git a/outpostkit/repository/__init__.py b/outpostkit/repository/__init__.py
new file mode 100644
index 0000000..58a94ed
--- /dev/null
+++ b/outpostkit/repository/__init__.py
@@ -0,0 +1,107 @@
+from outpostkit._types.repository import REPOSITORY_TYPES
+from outpostkit.client import Client
+from outpostkit.resource import Namespace
+
+
+# Assuming path always starts with `/`; can create a parser for this (src/... -> /src/...)
+class Repository(Namespace):
+    def __init__(
+        self, client: Client, repo_type: REPOSITORY_TYPES, entity: str, name: str
+    ) -> None:
+        self.entity = entity
+        self.name = name
+        self.repo_type = repo_type
+        self.fullName = f"{entity}/{name}"
+        super().__init__(client)
+
+    def view_blob(self, path: str, ref: str = "HEAD", raw: bool = True):
+        resp = self._client._request(
+            path=f"/git/blobs/{self.repo_type}/{self.fullName}/view/{ref}{path}",
+            method="GET",
+            params={"raw": raw},
+        )
+        resp.raise_for_status()
+
+        return resp.json()  # TODO Type
+
+    def download_blob(self, path: str, ref: str = "HEAD", raw: bool = True):
+        resp = self._client._request(
+            path=f"/git/blobs/{self.repo_type}/{self.fullName}/download/{ref}{path}",
+            method="GET",
+            params={"raw": raw},
+            stream=True,
+        )
+        resp.raise_for_status()
+
+        return resp.json()  # TODO Type
+
+    def view_tree(
+        self,
+        ref: str = "HEAD",
+        path: str = "/",
+        with_commit=False,
+        with_metadata=False,
+    ):
+        resp = self._client._request(
+            path=f"/git/tree/{self.repo_type}/{self.fullName}/view/{ref}{path}",
+            method="GET",
+            params={"with_commit": with_commit, "with_metadata": with_metadata},
+        )
+        resp.raise_for_status()
+
+        return resp.json()  # TODO Type
+
+    def search_tree(
+        self,
+        search: str,
+        ref: str = "HEAD",
+    ):
+        resp = self._client._request(
+            path=f"/git/tree/{self.repo_type}/{self.fullName}/search",
+            method="GET",
+            params={"search": search, "ref": ref},
+        )
+        resp.raise_for_status()
+
+        return resp.json()  # TODO Type
+
+
+class RepositoryAtRef(Namespace):
+    def __init__(
+        self,
+        client: Client,
+        repo_type: REPOSITORY_TYPES,
+        entity: str,
+        name: str,
+        ref: str,
+    ) -> None:
+        self.repo = Repository(
+            client=client, repo_type=repo_type, entity=entity, name=name
+        )
+        self.ref = ref
+        super().__init__(client)
+
+    def view_blob(self, path: str, raw: bool = True):
+        return self.repo.view_blob(path=path, ref=self.ref, raw=raw)
+
+    def download_blob(self, path: str, raw: bool = True):
+        return self.repo.download_blob(path=path, ref=self.ref, raw=raw)
+
+    def view_tree(
+        self,
+        path: str = "/",
+        with_commit=False,
+        with_metadata=False,
+    ):
+        return self.repo.view_tree(
+            path=path,
+            ref=self.ref,
+            with_commit=with_commit,
+            with_metadata=with_metadata,
+        )
+
+    def search_tree(
+        self,
+        search: str,
+    ):
+        return self.repo.search_tree(search=search, ref=self.ref)
diff --git a/outpostkit/repository/lfs/__init__.py b/outpostkit/repository/lfs/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/outpostkit/repository/lfs/client.py b/outpostkit/repository/lfs/client.py
new file mode 100644
index 0000000..fd637fb
--- /dev/null
+++ b/outpostkit/repository/lfs/client.py
@@ -0,0 +1,163 @@
+"""A simple Git LFS client
+"""
+import hashlib
+from typing import Any, BinaryIO, Callable, Dict, List, Optional
+
+import requests
+from six.moves import urllib_parse
+
+from outpostkit.repository.lfs.logger import create_lfs_logger
+
+from . import exc, transfer, types
+
+FILE_READ_BUFFER_SIZE = 4 * 1024 * 1000  # 4mb, why not
+
+
+_log = create_lfs_logger(__name__)
+
+
+class LfsClient:
+    LFS_MIME_TYPE = "application/vnd.git-lfs+json"
+
+    TRANSFER_ADAPTERS = {
+        "basic": transfer.BasicTransferAdapter,
+        "multipart-basic": transfer.MultipartTransferAdapter,
+    }
+
+    TRANSFER_ADAPTER_PRIORITY = ["multipart-basic", "basic"]
+
+    def __init__(
+        self,
+        lfs_server_url: str,
+        auth_token: Optional[str] = None,
+        transfer_adapters: List[str] = TRANSFER_ADAPTER_PRIORITY,
+    ) -> None:
+        self._url = lfs_server_url.rstrip("/")
+        self._auth_token = auth_token
+        self._transfer_adapters = transfer_adapters
+
+    def batch(
+        self,
+        prefix: str,
+        operation: str,
+        objects: List[Dict[str, Any]],
+        ref: Optional[str] = None,
+        transfers: Optional[List[str]] = None,
+    ) -> Dict[str, Any]:
+        """Send a batch request to the LFS server
+
+        TODO: allow specifying more than one file for a single batch operation
+        """
+        url = self._url_for(prefix, "objects", "batch")
+        if transfers is None:
+            transfers = self._transfer_adapters
+
+        payload = {"transfers": transfers, "operation": operation, "objects": objects}
+        if ref:
+            payload["ref"] = ref
+
+        headers = {"Content-type": self.LFS_MIME_TYPE, "Accept": self.LFS_MIME_TYPE}
+        if self._auth_token:
+            headers["Authorization"] = f"Bearer {self._auth_token}"
+
+        response = requests.post(url, json=payload, headers=headers)
+        if response.status_code != 200:
+            raise exc.LfsError(
+                f"Unexpected response from LFS server: {response.status_code}",
+                status_code=response.status_code,
+            )
+        _log.debug("Got reply for batch request: %s", response.json())
+        return response.json()
+
+    def upload(
+        self,
+        file_obj: BinaryIO,
+        organization: str,
+        repo_type: str,
+        repo: str,
+        on_progress: Optional[Callable[[int], None]] = None,
+        **extras,
+    ) -> types.ObjectAttributes:
+        """Upload a file to LFS storage"""
+        object_attrs = self._get_object_attrs(file_obj)
+        self._add_extra_object_attributes(object_attrs, extras)
+        response = self.batch(
+            f"{organization}/{repo_type}/{repo}", "upload", [object_attrs]
+        )
+
+        try:
+            adapter = self.TRANSFER_ADAPTERS[response["transfer"]]()
+        except KeyError:
+            raise ValueError(
+                "Unsupported transfer adapter: {}".format(response["transfer"])
+            )
+
+        adapter.upload(file_obj, response["objects"][0], on_progress)
+        return object_attrs
+
+    def download(
+        self,
+        file_obj: BinaryIO,
+        object_sha256: str,
+        object_size: int,
+        organization: str,
+        repo_type: str,
+        repo: str,
+        **extras,
+    ) -> None:
+        """Download a file and save it to file_obj
+
+        file_obj is expected to be a file-like object open for writing in binary mode
+
+        TODO: allow specifying more than one file for a single batch operation
+        """
+        object_attrs = {"oid": object_sha256, "size": object_size}
+        self._add_extra_object_attributes(object_attrs, extras)
+
+        response = self.batch(
+            f"{organization}/{repo_type}/{repo}", "download", [object_attrs]
+        )
+
+        try:
+            adapter = self.TRANSFER_ADAPTERS[response["transfer"]]()
+        except KeyError:
+            raise ValueError(
+                "Unsupported transfer adapter: {}".format(response["transfer"])
+            )
+
+        return adapter.download(file_obj, response["objects"][0])
+
+    def _url_for(self, *segments: str, **params: str):
+        path = "/".join(segments)
+        url = f"{self._url}/{path}"
+        if params:
+            url = f"{url}?{urllib_parse.urlencode(params)}"
+        return url
+
+    @staticmethod
+    def _get_object_attrs(file_obj: BinaryIO, **extras) -> types.ObjectAttributes:
+        digest = hashlib.sha256()
+        try:
+            while True:
+                data = file_obj.read(FILE_READ_BUFFER_SIZE)
+                if data:
+                    digest.update(data)
+                else:
+                    break
+
+            size = file_obj.tell()
+            oid = digest.hexdigest()
+        finally:
+            file_obj.seek(0)
+
+        return types.ObjectAttributes(oid=oid, size=size)
+
+    @staticmethod
+    def _add_extra_object_attributes(
+        attributes: types.ObjectAttributes, extras: Dict[str, str]
+    ):
+        """Add Giftless-specific 'x-...' attributes to an object dict"""
+        for k, v in extras.items():
+            attributes[f"x-{k}"] = v
diff --git a/outpostkit/repository/lfs/exc.py b/outpostkit/repository/lfs/exc.py
new file mode 100644
index 0000000..7fdac2d
--- /dev/null
+++ b/outpostkit/repository/lfs/exc.py
@@ -0,0 +1,11 @@
+"""Exception classes
+"""
+
+
+class LfsError(RuntimeError):
+    status_code = None
+
+    def __init__(self, *args, **kwargs):
+        if "status_code" in kwargs:
+            self.status_code = kwargs.pop("status_code")
+        super(LfsError, self).__init__(*args, **kwargs)
diff --git a/outpostkit/repository/lfs/logger.py b/outpostkit/repository/lfs/logger.py
new file mode 100644
index 0000000..c5e3b0c
--- /dev/null
+++ b/outpostkit/repository/lfs/logger.py
@@ -0,0 +1,22 @@
+import logging
+import os
+
+log_dir = "/tmp"
+log_file_path = os.path.expanduser(f"{log_dir}/outpostkit.log")
+outpost_folder = os.path.dirname(log_file_path)
+
+if not os.path.exists(outpost_folder):
+    # Create the log folder if it doesn't exist
+    os.makedirs(outpost_folder)
+
+
+def create_lfs_logger(name: str):
+    _log = logging.getLogger(name)
+    _log.handlers.clear()
+    _log.setLevel(10)
+    file_handler = logging.FileHandler(log_file_path)
+    file_handler.setFormatter(
+        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+    )
+    _log.addHandler(file_handler)
+    return _log
diff --git a/outpostkit/repository/lfs/transfer.py b/outpostkit/repository/lfs/transfer.py
new file mode 100644
index 0000000..442b758
--- /dev/null
+++ b/outpostkit/repository/lfs/transfer.py
@@ -0,0 +1,189 @@
+import base64
+import hashlib
+from typing import Any, BinaryIO, Callable, Dict, Optional, Union
+
+import requests
+
+from outpostkit.repository.lfs.logger import create_lfs_logger
+
+from . import types
+
+_log = create_lfs_logger(__name__)
+
+
+class BasicTransferAdapter:
+    def upload(
+        self,
+        file_obj: BinaryIO,
+        upload_spec: types.UploadObjectAttributes,
+        on_progress: Optional[Callable[[int], None]] = None,
+    ) -> None:
+        try:
+            ul_action = upload_spec["actions"]["upload"]
+        except KeyError:  # Object is already on the server
+            return
+
+        reply = requests.put(
+            ul_action["href"], headers=ul_action.get("header", {}), data=file_obj
+        )
+        if reply.status_code // 100 != 2:
+            raise RuntimeError(
+                "Unexpected reply from server for upload: {} {}".format(
+                    reply.status_code, reply.text
+                )
+            )
+
+        vfy_action = upload_spec["actions"].get("verify")
+        if vfy_action:
+            self._verify_object(vfy_action, upload_spec["oid"], upload_spec["size"])
+
+    def download(
+        self, file_obj: BinaryIO, download_spec: types.DownloadObjectAttributes
+    ) -> None:
+        """Download an object from LFS"""
+        dl_action = download_spec["actions"]["download"]
+        with requests.get(
+            dl_action["href"], headers=dl_action.get("header", {}), stream=True
+        ) as response:
+            for chunk in response.iter_content(1024 * 16):
+                file_obj.write(chunk)
+
+    @staticmethod
+    def _verify_object(
+        verify_action: types.BasicActionAttributes, oid: str, size: int
+    ) -> None:
+        _log.info("Sending verify action to %s", verify_action["href"])
+        response = requests.post(
+            verify_action["href"],
+            headers=verify_action.get("header", {}),
+            json={"oid": oid, "size": size},
+        )
+        if response.status_code // 100 != 2:
+            raise RuntimeError(
+                "verify failed with error status code: {}: {}".format(
+                    response.status_code, response.text
+                )
+            )
+
+
+class MultipartTransferAdapter(BasicTransferAdapter):
+    def upload(
+        self,
+        file_obj: BinaryIO,
+        upload_spec: types.MultipartUploadObjectAttributes,
+        on_progress: Optional[Callable[[int], None]] = None,
+    ):
+        """Do a multipart upload"""
+        actions = upload_spec.get("actions")
+        if not actions:
+            _log.info("No actions, file already exists")
+            return
+
+        init_action = actions.get("init")
+        if init_action:
+            _log.info("Sending multipart init action to %s", init_action["href"])
+            response = self._send_request(
+                init_action["href"],
+                method=init_action.get("method", "POST"),
+                headers=init_action.get("header", {}),
+                body=init_action.get("body"),
+            )
+            if response.status_code // 100 != 2:
+                raise RuntimeError(
+                    f"init failed with error status code: {response.status_code}"
+                )
+
+        completed_parts = []
+        part_action = actions.get("part")
+        if part_action:
+            all_parts = part_action.get("parts", [])
+            for p, part in enumerate(all_parts):
+                _log.info("Uploading part %d/%d", p + 1, len(all_parts))
+                etag = self._send_part_request(file_obj, **part)
+                if on_progress:
+                    on_progress(part["size"])
+                completed_parts.append({"ETag": etag, "PartNumber": p + 1})
+
+        commit_action = actions.get("commit")
+        if commit_action:
+            _log.info("Sending multipart commit action to %s", commit_action["href"])
+            response = self._send_request(
+                commit_action["href"],
+                method=commit_action.get("method", "POST"),
+                headers=commit_action.get("header", {}),
+                json={"oid": upload_spec.get("oid"), "parts": completed_parts},
+            )
+            if response.status_code // 100 != 2:
+                raise RuntimeError(
+                    "commit failed with error status code: {}: {}".format(
+                        response.status_code, response.text
+                    )
+                )
+
+        verify_action = actions.get("verify")
+        if verify_action:
+            self._verify_object(verify_action, upload_spec["oid"], upload_spec["size"])
+
+    def _send_part_request(
+        self,
+        file_obj: BinaryIO,
+        href: str,
+        method: str = "PUT",
+        pos: int = 0,
+        size: Optional[int] = None,
+        want_digest: Optional[str] = None,
+        header: Optional[Dict[str, Any]] = None,
+        **_,
+    ):
+        """Upload a part"""
+        file_obj.seek(pos)
+        if size:
+            data = file_obj.read(size)
+        else:
+            data = file_obj.read()
+
+        if header is None:
+            header = {}
+
+        if want_digest:
+            digest_headers = calculate_digest_header(data, want_digest)
+            header.update(digest_headers)
+
+        reply = self._send_request(href, method=method, headers=header, body=data)
+        if reply.status_code // 100 != 2:
+            raise RuntimeError(
+                "Unexpected reply from server for part: {} {}".format(
+                    reply.status_code, reply.text
+                )
+            )
+        return reply.headers.get("etag")
+
+    @staticmethod
+    def _send_request(
+        url: str,
+        method: str,
+        headers: Dict[str, str],
+        body: Optional[Union[bytes, str]] = None,
+        json: Optional[Dict] = None,
+    ) -> requests.Response:
+        """Send an arbitrary HTTP request"""
+        reply = requests.session().request(
+            method=method,
+            url=url,
+            headers=headers,
+            data=body,
+            json=json,
+        )
+        return reply
+
+
+def calculate_digest_header(data: bytes, want_digest: str) -> Dict[str, str]:
+    """TODO: Properly implement this"""
+    if want_digest == "contentMD5":
+        digest = base64.b64encode(hashlib.md5(data).digest()).decode("ascii")
+        return {"Content-MD5": digest}
+    else:
+        raise RuntimeError(f"Don't know how to handle want_digest value: {want_digest}")
diff --git a/outpostkit/repository/lfs/types.py b/outpostkit/repository/lfs/types.py
new file mode 100644
index 0000000..80687ee
--- /dev/null
+++ b/outpostkit/repository/lfs/types.py
@@ -0,0 +1,58 @@
+"""Some useful type definitions for Git LFS API and transfer protocols
+"""
+import sys
+from typing import Any, Dict, Optional
+
+if sys.version_info >= (3, 8):
+    from typing import TypedDict
+else:
+    from typing_extensions import TypedDict
+
+
+class ObjectAttributes(TypedDict):
+    oid: str
+    size: int
+
+
+class BasicActionAttributes(TypedDict):
+    href: str
+    header: Optional[Dict[str, str]]
+    expires_in: int
+
+
+class BasicUploadActions(TypedDict, total=False):
+    upload: BasicActionAttributes
+    verify: BasicActionAttributes
+
+
+class BasicDownloadActions(TypedDict, total=False):
+    download: BasicActionAttributes
+
+
+class UploadObjectAttributes(TypedDict, total=False):
+    actions: BasicUploadActions
+    oid: str
+    size: int
+    authenticated: Optional[bool]
+
+
+class DownloadObjectAttributes(TypedDict, total=False):
+    actions: BasicDownloadActions
+    oid: str
+    size: int
+    authenticated: Optional[bool]
+
+
+class MultipartUploadActions(TypedDict, total=False):
+    init: Dict[str, Any]
+    commit: Dict[str, Any]
+    part: Dict[str, Any]
+    abort: Dict[str, Any]
+    verify: Dict[str, Any]
+
+
+class MultipartUploadObjectAttributes(TypedDict, total=False):
+    actions: MultipartUploadActions
+    oid: str
+    size: int
+    authenticated: Optional[bool]
diff --git a/outpostkit/user.py b/outpostkit/user.py
index c3398fa..7efb3eb 100644
--- a/outpostkit/user.py
+++ b/outpostkit/user.py
@@ -26,6 +26,17 @@ def get(self) -> UserDetails:
 
         return UserDetails(**resp.json())
 
+    def list_third_party_tokens(self):
+        """List the user's third-party tokens.
+
+        Returns:
+            The list of third-party tokens.
+        """
+        resp = self._client._request(path="/user/tokens", method="GET")
+        resp.raise_for_status()
+
+        return resp.json()
+
     async def async_get(self) -> UserDetails:
         """Get User
 
diff --git a/outpostkit/utils.py b/outpostkit/utils.py
index 6870534..eb6fd87 100644
--- a/outpostkit/utils.py
+++ b/outpostkit/utils.py
@@ -1,5 +1,58 @@
 from datetime import datetime
+from typing import Any, Dict, List, Tuple
+
+from outpostkit._types.endpoint import EndpointLogData
+from outpostkit._types.finetuning import FinetuningJobLogData
 
 
 def convert_outpost_date_str_to_date(date_string: str) -> datetime:
     return datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%S.%fZ")
+
+
+def separate_keys(
+    dictionary: Dict[str, Any], known_keys: List[str]
+) -> Tuple[Dict[str, Any], Dict[str, Any]]:
+    known_dict = {}
+    unknown_dict = {}
+
+    for key, value in dictionary.items():
+        if key in known_keys:
+            known_dict[key] = value
+        else:
+            unknown_dict[key] = value
+
+    return (known_dict, unknown_dict)
+
+
+def parse_endpoint_log_data(log_data: Dict[str, Any]) -> EndpointLogData:
+    known_keys = [
+        "level_num",
+        "log_type",
+        "level",
+        "logger_name",
+        "message",
+        "exc_info",
+        "stack_info",
+    ]
+    (known_dict, extra) = separate_keys(log_data, known_keys=known_keys)
+    replica = None
+    kube_data = extra.get("kubernetes")
+    if kube_data and isinstance(kube_data, dict):
+        if "pod_name" in kube_data and isinstance(kube_data["pod_name"], str):
+            parts = kube_data["pod_name"].split("-")
+            replica = parts[-1]
+    return EndpointLogData(**known_dict, replica=replica, extra=extra)
+
+
+def parse_finetuning_job_log_data(log_data: Dict[str, Any]) -> FinetuningJobLogData:
+    known_keys = [
+        "level_num",
+        "log_type",
+        "level",
+        "logger_name",
+        "message",
+        "exc_info",
+        "stack_info",
+    ]
+    (known_dict, extra) = separate_keys(log_data, known_keys=known_keys)
+    return FinetuningJobLogData(**known_dict, extra=extra)
diff --git a/pyproject.toml b/pyproject.toml
index 4b10706..e843779 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "outpostkit"
-version = "0.0.38"
+version = "0.0.69"
 description = "Python client for Outpost"
 readme = "README.md"
 license = { file = "LICENSE" }
@@ -34,7 +34,7 @@ repository = "https://github.com/outposthq/outpostkit-python"
 testpaths = "tests/"
 
 [tool.setuptools]
-packages = ["outpostkit","outpostkit._types","outpostkit._utils"]
+packages = ["outpostkit","outpostkit._types","outpostkit._utils","outpostkit.repository","outpostkit.repository.lfs"]
 
 [tool.setuptools.package-data]
 "outpostkit" = ["py.typed"]