Spaces:
Running
on
Zero
Running
on
Zero
File size: 5,344 Bytes
7931de6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from typing import Union
import numpy as np
import wrapt
from google.protobuf import json_format # pytype: disable=pyi-error
from tritonclient.grpc import InferenceServerClient as SyncGrpcInferenceServerClient
from tritonclient.grpc import model_config_pb2, service_pb2
from tritonclient.http import InferenceServerClient as SyncHttpInferenceServerClient
from tritonclient.http.aio import InferenceServerClient as AsyncioHttpInferenceServerClient
from pytriton.model_config import DeviceKind
from pytriton.model_config.generator import ModelConfigGenerator
from pytriton.model_config.triton_model_config import TensorSpec, TritonModelConfig
_LOGGER = logging.getLogger(__name__)

# Model config fixture for an "AddSub" model with batching enabled (max_batch_size=16).
# It declares two float32 inputs ("a", "b") and two float32 outputs ("add", "sub"),
# all sharing the same (-1, 1) shape.
ADD_SUB_WITH_BATCHING_MODEL_CONFIG = TritonModelConfig(
    model_name="AddSub",
    model_version=1,
    max_batch_size=16,
    instance_group={DeviceKind.KIND_CPU: 1},
    inputs=[TensorSpec(name=tensor_name, shape=(-1, 1), dtype=np.float32) for tensor_name in ("a", "b")],
    outputs=[TensorSpec(name=tensor_name, shape=(-1, 1), dtype=np.float32) for tensor_name in ("add", "sub")],
    # Placeholder socket path; the value is literally a dummy.
    backend_parameters={"shared-memory-socket": "dummy/path"},
)
# Model config fixture for an "AddSub" model with batching disabled (batching=False).
# It declares two float32 inputs ("a", "b") and two float32 outputs ("add", "sub"),
# all sharing the same (1,) shape.
ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG = TritonModelConfig(
    model_name="AddSub",
    model_version=1,
    batching=False,
    instance_group={DeviceKind.KIND_CPU: 1},
    inputs=[TensorSpec(name=tensor_name, shape=(1,), dtype=np.float32) for tensor_name in ("a", "b")],
    outputs=[TensorSpec(name=tensor_name, shape=(1,), dtype=np.float32) for tensor_name in ("add", "sub")],
    # Placeholder socket path; the value is literally a dummy.
    backend_parameters={"shared-memory-socket": "dummy/path"},
)
# Server URLs used by the client tests.
GRPC_LOCALHOST_URL = "grpc://localhost:8001"
HTTP_LOCALHOST_URL_NO_SCHEME = "localhost:8000"
HTTP_LOCALHOST_URL = "http://" + HTTP_LOCALHOST_URL_NO_SCHEME

# Baseline keyword arguments keyed on the batching AddSub model's name.
# NOTE(review): presumably these are the kwargs tests expect to be forwarded to the
# underlying client's infer call - confirm against the usages.
EXPECTED_KWARGS_DEFAULT = {
    "model_name": ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name,
    "model_version": "",
    "request_id": "0",
    "parameters": None,
    "headers": None,
}

# Type aliases for the client classes accepted by the patch helpers.
_HttpTritonClientType = Union[AsyncioHttpInferenceServerClient, SyncHttpInferenceServerClient]
_GrpcTritonClientType = SyncGrpcInferenceServerClient
# Nested unions are flattened by typing, so this is the union of all three client classes.
_TritonClientType = Union[_HttpTritonClientType, _GrpcTritonClientType]
def patch_client__server_up_and_ready(
    mocker, base_triton_client: _TritonClientType, ready_server: bool = True, live_server: bool = True
):
    """Patch the server readiness and liveness checks on a Triton client.

    Args:
        mocker: object exposing ``patch.object`` (e.g. the pytest-mock fixture).
        base_triton_client: client whose ``is_server_ready`` and ``is_server_live``
            attributes are replaced with mocks.
        ready_server: value the patched ``is_server_ready`` returns.
        live_server: value the patched ``is_server_live`` returns.
    """
    ready_method_name = base_triton_client.is_server_ready.__name__
    mocker.patch.object(base_triton_client, ready_method_name, return_value=ready_server)

    live_method_name = base_triton_client.is_server_live.__name__
    mocker.patch.object(base_triton_client, live_method_name, return_value=live_server)
