Cortex python client #488

Merged: 26 commits, merged on Sep 30, 2019. Changes from all commits are shown below.

1 change: 1 addition & 0 deletions dev/versions.md
```diff
@@ -103,6 +103,7 @@ Note: it's ok if example training notebooks aren't upgraded, as long as the expo
 * [flask](https://pypi.org/project/flask/)
 * [flask-api](https://pypi.org/project/flask-api/)
 * [waitress](https://pypi.org/project/waitress/)
+* [dill](https://pypi.org/project/dill/)
 1. Update the versions listed in "Pre-installed Packages" in `request-handlers.py`
 
 ## Istio
```
45 changes: 45 additions & 0 deletions docs/cluster/python-client.md
@@ -0,0 +1,45 @@
# Python Client

The Python client can be used to programmatically deploy models to a Cortex cluster.

<!-- CORTEX_VERSION_MINOR -->
```bash
pip install git+https://github.com/cortexlabs/cortex.git@master#egg=cortex\&subdirectory=pkg/workloads/cortex/client
```

The Python client needs to be initialized with AWS credentials and an operator URL for your Cortex cluster. You can find the operator URL by running `./cortex.sh endpoints`.

```python
from cortex import Client

cortex = Client(
    aws_access_key_id="<string>",      # AWS access key associated with the account that the cluster is running on
    aws_secret_access_key="<string>",  # AWS secret key associated with the AWS access key
    operator_url="<string>",           # operator URL of your cluster
)

api_url = cortex.deploy(
    deployment_name="<string>",  # deployment name (required)
    api_name="<string>",         # API name (required)
    model_path="<string>",       # S3 path to an exported model (required)
    pre_inference=callable,      # function used to prepare requests for model input (optional)
    post_inference=callable,     # function used to prepare model output for response (optional)
    model_format="<string>",     # model format, must be "tensorflow" or "onnx" (default: "onnx" if model path ends with .onnx, "tensorflow" if model path ends with .zip or is a directory)
    tf_serving_key="<string>",   # name of the signature def to use for prediction (required if your model has more than one signature def)
)
```
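
`pre_inference` and `post_inference` are plain Python functions: the client pickles them with dill and the cluster runs them around each prediction. As a minimal sketch, reusing the sample fields from the request example below; the `metadata` argument is illustrative rather than the authoritative handler signature:

```python
def pre_inference(sample, metadata):
    # map the incoming JSON payload to the model's expected input
    return {"input": [sample["feature_1"], sample["feature_2"], sample["feature_3"]]}


def post_inference(prediction, metadata):
    # shape the raw model output into the JSON response
    return {"prediction": prediction}
```

These would be passed as `pre_inference=pre_inference, post_inference=post_inference` in the `cortex.deploy(...)` call above.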

`api_url` contains the URL of the deployed API. The API accepts JSON POST requests.

```python
import requests

sample = {
    "feature_1": "a",
    "feature_2": "b",
    "feature_3": "c",
}

resp = requests.post(api_url, json=sample)
resp.json()
```
4 changes: 3 additions & 1 deletion pkg/operator/api/schema/schema.go
```diff
@@ -24,7 +24,9 @@ import (
 )
 
 type DeployResponse struct {
-	Message string `json:"message"`
+	Message     string           `json:"message"`
+	Context     *context.Context `json:"context"`
+	APIsBaseURL string           `json:"apis_base_url"`
 }
 
 type DeleteResponse struct {
```
23 changes: 16 additions & 7 deletions pkg/operator/endpoints/deploy.go
```diff
@@ -117,20 +117,29 @@ func Deploy(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	apisBaseURL, err := workloads.APIsBaseURL()
+	if err != nil {
+		RespondError(w, err)
+		return
+	}
+
+	deployResponse := schema.DeployResponse{Context: ctx, APIsBaseURL: apisBaseURL}
 	switch {
 	case isUpdating && ignoreCache:
-		Respond(w, schema.DeployResponse{Message: ResCachedDeletedDeploymentStarted})
+		deployResponse.Message = ResCachedDeletedDeploymentStarted
 	case isUpdating && !ignoreCache:
-		Respond(w, schema.DeployResponse{Message: ResDeploymentUpdated})
+		deployResponse.Message = ResDeploymentUpdated
 	case !isUpdating && ignoreCache:
-		Respond(w, schema.DeployResponse{Message: ResCachedDeletedDeploymentStarted})
+		deployResponse.Message = ResCachedDeletedDeploymentStarted
 	case !isUpdating && !ignoreCache && existingCtx == nil:
-		Respond(w, schema.DeployResponse{Message: ResDeploymentStarted})
+		deployResponse.Message = ResDeploymentStarted
 	case !isUpdating && !ignoreCache && existingCtx != nil && !fullCtxMatch:
-		Respond(w, schema.DeployResponse{Message: ResDeploymentUpdated})
+		deployResponse.Message = ResDeploymentUpdated
 	case !isUpdating && !ignoreCache && existingCtx != nil && fullCtxMatch:
-		Respond(w, schema.DeployResponse{Message: ResDeploymentUpToDate})
+		deployResponse.Message = ResDeploymentUpToDate
 	default:
-		Respond(w, schema.DeployResponse{Message: ResDeploymentUpdated})
+		deployResponse.Message = ResDeploymentUpdated
 	}
+
+	Respond(w, deployResponse)
 }
```
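
On the client side, this response is consumed as JSON, with `context` carried as a base64-encoded msgpack blob (see `client.py` below). A minimal sketch of the decode, assuming a body shaped like `schema.DeployResponse`; the API name, path, and URL are hypothetical stand-ins:

```python
import base64

import msgpack

# build a stand-in response the way the operator would: msgpack-encode a
# context dict, then base64 it (the field shapes here are illustrative)
fake_ctx = {"apis": {"my-api": {"path": "/deployment/my-api"}}}
resources = {
    "message": "Deployment started",
    "context": base64.b64encode(msgpack.dumps(fake_ctx)).decode(),
    "apis_base_url": "https://<apis-load-balancer>",
}

# roughly what client.py (below) does with it
ctx = msgpack.loads(base64.b64decode(resources["context"]), raw=False)
api_url = resources["apis_base_url"] + ctx["apis"]["my-api"]["path"]
```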
15 changes: 15 additions & 0 deletions pkg/workloads/cortex/client/cortex/__init__.py
@@ -0,0 +1,15 @@
```python
# Copyright 2019 Cortex Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from cortex.client import Client
```
144 changes: 144 additions & 0 deletions pkg/workloads/cortex/client/cortex/client.py
@@ -0,0 +1,144 @@
```python
# Copyright 2019 Cortex Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pathlib
from pathlib import Path
import os
import subprocess
import sys
import shutil
import urllib.parse
import base64

import dill
import msgpack
import requests
import yaml
from requests.exceptions import HTTPError


class Client(object):
    def __init__(self, aws_access_key_id, aws_secret_access_key, operator_url):
        """Initialize a Client to a Cortex Operator

        Args:
            aws_access_key_id (string): AWS access key associated with the account that the cluster is running on
            aws_secret_access_key (string): AWS secret key associated with the AWS access key
            operator_url (string): operator URL of your cluster
        """

        self.operator_url = operator_url
        self.workspace = str(Path.home() / ".cortex" / "workspace")
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.headers = {
            "CortexAPIVersion": "master",
            "Authorization": "CortexAWS {}|{}".format(
                self.aws_access_key_id, self.aws_secret_access_key
            ),
        }

        pathlib.Path(self.workspace).mkdir(parents=True, exist_ok=True)

    def deploy(
        self,
        deployment_name,
        api_name,
        model_path,
        pre_inference=None,
        post_inference=None,
        model_format=None,
        tf_serving_key=None,
    ):
        """Deploy an API

        Args:
            deployment_name (string): deployment name
            api_name (string): API name
            model_path (string): S3 path to an exported model
            pre_inference (function, optional): function used to prepare requests for model input
            post_inference (function, optional): function used to prepare model output for response
            model_format (string, optional): model format, must be "tensorflow" or "onnx" (default: "onnx" if model path ends with .onnx, "tensorflow" if model path ends with .zip or is a directory)
            tf_serving_key (string, optional): name of the signature def to use for prediction (required if your model has more than one signature def)

        Returns:
            string: url to the deployed API
        """

        working_dir = os.path.join(self.workspace, deployment_name)
        api_working_dir = os.path.join(working_dir, api_name)
        pathlib.Path(api_working_dir).mkdir(parents=True, exist_ok=True)

        api_config = {"kind": "api", "model": model_path, "name": api_name}

        if tf_serving_key is not None:
            api_config["tf_serving_key"] = tf_serving_key

        if model_format is not None:
            api_config["model_format"] = model_format

        if pre_inference is not None or post_inference is not None:
            # snapshot the local environment so the cluster can install matching packages
            reqs = subprocess.check_output([sys.executable, "-m", "pip", "freeze"])

            with open(os.path.join(api_working_dir, "requirements.txt"), "w") as f:
                f.writelines(reqs.decode())

            handlers = {}

            if pre_inference is not None:
                handlers["pre_inference"] = pre_inference

            if post_inference is not None:
                handlers["post_inference"] = post_inference

            # serialize the handler functions (including their closures) with dill
            with open(os.path.join(api_working_dir, "request_handler.pickle"), "wb") as f:
                dill.dump(handlers, f, recurse=True)

            api_config["request_handler"] = "request_handler.pickle"

        cortex_config = [{"kind": "deployment", "name": deployment_name}, api_config]

        cortex_yaml_path = os.path.join(working_dir, "cortex.yaml")
        with open(cortex_yaml_path, "w") as f:
            f.write(yaml.dump(cortex_config))

        # zip the API working directory into the project archive uploaded to the operator
        project_zip_path = os.path.join(working_dir, "project")
        shutil.make_archive(project_zip_path, "zip", api_working_dir)
        project_zip_path += ".zip"

        queries = {"force": "false", "ignoreCache": "false"}

        with open(cortex_yaml_path, "rb") as config, open(project_zip_path, "rb") as project:
            files = {"cortex.yaml": config, "project.zip": project}
            try:
                resp = requests.post(
                    urllib.parse.urljoin(self.operator_url, "deploy"),
                    params=queries,
                    files=files,
                    headers=self.headers,
                    verify=False,
                )
                resp.raise_for_status()
                resources = resp.json()
            except HTTPError as err:
                resp = err.response
                if "error" in resp.json():
                    raise Exception(resp.json()["error"]) from err
                raise

        # the operator returns the deployment context as base64-encoded msgpack
        b64_encoded_context = resources["context"]
        context_msgpack_bytestring = base64.b64decode(b64_encoded_context)
        ctx = msgpack.loads(context_msgpack_bytestring, raw=False)
        return urllib.parse.urljoin(resources["apis_base_url"], ctx["apis"][api_name]["path"])
```
26 changes: 26 additions & 0 deletions pkg/workloads/cortex/client/setup.py
@@ -0,0 +1,26 @@
```python
# Copyright 2019 Cortex Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from setuptools import setup, find_packages

setup(
    name="cortex",
    version="0.8.0",
    description="",
    author="Cortex Labs",
    author_email="[email protected]",
    install_requires=[
        "dill>=0.3.0",
        "requests>=2.20.0",
        "msgpack>=0.6.0",
        "pyyaml>=3.13",  # client.py imports yaml, so declare it for pip installs
    ],
    setup_requires=["setuptools"],
    packages=find_packages(),
)
```
22 changes: 18 additions & 4 deletions pkg/workloads/cortex/lib/context.py
```diff
@@ -15,8 +15,10 @@
 import os
 import imp
 import inspect
+
 import boto3
 import datadog
+import dill
 
 from cortex import consts
 from cortex.lib import util
@@ -108,10 +110,22 @@ def download_python_file(self, impl_key, module_name):
 
     def load_module(self, module_prefix, module_name, impl_path):
         full_module_name = "{}_{}".format(module_prefix, module_name)
-        try:
-            impl = imp.load_source(full_module_name, impl_path)
-        except Exception as e:
-            raise UserException("unable to load python file", str(e)) from e
+
+        if impl_path.endswith(".pickle"):
+            try:
+                impl = imp.new_module(full_module_name)
+
+                with open(impl_path, "rb") as pickle_file:
+                    pickled_dict = dill.load(pickle_file)
+                    for key in pickled_dict:
+                        setattr(impl, key, pickled_dict[key])
+            except Exception as e:
+                raise UserException("unable to load pickle", str(e)) from e
+        else:
+            try:
+                impl = imp.load_source(full_module_name, impl_path)
+            except Exception as e:
+                raise UserException("unable to load python file", str(e)) from e
 
         return impl
```
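
To make the pickle path concrete, here is a standalone sketch of the round trip: `client.py` dumps a dict of functions with dill, and `load_module` above rebuilds them as attributes of a fresh module. The function, module name, and temp path are illustrative.

```python
import imp  # deprecated in Python 3, but it's what context.py uses
import tempfile

import dill


def pre_inference(sample, metadata):
    return sample


path = tempfile.mktemp(suffix=".pickle")

# what client.py writes ...
with open(path, "wb") as f:
    dill.dump({"pre_inference": pre_inference}, f, recurse=True)

# ... and roughly what load_module reconstructs
impl = imp.new_module("request_handler_module")
with open(path, "rb") as f:
    for key, value in dill.load(f).items():
        setattr(impl, key, value)

print(impl.pre_inference({"feature_1": "a"}, metadata=None))
```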
1 change: 1 addition & 0 deletions pkg/workloads/cortex/lib/requirements.txt
```diff
@@ -4,3 +4,4 @@ numpy==1.17.2
 json_tricks==3.13.2
 requests==2.22.0
 datadog==0.30.0
+dill==0.3.0
```
9 changes: 2 additions & 7 deletions pkg/workloads/cortex/lib/util.py
```diff
@@ -23,6 +23,7 @@
 import zipfile
 import hashlib
 import msgpack
+import pathlib
 from copy import deepcopy
 from datetime import datetime
@@ -62,13 +63,7 @@ def snake_to_camel(input, sep="_", lower=True):
 
 
 def mkdir_p(dir_path):
-    try:
-        os.makedirs(dir_path)
-    except OSError as e:
-        if e.errno == errno.EEXIST and os.path.isdir(dir_path):
-            pass
-        else:
-            raise
+    pathlib.Path(dir_path).mkdir(parents=True, exist_ok=True)
 
 
 def rm_dir(dir_path):
```
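
The pathlib call behaves like `mkdir -p`, which is exactly what the removed try/except emulated: missing parents are created and an existing directory is not an error. A quick illustration, assuming a writable temp directory:

```python
import pathlib
import tempfile

target = pathlib.Path(tempfile.mkdtemp()) / "a" / "b"
target.mkdir(parents=True, exist_ok=True)
target.mkdir(parents=True, exist_ok=True)  # second call is a no-op, not an OSError
```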