# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
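"""Unit tests for the synchronous ModelClient over HTTP and gRPC: URL validation, lazy and non-lazy initialization, model configuration retrieval, infer_sample/infer_batch calls, timeouts, and behavior after close."""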
import gc
import logging
import threading
import time
import numpy as np
import pytest
import tritonclient.grpc
import tritonclient.http
from pytriton.client import ModelClient
from pytriton.client.exceptions import (
PyTritonClientClosedError,
PyTritonClientInvalidUrlError,
PyTritonClientModelDoesntSupportBatchingError,
PyTritonClientTimeoutError,
PyTritonClientValueError,
)
from pytriton.client.utils import _DEFAULT_NETWORK_TIMEOUT_S
from pytriton.model_config import DeviceKind
from pytriton.model_config.triton_model_config import TensorSpec, TritonModelConfig
from .utils import (
extract_array_from_grpc_infer_input,
extract_array_from_http_infer_input,
patch_grpc_client__model_up_and_ready,
patch_grpc_client__server_up_and_ready,
patch_http_client__model_up_and_ready,
patch_http_client__server_up_and_ready,
verify_equalness_of_dicts_with_ndarray,
wrap_to_grpc_infer_result,
wrap_to_http_infer_result,
)
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger("test_sync_client")
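# Triton model configurations for an AddSub model (with and without batching) reused by the tests below.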
ADD_SUB_WITH_BATCHING_MODEL_CONFIG = TritonModelConfig(
model_name="AddSub",
model_version=1,
max_batch_size=16,
instance_group={DeviceKind.KIND_CPU: 1},
inputs=[
TensorSpec(name="a", shape=(-1, 1), dtype=np.float32),
TensorSpec(name="b", shape=(-1, 1), dtype=np.float32),
],
outputs=[
TensorSpec(name="add", shape=(-1, 1), dtype=np.float32),
TensorSpec(name="sub", shape=(-1, 1), dtype=np.float32),
],
backend_parameters={"shared-memory-socket": "dummy/path"},
)
ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG = TritonModelConfig(
model_name="AddSub",
model_version=1,
batching=False,
instance_group={DeviceKind.KIND_CPU: 1},
inputs=[
TensorSpec(name="a", shape=(1,), dtype=np.float32),
TensorSpec(name="b", shape=(1,), dtype=np.float32),
],
outputs=[
TensorSpec(name="add", shape=(1,), dtype=np.float32),
TensorSpec(name="sub", shape=(1,), dtype=np.float32),
],
backend_parameters={"shared-memory-socket": "dummy/path"},
)
_GRPC_LOCALHOST_URL = "grpc://localhost:8001"
_HTTP_LOCALHOST_URL = "http://localhost:8000"
EXPECTED_KWARGS_HTTP_DEFAULT = {
"model_name": ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name,
"model_version": "",
"request_id": "0",
"parameters": None,
"headers": None,
} # For the HTTP sync client, the network timeout is passed to the client __init__ and applied to all network requests
EXPECTED_KWARGS_GRPC_DEFAULT = {
**EXPECTED_KWARGS_HTTP_DEFAULT,
"client_timeout": 60.0, # For the gRPC sync client, the timeout must be passed explicitly with every request
}
def test_sync_client_not_raise_error_when_valid_url():
ModelClient("localhost", "dummy")
ModelClient("localhost:8000", "dummy")
ModelClient("http://localhost", "dummy")
ModelClient("http://localhost:8000", "dummy")
ModelClient("grpc://localhost", "dummy")
ModelClient("grpc://localhost:8001", "dummy")
def test_sync_client_init_raises_error_when_invalid_url_provided():
with pytest.raises(PyTritonClientInvalidUrlError, match="Invalid url"):
ModelClient(["localhost:8001"], "dummy") # pytype: disable=wrong-arg-types
with pytest.raises(PyTritonClientInvalidUrlError, match="Invalid url"):
ModelClient("https://localhost:8000", "dummy")
with pytest.raises(PyTritonClientInvalidUrlError, match="Invalid url"):
ModelClient("invalid_scheme://localhost", "dummy")
with pytest.raises(PyTritonClientInvalidUrlError, match="Invalid url"):
ModelClient("http://localhost:foo", "dummy")
def test_sync_grpc_client_init_raises_error_when_use_non_lazy_init_on_non_responding_server():
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for (.*) to be ready timed out."):
ModelClient("dummy:43299", "dummy", lazy_init=False, init_timeout_s=1)
def test_sync_grpc_client_init_raises_error_when_requested_unavailable_model_and_non_lazy_init_called(mocker):
from tritonclient.grpc import service_pb2
patch_grpc_client__server_up_and_ready(mocker)
mock_get_repo_index = mocker.patch.object(tritonclient.grpc.InferenceServerClient, "get_model_repository_index")
mock_get_repo_index.return_value = service_pb2.RepositoryIndexResponse(
models=[
service_pb2.RepositoryIndexResponse.ModelIndex(name="OtherName", version="1", state="READY", reason=""),
]
)
mocker.patch.object(tritonclient.grpc.InferenceServerClient, "is_model_ready").return_value = False
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
ModelClient(_GRPC_LOCALHOST_URL, "NotExistentModel", lazy_init=False, init_timeout_s=1.5)
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
ModelClient(_GRPC_LOCALHOST_URL, "OtherName", "2", lazy_init=False, init_timeout_s=1.5)
def test_sync_grpc_client_init_obtain_expected_model_config_when_lazy_init_is_disabled(mocker):
patch_grpc_client__server_up_and_ready(mocker)
patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG)
spy_client_init = mocker.spy(tritonclient.grpc.InferenceServerClient, "__init__")
spy_get_model_config = mocker.spy(tritonclient.grpc.InferenceServerClient, "get_model_config")
client = ModelClient("grpc://localhost:8001", ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name, lazy_init=False)
first_call = spy_client_init.mock_calls[0]
assert first_call.args == (client._general_client, "localhost:8001")
assert first_call.kwargs == {}
second_call = spy_client_init.mock_calls[1]
assert second_call.args == (client._infer_client, "localhost:8001")
assert second_call.kwargs == {}
# assert [(call.args, call.kwargs) for call in spy_client_init.mock_calls] == [
# (
# (
# client._general_client,
# "localhost:8001",
# ),
# {},
# ),
# (
# (
# client._infer_client,
# "localhost:8001",
# ),
# {},
# ),
# ]
spy_get_model_config.assert_called_once_with(
ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name,
"",
as_json=True,
# FIXME: GRPC client get_model_config doesn't support client_timeout parameter
# client_timeout=60.0,
)
assert client.model_config == ADD_SUB_WITH_BATCHING_MODEL_CONFIG
def test_sync_grpc_client_model_config_raises_error_when_requested_unavailable_model(mocker):
from tritonclient.grpc import service_pb2
patch_grpc_client__server_up_and_ready(mocker)
mock_get_repo_index = mocker.patch.object(tritonclient.grpc.InferenceServerClient, "get_model_repository_index")
mock_get_repo_index.return_value = service_pb2.RepositoryIndexResponse(
models=[
service_pb2.RepositoryIndexResponse.ModelIndex(name="OtherName", version="1", state="READY", reason=""),
]
)
mocker.patch.object(tritonclient.grpc.InferenceServerClient, "is_model_ready").return_value = False
with ModelClient(_GRPC_LOCALHOST_URL, "NonExistentModel", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.model_config
with ModelClient(_GRPC_LOCALHOST_URL, "OtherName", "2", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.model_config
def test_sync_grpc_client_infer_raises_error_when_requested_unavailable_model(mocker):
from tritonclient.grpc import service_pb2
patch_grpc_client__server_up_and_ready(mocker)
mock_get_repo_index = mocker.patch.object(tritonclient.grpc.InferenceServerClient, "get_model_repository_index")
mock_get_repo_index.return_value = service_pb2.RepositoryIndexResponse(
models=[
service_pb2.RepositoryIndexResponse.ModelIndex(name="OtherName", version="1", state="READY", reason=""),
]
)
mocker.patch.object(tritonclient.grpc.InferenceServerClient, "is_model_ready").return_value = False
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
with ModelClient(_GRPC_LOCALHOST_URL, "NonExistentModel", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.infer_sample(a, b)
with ModelClient(_GRPC_LOCALHOST_URL, "NonExistentModel", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.infer_batch(a, b)
with ModelClient(_GRPC_LOCALHOST_URL, "OtherName", "2", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.infer_sample(a, b)
with ModelClient(_GRPC_LOCALHOST_URL, "OtherName", "2", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.infer_batch(a, b)
def test_sync_grpc_client_infer_sample_returns_expected_result_when_positional_args_are_used(mocker):
patch_grpc_client__server_up_and_ready(mocker)
patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
server_result = expected_result
with ModelClient(_GRPC_LOCALHOST_URL, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name) as client:
mock_infer = mocker.patch.object(client._infer_client, "infer")
mock_infer.return_value = wrap_to_grpc_infer_result(ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG, "0", server_result)
result = client.infer_sample(a, b)
called_kwargs = mock_infer.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_GRPC_DEFAULT)
expected_kwargs.update(
{
"model_name": ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name,
"model_version": "",
"request_id": "0",
"inputs": {"a": a, "b": b},
"outputs": list(expected_result),
"parameters": None,
"headers": None,
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_grpc_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result)
def test_sync_grpc_client_infer_sample_returns_expected_result_when_infer_on_model_with_batching(mocker):
patch_grpc_client__server_up_and_ready(mocker)
patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG)
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
# server will return data with additional axis
server_result = {name: data[np.newaxis, ...] for name, data in expected_result.items()}
with ModelClient(_GRPC_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
mock_infer = mocker.patch.object(client._infer_client, "infer")
mock_infer.return_value = wrap_to_grpc_infer_result(ADD_SUB_WITH_BATCHING_MODEL_CONFIG, "0", server_result)
inputs_dict = {"a": a, "b": b}
result = client.infer_sample(**inputs_dict)
called_kwargs = mock_infer.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_GRPC_DEFAULT)
expected_kwargs.update(
{
"model_name": ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name,
# expect to send data with additional batch axis
"inputs": {name: data[np.newaxis, ...] for name, data in inputs_dict.items()},
"outputs": list(expected_result),
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_grpc_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result)
def test_sync_grpc_client_infer_sample_returns_expected_result_when_named_args_are_used(mocker):
patch_grpc_client__server_up_and_ready(mocker)
patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
server_result = {"add": a + b, "sub": a - b}
with ModelClient(_GRPC_LOCALHOST_URL, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name) as client:
mock_infer = mocker.patch.object(client._infer_client, "infer")
mock_infer.return_value = wrap_to_grpc_infer_result(ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG, "0", server_result)
inputs_dict = {"a": a, "b": b}
result = client.infer_sample(**inputs_dict)
called_kwargs = mock_infer.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_GRPC_DEFAULT)
expected_kwargs.update(
{
"model_name": ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name,
"inputs": inputs_dict,
"outputs": list(expected_result),
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_grpc_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result)
def test_sync_grpc_client_infer_batch_returns_expected_result_when_positional_args_are_used(mocker):
patch_grpc_client__server_up_and_ready(mocker)
patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG)
a = np.array([[1], [1]], dtype=np.float32)
b = np.array([[1], [1]], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
server_result = expected_result
with ModelClient(_GRPC_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
mock_infer = mocker.patch.object(client._infer_client, "infer")
mock_infer.return_value = wrap_to_grpc_infer_result(ADD_SUB_WITH_BATCHING_MODEL_CONFIG, "0", server_result)
result = client.infer_batch(a, b)
called_kwargs = mock_infer.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_GRPC_DEFAULT)
expected_kwargs.update(
{
"inputs": {"a": a, "b": b},
"outputs": list(expected_result),
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_grpc_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result)
def test_sync_grpc_client_infer_batch_returns_expected_result_when_named_args_are_used(mocker):
patch_grpc_client__server_up_and_ready(mocker)
patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG)
a = np.array([[1], [1]], dtype=np.float32)
b = np.array([[1], [1]], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
server_result = expected_result
with ModelClient(_GRPC_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
mock_infer = mocker.patch.object(client._infer_client, "infer")
mock_infer.return_value = wrap_to_grpc_infer_result(ADD_SUB_WITH_BATCHING_MODEL_CONFIG, "0", server_result)
inputs_dict = {"a": a, "b": b}
result = client.infer_batch(**inputs_dict)
called_kwargs = mock_infer.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_GRPC_DEFAULT)
expected_kwargs.update(
{
"inputs": inputs_dict,
"outputs": list(expected_result),
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_grpc_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result)
def test_sync_grpc_client_infer_batch_raises_error_when_model_doesnt_support_batching(mocker):
patch_grpc_client__server_up_and_ready(mocker)
patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
with ModelClient(_GRPC_LOCALHOST_URL, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name) as client:
with pytest.raises(PyTritonClientModelDoesntSupportBatchingError):
client.infer_batch(a=a, b=b)
def test_sync_grpc_client_infer_raises_error_when_mixed_args_convention_used(mocker):
patch_grpc_client__server_up_and_ready(mocker)
patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
with ModelClient(_GRPC_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
with pytest.raises(
PyTritonClientValueError,
match="Use either positional either keyword method arguments convention",
):
client.infer_sample(a, b=b)
with ModelClient(_GRPC_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
with pytest.raises(
PyTritonClientValueError,
match="Use either positional either keyword method arguments convention",
):
client.infer_batch(a, b=b)
def test_sync_grpc_client_infer_raises_error_when_no_args_provided(mocker):
patch_grpc_client__server_up_and_ready(mocker)
patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
with ModelClient(_GRPC_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
with pytest.raises(PyTritonClientValueError, match="Provide input data"):
client.infer_sample()
with ModelClient(_GRPC_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
with pytest.raises(PyTritonClientValueError, match="Provide input data"):
client.infer_batch()
def test_sync_http_client_init_obtain_expected_model_config_when_lazy_init_is_disabled(mocker):
from pytriton.client.client import DEFAULT_INFERENCE_TIMEOUT_S
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG)
spy_client_init = mocker.spy(tritonclient.http.InferenceServerClient, "__init__")
client = ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name, lazy_init=False)
first_call = spy_client_init.mock_calls[0]
assert first_call.args == (client._general_client, "localhost:8000")
assert first_call.kwargs == {
"connection_timeout": _DEFAULT_NETWORK_TIMEOUT_S,
"network_timeout": _DEFAULT_NETWORK_TIMEOUT_S,
}
second_call = spy_client_init.mock_calls[1]
assert second_call.args == (client._infer_client, "localhost:8000")
assert second_call.kwargs == {
"connection_timeout": DEFAULT_INFERENCE_TIMEOUT_S,
"network_timeout": DEFAULT_INFERENCE_TIMEOUT_S,
}
# assert [(call.args, call.kwargs) for call in spy_client_init.mock_calls] == [
# (
# (client._general_client, "localhost:8000"),
# {"connection_timeout": _DEFAULT_NETWORK_TIMEOUT_S, "network_timeout": _DEFAULT_NETWORK_TIMEOUT_S},
# ),
# (
# (client._infer_client, "localhost:8000"),
# {"connection_timeout": DEFAULT_INFERENCE_TIMEOUT_S, "network_timeout": DEFAULT_INFERENCE_TIMEOUT_S},
# ),
# ]
assert client.model_config == ADD_SUB_WITH_BATCHING_MODEL_CONFIG
def test_sync_http_client_init_raises_error_when_use_non_lazy_init_on_non_responding_server():
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for (.*) to be ready timed out."):
ModelClient("http://dummy:43299", "dummy", lazy_init=False, init_timeout_s=1)
def test_sync_http_client_init_raises_error_when_requested_unavailable_model_and_non_lazy_init_called(mocker):
patch_http_client__server_up_and_ready(mocker)
mock_get_repo_index = mocker.patch.object(tritonclient.http.InferenceServerClient, "get_model_repository_index")
mock_get_repo_index.return_value = [{"name": "OtherName", "version": "1", "state": "READY", "reason": ""}]
mocker.patch.object(tritonclient.http.InferenceServerClient, "is_model_ready").return_value = False
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
ModelClient(_HTTP_LOCALHOST_URL, "NotExistentModel", lazy_init=False, init_timeout_s=1.5)
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
ModelClient(_HTTP_LOCALHOST_URL, "OtherName", "2", lazy_init=False, init_timeout_s=1.5)
def test_sync_http_client_model_config_raises_error_when_requested_unavailable_model(mocker):
patch_http_client__server_up_and_ready(mocker)
mock_get_repo_index = mocker.patch.object(tritonclient.http.InferenceServerClient, "get_model_repository_index")
mock_get_repo_index.return_value = [{"name": "OtherName", "version": "1", "state": "READY", "reason": ""}]
mocker.patch.object(tritonclient.http.InferenceServerClient, "is_model_ready").return_value = False
with ModelClient(_HTTP_LOCALHOST_URL, "NonExistentModel", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.model_config
with ModelClient(_HTTP_LOCALHOST_URL, "OtherName", "2", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.model_config
def test_sync_http_client_infer_raises_error_when_requested_unavailable_model(mocker):
patch_http_client__server_up_and_ready(mocker)
mock_get_repo_index = mocker.patch.object(tritonclient.http.InferenceServerClient, "get_model_repository_index")
mock_get_repo_index.return_value = [{"name": "OtherName", "version": "1", "state": "READY", "reason": ""}]
mocker.patch.object(tritonclient.http.InferenceServerClient, "is_model_ready").return_value = False
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
with ModelClient(_HTTP_LOCALHOST_URL, "NonExistentModel", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.infer_sample(a, b)
with ModelClient(_HTTP_LOCALHOST_URL, "NonExistentModel", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.infer_batch(a, b)
with ModelClient(_HTTP_LOCALHOST_URL, "OtherName", "2", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.infer_sample(a, b)
with ModelClient(_HTTP_LOCALHOST_URL, "OtherName", "2", init_timeout_s=1.5) as client:
with pytest.raises(PyTritonClientTimeoutError, match="Waiting for model (.*) to be ready timed out."):
_ = client.infer_batch(a, b)
def test_sync_http_client_infer_sample_returns_expected_result_when_infer_on_model_with_batching(mocker):
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG)
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
# server will return data with additional axis
server_result = {name: data[np.newaxis, ...] for name, data in expected_result.items()}
with ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
mock_infer = mocker.patch.object(client._infer_client, "infer")
mock_infer.return_value = wrap_to_http_infer_result(ADD_SUB_WITH_BATCHING_MODEL_CONFIG, "0", server_result)
result = client.infer_sample(a, b)
called_kwargs = mock_infer.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_HTTP_DEFAULT)
expected_kwargs.update(
{
# expect to send data with additional batch axis
"inputs": {"a": a[np.newaxis, ...], "b": b[np.newaxis, ...]},
"outputs": list(expected_result),
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_http_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result)
def test_sync_http_client_infer_sample_returns_expected_result_when_positional_args_are_used(mocker):
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
server_result = expected_result
with ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name) as client:
mock_infer = mocker.patch.object(client._infer_client, "infer")
mock_infer.return_value = wrap_to_http_infer_result(ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG, "0", server_result)
result = client.infer_sample(a, b)
called_kwargs = mock_infer.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_HTTP_DEFAULT)
expected_kwargs.update(
{
"model_name": ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name,
"inputs": {"a": a, "b": b},
"outputs": list(expected_result),
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_http_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result)
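# Parametrized fixture: whether an inference was already run on the source client before a new client is created from it.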
@pytest.fixture(params=["after_infer", "no_infer"])
def infer_state(request):
return request.param
def test_sync_http_client_infer_sample_from_existing_client(mocker, infer_state):
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
server_result = expected_result
with ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name) as client:
mock_infer = mocker.patch.object(client._infer_client, "infer")
mock_infer.return_value = wrap_to_http_infer_result(ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG, "0", server_result)
if infer_state == "after_infer":
client.infer_sample(a, b)
# Spy readiness and model-config calls; a client created from an already-initialized client should not repeat them
spy_get_model_config = mocker.spy(tritonclient.http.InferenceServerClient, "get_model_config")
spy_is_server_ready = mocker.spy(tritonclient.http.InferenceServerClient, "is_server_ready")
spy_is_server_live = mocker.spy(tritonclient.http.InferenceServerClient, "is_server_live")
with ModelClient.from_existing_client(client) as client_from_existing:
mock_infer_from_existing = mocker.patch.object(client_from_existing._infer_client, "infer")
mock_infer_from_existing.return_value = mock_infer.return_value
result_from_existing = client_from_existing.infer_sample(a, b)
if infer_state == "after_infer":
spy_get_model_config.assert_not_called()
spy_is_server_ready.assert_not_called()
spy_is_server_live.assert_not_called()
else:
assert len(spy_get_model_config.mock_calls) == 2
assert len(spy_is_server_ready.mock_calls) == 3
assert len(spy_is_server_live.mock_calls) == 3
called_kwargs = mock_infer_from_existing.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_HTTP_DEFAULT)
expected_kwargs.update(
{
"model_name": ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name,
"inputs": {"a": a, "b": b},
"outputs": list(expected_result),
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_http_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result_from_existing)
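# Parametrized fixture: whether a client created with an explicit model_config should still verify that the model is ready.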
@pytest.fixture(params=["ensure_model_is_ready=True", "ensure_model_is_ready=False"])
def ensure_model_is_ready(request):
return request.param
def test_sync_http_client_infer_batch_init_from_client(mocker, ensure_model_is_ready):
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
ensure_model_is_ready = ensure_model_is_ready == "ensure_model_is_ready=True"
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
server_result = expected_result
# When model_config is passed explicitly, get_model_config should never be called; readiness checks depend on ensure_model_is_ready
spy_get_model_config = mocker.spy(tritonclient.http.InferenceServerClient, "get_model_config")
spy_is_server_ready = mocker.spy(tritonclient.http.InferenceServerClient, "is_server_ready")
spy_is_server_live = mocker.spy(tritonclient.http.InferenceServerClient, "is_server_live")
with ModelClient(
url=_HTTP_LOCALHOST_URL,
model_name=ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name,
model_config=ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG,
ensure_model_is_ready=ensure_model_is_ready,
) as client_from_existing:
mock_infer_from_existing = mocker.patch.object(client_from_existing._infer_client, "infer")
mock_infer_from_existing.return_value = wrap_to_http_infer_result(
ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG, "0", server_result
)
result_from_existing = client_from_existing.infer_batch(a, b)
if ensure_model_is_ready:
spy_get_model_config.assert_not_called()
assert len(spy_is_server_ready.mock_calls) == 2
assert len(spy_is_server_live.mock_calls) == 2
else:
spy_get_model_config.assert_not_called()
spy_is_server_ready.assert_not_called()
spy_is_server_live.assert_not_called()
called_kwargs = mock_infer_from_existing.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_HTTP_DEFAULT)
expected_kwargs.update(
{
"model_name": ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name,
"inputs": {"a": a, "b": b},
"outputs": list(expected_result),
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_http_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result_from_existing)
def test_sync_http_client_infer_sample_returns_expected_result_when_named_args_are_used(mocker):
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
server_result = {"add": a + b, "sub": a - b}
with ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name) as client:
mock_infer = mocker.patch.object(client._infer_client, "infer")
mock_infer.return_value = wrap_to_http_infer_result(ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG, "0", server_result)
inputs_dict = {"a": a, "b": b}
result = client.infer_sample(**inputs_dict)
called_kwargs = mock_infer.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_HTTP_DEFAULT)
expected_kwargs.update(
{
"model_name": ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name,
"inputs": inputs_dict,
"outputs": list(expected_result),
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_http_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result)
def test_sync_http_client_infer_batch_returns_expected_result_when_positional_args_are_used(mocker):
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG)
a = np.array([[1], [1]], dtype=np.float32)
b = np.array([[1], [1]], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
server_result = expected_result
with ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
mock_infer = mocker.patch.object(client._infer_client, "infer")
mock_infer.return_value = wrap_to_http_infer_result(ADD_SUB_WITH_BATCHING_MODEL_CONFIG, "0", server_result)
result = client.infer_batch(a, b)
called_kwargs = mock_infer.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_HTTP_DEFAULT)
expected_kwargs.update(
{
"inputs": {"a": a, "b": b},
"outputs": list(expected_result),
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_http_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result)
def test_sync_http_client_infer_batch_returns_expected_result_when_named_args_are_used(mocker):
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG)
a = np.array([[1], [1]], dtype=np.float32)
b = np.array([[1], [1]], dtype=np.float32)
expected_result = {"add": a + b, "sub": a - b}
server_result = expected_result
with ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
mock_infer = mocker.patch.object(client._infer_client, "infer")
mock_infer.return_value = wrap_to_http_infer_result(ADD_SUB_WITH_BATCHING_MODEL_CONFIG, "0", server_result)
inputs_dict = {"a": a, "b": b}
result = client.infer_batch(**inputs_dict)
called_kwargs = mock_infer.call_args.kwargs
expected_kwargs = dict(EXPECTED_KWARGS_HTTP_DEFAULT)
expected_kwargs.update(
{
"inputs": inputs_dict,
"outputs": list(expected_result),
}
)
for arg_name, arg_value in expected_kwargs.items():
if arg_name not in ["inputs", "outputs"]: # inputs and outputs require manual verification
assert called_kwargs.get(arg_name) == arg_value
for key in called_kwargs:
assert key in expected_kwargs
assert [output.name() for output in called_kwargs.get("outputs")] == list(expected_kwargs["outputs"])
inputs_called_arg = {i.name(): extract_array_from_http_infer_input(i) for i in called_kwargs.get("inputs")}
verify_equalness_of_dicts_with_ndarray(inputs_called_arg, expected_kwargs["inputs"])
verify_equalness_of_dicts_with_ndarray(expected_result, result)
def test_sync_http_client_infer_batch_raises_error_when_model_doesnt_support_batching(mocker):
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
with ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG.model_name) as client:
with pytest.raises(PyTritonClientModelDoesntSupportBatchingError):
client.infer_batch(a, b)
def test_sync_http_client_infer_raises_error_when_mixed_args_convention_used(mocker):
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
a = np.array([1], dtype=np.float32)
b = np.array([1], dtype=np.float32)
with ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
with pytest.raises(
PyTritonClientValueError,
match="Use either positional either keyword method arguments convention",
):
client.infer_sample(a, b=b)
with ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
with pytest.raises(
PyTritonClientValueError,
match="Use either positional either keyword method arguments convention",
):
client.infer_batch(a, b=b)
def test_sync_http_client_infer_raises_error_when_no_args_provided(mocker):
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG)
with ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
with pytest.raises(PyTritonClientValueError, match="Provide input data"):
client.infer_sample()
with ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name) as client:
with pytest.raises(PyTritonClientValueError, match="Provide input data"):
client.infer_batch()
@pytest.mark.filterwarnings("error::pytest.PytestUnraisableExceptionWarning")
def test_del_of_http_client_does_not_raise_error():
def _del(client):
del client._general_client
del client._infer_client
def _create_client_and_delete():
client = ModelClient(_HTTP_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name)
client.close()
threading.Thread(target=_del, args=(client,)).start()
_create_client_and_delete()
time.sleep(0.1)
gc.collect()
@pytest.mark.filterwarnings("error::pytest.PytestUnraisableExceptionWarning")
def test_del_of_grpc_client_does_not_raise_error():
def _del(client):
del client._general_client
del client._infer_client
def _create_client_and_delete():
client = ModelClient(_GRPC_LOCALHOST_URL, ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name)
client.close()
threading.Thread(target=_del, args=(client,)).start()
_create_client_and_delete()
time.sleep(0.1)
gc.collect()
@pytest.mark.timeout(1.0)
def test_init_http_passes_timeout():
with ModelClient("http://localhost:6669", "dummy", init_timeout_s=0.2, inference_timeout_s=0.1) as client:
with pytest.raises(PyTritonClientTimeoutError):
client.wait_for_model(timeout_s=0.2)
@pytest.mark.timeout(5)
def test_init_grpc_passes_timeout_5():
with ModelClient("grpc://localhost:6669", "dummy", init_timeout_s=0.2, inference_timeout_s=0.1) as client:
with pytest.raises(PyTritonClientTimeoutError):
client.wait_for_model(timeout_s=0.2)
def test_http_client_raises_error_when_used_after_close(mocker):
patch_http_client__server_up_and_ready(mocker)
patch_http_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG)
with ModelClient(_HTTP_LOCALHOST_URL, "dummy") as client:
pass
with pytest.raises(PyTritonClientClosedError):
client.wait_for_model(timeout_s=0.2)
a = np.array([1], dtype=np.float32)
with pytest.raises(PyTritonClientClosedError):
client.infer_sample(a=a)
with pytest.raises(PyTritonClientClosedError):
client.infer_batch(a=[a])
def test_grpc_client_raises_error_when_used_after_close(mocker):
patch_grpc_client__server_up_and_ready(mocker)
patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG)
with ModelClient(_GRPC_LOCALHOST_URL, "dummy") as client:
pass
with pytest.raises(PyTritonClientClosedError):
client.wait_for_model(timeout_s=0.2)
a = np.array([1], dtype=np.float32)
with pytest.raises(PyTritonClientClosedError):
client.infer_sample(a=a)
with pytest.raises(PyTritonClientClosedError):
client.infer_batch(a=[a])