def patch_http_client__model_up_and_ready(
    mocker,
    model_config: TritonModelConfig,
    base_triton_client: _HttpTritonClientType,
    ready: bool = True,
):
    """Patch model readiness and model config retrieval on an HTTP Triton client.

    Args:
        mocker: object exposing ``patch.object`` (e.g. the pytest-mock fixture).
        model_config: config turned into a dict via ``ModelConfigGenerator`` and
            returned by the patched ``get_model_config``.
        base_triton_client: client whose ``is_model_ready`` and ``get_model_config``
            attributes are replaced with mocks.
        ready: value the patched ``is_model_ready`` returns.
    """
    mocker.patch.object(base_triton_client, base_triton_client.is_model_ready.__name__, return_value=ready)

    # Build the config dict before patching get_model_config, as in the original ordering.
    generated_config = ModelConfigGenerator(model_config).get_config()
    mocker.patch.object(
        base_triton_client,
        base_triton_client.get_model_config.__name__,
        return_value=generated_config,
    )
def patch_grpc_client__model_up_and_ready(
    mocker,
    model_config: TritonModelConfig,
    base_triton_client: _GrpcTritonClientType,
    ready: bool = True,
):
    """Patch model readiness and model config retrieval on a gRPC Triton client.

    The patched ``is_model_ready`` only reports ready for the model described by
    ``model_config``: the name must match and the version must be either the empty
    string or the stringified configured version. The patched ``get_model_config``
    returns the config wrapped in a ``ModelConfigResponse`` and converted to a plain
    dict through its JSON representation (original proto field names preserved).

    Args:
        mocker: object exposing ``patch.object`` (e.g. the pytest-mock fixture).
        model_config: config describing the model that should appear loaded.
        base_triton_client: client whose ``is_model_ready`` and ``get_model_config``
            attributes are replaced with mocks.
        ready: when falsy, the patched ``is_model_ready`` returns it unchanged for
            every model.
    """

    def new_is_model_ready(model_name, model_version="", headers=None, parameters=None):
        if not ready:
            return ready
        if model_name != model_config.model_name:
            return False
        return model_version in ("", str(model_config.model_version))

    mocker.patch.object(base_triton_client, base_triton_client.is_model_ready.__name__, side_effect=new_is_model_ready)

    # dict -> ModelConfig proto -> ModelConfigResponse -> JSON text -> dict
    config_proto = json_format.ParseDict(
        ModelConfigGenerator(model_config).get_config(),
        model_config_pb2.ModelConfig(),
    )
    response_json = json_format.MessageToJson(
        service_pb2.ModelConfigResponse(config=config_proto),
        preserving_proto_field_name=True,
    )
    response_as_dict = json.loads(response_json)

    mocker.patch.object(
        base_triton_client,
        base_triton_client.get_model_config.__name__,
        return_value=response_as_dict,
    )
@wrapt.decorator
def patch_server_model_addsub_no_batch_ready(wrapped, _instance, _args, kwargs):
    """Decorator that patches the sync gRPC client so the AddSub model looks ready.

    The ``mocker`` object is read from the keyword arguments of the decorated
    call. The server and model patches are applied to ``SyncGrpcInferenceServerClient``,
    and the wrapped callable is then invoked with ``mocker`` as its only
    (positional) argument. Any other arguments are dropped.

    NOTE(review): despite "no_batch" in the name, the patched model config is
    ADD_SUB_WITH_BATCHING_MODEL_CONFIG - confirm whether this is intentional.
    """
    grpc_client_cls = SyncGrpcInferenceServerClient
    mocker = kwargs["mocker"]

    patch_client__server_up_and_ready(mocker, grpc_client_cls)
    patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG, grpc_client_cls)

    return wrapped(mocker)
|