File size: 5,344 Bytes
7931de6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from typing import Union

import numpy as np
import wrapt
from google.protobuf import json_format  # pytype: disable=pyi-error
from tritonclient.grpc import InferenceServerClient as SyncGrpcInferenceServerClient
from tritonclient.grpc import model_config_pb2, service_pb2
from tritonclient.http import InferenceServerClient as SyncHttpInferenceServerClient
from tritonclient.http.aio import InferenceServerClient as AsyncioHttpInferenceServerClient

from pytriton.model_config import DeviceKind
from pytriton.model_config.generator import ModelConfigGenerator
from pytriton.model_config.triton_model_config import TensorSpec, TritonModelConfig

_LOGGER = logging.getLogger(__name__)


ADD_SUB_WITH_BATCHING_MODEL_CONFIG = TritonModelConfig(
    model_name="AddSub",
    model_version=1,
    max_batch_size=16,
    instance_group={DeviceKind.KIND_CPU: 1},
    inputs=[
        TensorSpec(name="a", shape=(-1, 1), dtype=np.float32),
        TensorSpec(name="b", shape=(-1, 1), dtype=np.float32),
    ],
    outputs=[
        TensorSpec(name="add", shape=(-1, 1), dtype=np.float32),
        TensorSpec(name="sub", shape=(-1, 1), dtype=np.float32),
    ],
    backend_parameters={"shared-memory-socket": "dummy/path"},
)

ADD_SUB_WITHOUT_BATCHING_MODEL_CONFIG = TritonModelConfig(
    model_name="AddSub",
    model_version=1,
    batching=False,
    instance_group={DeviceKind.KIND_CPU: 1},
    inputs=[
        TensorSpec(name="a", shape=(1,), dtype=np.float32),
        TensorSpec(name="b", shape=(1,), dtype=np.float32),
    ],
    outputs=[
        TensorSpec(name="add", shape=(1,), dtype=np.float32),
        TensorSpec(name="sub", shape=(1,), dtype=np.float32),
    ],
    backend_parameters={"shared-memory-socket": "dummy/path"},
)

GRPC_LOCALHOST_URL = "grpc://localhost:8001"
HTTP_LOCALHOST_URL_NO_SCHEME = "localhost:8000"
HTTP_LOCALHOST_URL = f"http://{HTTP_LOCALHOST_URL_NO_SCHEME}"

EXPECTED_KWARGS_DEFAULT = {
    "model_name": ADD_SUB_WITH_BATCHING_MODEL_CONFIG.model_name,
    "model_version": "",
    "request_id": "0",
    "parameters": None,
    "headers": None,
}

_TritonClientType = Union[
    AsyncioHttpInferenceServerClient, SyncHttpInferenceServerClient, SyncGrpcInferenceServerClient
]
_HttpTritonClientType = Union[AsyncioHttpInferenceServerClient, SyncHttpInferenceServerClient]
_GrpcTritonClientType = SyncGrpcInferenceServerClient


def patch_client__server_up_and_ready(
    mocker, base_triton_client: _TritonClientType, ready_server: bool = True, live_server: bool = True
):
    mocker.patch.object(base_triton_client, base_triton_client.is_server_ready.__name__).return_value = ready_server
    mocker.patch.object(base_triton_client, base_triton_client.is_server_live.__name__).return_value = live_server


def patch_http_client__model_up_and_ready(
    mocker,
    model_config: TritonModelConfig,
    base_triton_client: _HttpTritonClientType,
    ready: bool = True,
):
    mocker.patch.object(base_triton_client, base_triton_client.is_model_ready.__name__).return_value = ready

    model_config_dict = ModelConfigGenerator(model_config).get_config()
    mock_get_model_config = mocker.patch.object(base_triton_client, base_triton_client.get_model_config.__name__)
    mock_get_model_config.return_value = model_config_dict


def patch_grpc_client__model_up_and_ready(
    mocker,
    model_config: TritonModelConfig,
    base_triton_client: _GrpcTritonClientType,
    ready: bool = True,
):
    def new_is_model_ready(model_name, model_version="", headers=None, parameters=None):
        return (
            ready
            and model_name == model_config.model_name
            and (model_version == "" or model_version == str(model_config.model_version))
        )

    mocker.patch.object(base_triton_client, base_triton_client.is_model_ready.__name__, side_effect=new_is_model_ready)

    model_config_dict = ModelConfigGenerator(model_config).get_config()
    model_config_protobuf = json_format.ParseDict(model_config_dict, model_config_pb2.ModelConfig())
    response = service_pb2.ModelConfigResponse(config=model_config_protobuf)
    response_dict = json.loads(json_format.MessageToJson(response, preserving_proto_field_name=True))
    mock_get_model_config = mocker.patch.object(base_triton_client, base_triton_client.get_model_config.__name__)
    mock_get_model_config.return_value = response_dict


@wrapt.decorator
def patch_server_model_addsub_no_batch_ready(wrapped, _instance, _args, kwargs):
    mocker = kwargs["mocker"]
    patch_client__server_up_and_ready(mocker, SyncGrpcInferenceServerClient)
    patch_grpc_client__model_up_and_ready(mocker, ADD_SUB_WITH_BATCHING_MODEL_CONFIG, SyncGrpcInferenceServerClient)
    return wrapped(mocker)