# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
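"""Unit tests for the pytriton Model class.

Covers Triton model config generation, model repository generation, proxy backend
setup and cleanup, liveness checks, and TritonContext injection.
"""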
import pathlib
import tempfile

import numpy as np

from pytriton.decorators import TritonContext, batch
from pytriton.model_config.tensor import Tensor
from pytriton.model_config.triton_model_config import TensorSpec
from pytriton.models.manager import ModelManager
from pytriton.models.model import Model, ModelConfig
from pytriton.proxy.communication import TensorStore
from pytriton.proxy.types import Request
from pytriton.utils.workspace import Workspace
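

# When inputs/outputs are declared only with dtype and shape, the generated config should use
# auto-generated INPUT_*/OUTPUT_* names and derive the IPC socket path from the workspace.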
def test_get_model_config_return_model_config_when_minimal_required_data(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(dtype=np.float32, shape=(-1,)),
            Tensor(dtype=np.float32, shape=(-1,)),
        ],
        outputs=[
            Tensor(dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )

    model_config = model._get_triton_model_config()

    assert model_config.model_name == "simple"
    assert model_config.model_version == 2
    assert model_config.batching is True
    assert model_config.max_batch_size == 128
    assert model_config.inputs == [
        TensorSpec(name="INPUT_1", dtype=np.float32, shape=(-1,)),
        TensorSpec(name="INPUT_2", dtype=np.float32, shape=(-1,)),
    ]
    assert model_config.outputs == [
        TensorSpec(name="OUTPUT_1", dtype=np.int32, shape=(-1,)),
    ]

    ipc_socket_path = workspace.path / "ipc_proxy_backend_simple"
    assert model_config.backend_parameters == {
        "shared-memory-socket": f"ipc://{ipc_socket_path.as_posix()}",
    }
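

# Custom names passed via Tensor(name=...) should be preserved in the generated model config.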
def test_get_model_config_return_model_config_when_custom_names():
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    with tempfile.TemporaryDirectory() as tempdir:
        tempdir = pathlib.Path(tempdir)
        workspace = Workspace(tempdir / "workspace")
        model = Model(
            model_name="simple",
            model_version=2,
            inference_fn=infer_func,
            inputs=[
                Tensor(name="variable1", dtype=object, shape=(2, 1)),
                Tensor(name="variable2", dtype=np.float32().dtype, shape=(2, 1)),
            ],
            outputs=[
                Tensor(name="factorials", dtype=np.int32().dtype, shape=(-1,)),
            ],
            config=ModelConfig(max_batch_size=128, batching=True),
            workspace=workspace,
            triton_context=triton_context,
            strict=False,
        )

        model_config = model._get_triton_model_config()

        assert model_config.model_name == "simple"
        assert model_config.model_version == 2
        assert model_config.batching is True
        assert model_config.max_batch_size == 128
        assert model_config.inputs == [
            TensorSpec(name="variable1", dtype=object, shape=(2, 1)),
            TensorSpec(name="variable2", dtype=np.float32, shape=(2, 1)),
        ]
        assert model_config.outputs == [
            TensorSpec(name="factorials", dtype=np.int32, shape=(-1,)),
        ]
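

# generate_model() should create the model store layout:
# <repository>/<model_name>/config.pbtxt and <repository>/<model_name>/<version>/model.py.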
def test_generate_model_create_model_store():
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    with tempfile.TemporaryDirectory() as tempdir:
        tempdir = pathlib.Path(tempdir)
        workspace = Workspace(tempdir / "workspace")
        model = Model(
            model_name="simple",
            model_version=2,
            inference_fn=infer_func,
            inputs=[
                Tensor(name="variable1", dtype=object, shape=(2, 1)),
                Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
            ],
            outputs=[
                Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
            ],
            config=ModelConfig(max_batch_size=128, batching=True),
            workspace=workspace,
            triton_context=triton_context,
            strict=False,
        )

        with tempfile.TemporaryDirectory() as tempdir:
            model_repository = pathlib.Path(tempdir) / "model_repository"
            model_repository.mkdir()

            model.generate_model(model_repository)

            assert (model_repository / "simple").is_dir()
            assert (model_repository / "simple" / "config.pbtxt").is_file()
            assert (model_repository / "simple" / "2").is_dir()
            assert (model_repository / "simple" / "2" / "model.py").is_file()
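

# Two versions of the same model should share one config.pbtxt but get separate version directories.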
def test_generate_models_with_same_names_and_different_versions_create_model_store():
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    with tempfile.TemporaryDirectory() as tempdir:
        tempdir = pathlib.Path(tempdir)
        workspace = Workspace(tempdir / "workspace")
        model1 = Model(
            model_name="simple",
            model_version=1,
            inference_fn=infer_func,
            inputs=[
                Tensor(name="variable1", dtype=object, shape=(2, 1)),
                Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
            ],
            outputs=[
                Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
            ],
            config=ModelConfig(max_batch_size=128, batching=True),
            workspace=workspace,
            triton_context=triton_context,
            strict=False,
        )
        model2 = Model(
            model_name="simple",
            model_version=2,
            inference_fn=infer_func,
            inputs=[
                Tensor(name="variable1", dtype=object, shape=(2, 1)),
                Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
            ],
            outputs=[
                Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
            ],
            config=ModelConfig(max_batch_size=128, batching=True),
            workspace=workspace,
            triton_context=triton_context,
            strict=False,
        )

        with tempfile.TemporaryDirectory() as tempdir:
            model_repository = pathlib.Path(tempdir) / "model_repository"
            model_repository.mkdir()

            model1.generate_model(model_repository)
            model2.generate_model(model_repository)

            assert (model_repository / "simple").is_dir()
            assert (model_repository / "simple" / "config.pbtxt").is_file()
            assert (model_repository / "simple" / "1").is_dir()
            assert (model_repository / "simple" / "1" / "model.py").is_file()
            assert (model_repository / "simple" / "2").is_dir()
            assert (model_repository / "simple" / "2" / "model.py").is_file()
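

# setup() should spawn a single inference handler connected to the proxy backend.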
def test_setup_create_proxy_backend_connection(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable1", dtype=object, shape=(2, 1)),
            Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    try:
        tensor_store.start()
        model.setup()
        assert len(model._inference_handlers) == 1
    finally:
        model.clean()
        tensor_store.close()
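

# Calling setup() twice should be idempotent and reuse the same inference handler.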
def test_setup_can_be_called_multiple_times(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable1", dtype=object, shape=(2, 1)),
            Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    try:
        tensor_store.start()
        model.setup()

        assert len(model._inference_handlers) == 1
        python_backend1 = model._inference_handlers[0]
        assert python_backend1 is not None

        model.setup()

        assert len(model._inference_handlers) == 1
        python_backend2 = model._inference_handlers[0]
        assert python_backend2 is not None
        assert python_backend1 == python_backend2
    finally:
        model.clean()
        tensor_store.close()
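

# clean() should tear down the proxy backend connection and drop all inference handlers.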
def test_clean_remove_proxy_backend_connection(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable1", dtype=object, shape=(2, 1)),
            Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    try:
        tensor_store.start()
        model.setup()
    finally:
        model.clean()
        tensor_store.close()

    assert len(model._inference_handlers) == 0
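

# clean() should be safe to call multiple times.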
def test_clean_can_be_called_multiple_times(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable1", dtype=object, shape=(2, 1)),
            Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    try:
        tensor_store.start()
        model.setup()

        model.clean()
        model.clean()

        assert len(model._inference_handlers) == 0
    finally:
        tensor_store.close()
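

# is_alive() should be False before setup() is called.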
def test_is_alive_return_false_when_model_not_setup(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    with tempfile.TemporaryDirectory() as tempdir:
        tempdir = pathlib.Path(tempdir)
        workspace = Workspace(tempdir / "workspace")
        model = Model(
            model_name="simple",
            model_version=2,
            inference_fn=infer_func,
            inputs=[
                Tensor(name="variable1", dtype=object, shape=(2, 1)),
                Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
            ],
            outputs=[
                Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
            ],
            config=ModelConfig(max_batch_size=128, batching=True),
            workspace=workspace,
            triton_context=triton_context,
            strict=False,
        )

        assert not model.is_alive()
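

# is_alive() should be True once setup() has started the inference handlers.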
def test_is_alive_return_true_when_model_is_setup(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable1", dtype=object, shape=(2, 1)),
            Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    try:
        tensor_store.start()
        model.setup()

        assert model.is_alive()
        assert len(model._inference_handlers) == 1
    finally:
        model.clean()
        tensor_store.close()
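

# The TritonContext should be injected into bound methods and plain functions decorated with
# @batch, and each model's inputs should be mapped to its declared output names.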
def test_triton_context_injection(tmp_path):
    class Multimodel:
        @batch
        def infer1(self, variable1):
            return [variable1]

        @batch
        def infer2(self, variable2):
            return [variable2]

    m = Multimodel()

    @batch
    def infer_func(variable3):
        return [variable3]

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    tensor_store.start()

    model1 = Model(
        model_name="simple1",
        model_version=1,
        inference_fn=m.infer1,
        inputs=[
            Tensor(name="variable1", dtype=np.int32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="out1", dtype=np.int32, shape=(2, 1)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    model2 = Model(
        model_name="simple2",
        model_version=1,
        inference_fn=m.infer2,
        inputs=[
            Tensor(name="variable2", dtype=np.int32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="out2", dtype=np.int32, shape=(2, 1)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    model3 = Model(
        model_name="simple3",
        model_version=1,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable3", dtype=np.int32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="out3", dtype=np.int32, shape=(2, 1)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )

    manager = ModelManager("")
    try:
        manager.add_model(model1)
        model1.setup()
        manager.add_model(model2)
        model2.setup()
        manager.add_model(model3)
        model3.setup()

        input_requests1 = [Request({"variable1": np.array([[7, 5], [8, 6]])}, {})]
        input_requests2 = [Request({"variable2": np.array([[1, 2], [1, 2], [11, 12]])}, {})]
        input_requests3 = [Request({"variable3": np.array([[1, 2]])}, {})]

        def assert_inputs_properly_mapped_to_outputs(expected_out_name, outputs, input_request_arr):
            assert len(outputs) == 1
            assert expected_out_name in outputs[0]
            assert outputs[0][expected_out_name].shape == input_request_arr.shape
            assert np.array_equal(outputs[0][expected_out_name], input_request_arr)

        outputs1 = m.infer1(input_requests1)
        assert_inputs_properly_mapped_to_outputs("out1", outputs1, input_requests1[0]["variable1"])
        outputs2 = m.infer2(input_requests2)
        assert_inputs_properly_mapped_to_outputs("out2", outputs2, input_requests2[0]["variable2"])
        outputs3 = infer_func(input_requests3)
        assert_inputs_properly_mapped_to_outputs("out3", outputs3, input_requests3[0]["variable3"])

        outputs1 = m.infer1(input_requests1)
        assert_inputs_properly_mapped_to_outputs("out1", outputs1, input_requests1[0]["variable1"])
        outputs3 = infer_func(input_requests3)
        assert_inputs_properly_mapped_to_outputs("out3", outputs3, input_requests3[0]["variable3"])
    finally:
        manager.clean()
        tensor_store.close()