File size: 4,124 Bytes
2abfccb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
import functools
import logging
from functools import partial
from collections import defaultdict
from petrel_client.cache.cache import Cache
from petrel_client.common.io_profile import profile
from petrel_client.common import exception
from petrel_client.cache.mc.petrel_pymc import McClient
from petrel_client.common import hash
# Module-level logger; MC.__init__ sets its level from the 'debug_mc' option.
LOG = logging.getLogger(__name__)

# Status strings that wrap_io compares against the status returned by the
# wrapped I/O call.
_STATUS_SUCCESS = 'SUCCESS'
_STATUS_NOT_FOUND = 'NOT FOUND'

# Maximum accepted key size, and extra room subtracted (together with the key
# size) from the client's maximum item size when computing the object limit.
_MAX_KEY_SIZE = 250
_ITEM_SIZE_RESERVED = 128

# Failure status -> exception class to raise. Any status that is not listed
# resolves to exception.McClientError through the default factory.
_EXCEPTION_MAP = defaultdict(lambda: exception.McClientError)
_EXCEPTION_MAP.update({
    'A TIMEOUT OCCURRED': exception.McTimeoutOccur,
    'CONNECTION FAILURE': exception.McConnFailed,
    'FAILURE': exception.McServerDisable,
    'CLIENT ERROR': exception.McServerDisable,
    'SERVER ERROR': exception.McServerDisable,
    'ERROR was returned by server': exception.McServerDisable,
    'SYSTEM ERROR': exception.McServerFailed,
    'A KEY LENGTH OF ZERO WAS PROVIDED': exception.McBadKeyProvided,
    'A BAD KEY WAS PROVIDED/CHARACTERS OUT OF RANGE': exception.McBadKeyProvided,
    'SERVER IS MARKED DEAD': exception.McServerDead,
    'ITEM TOO BIG': exception.McObjectSizeExceed,
    'SERVER HAS FAILED AND IS DISABLED UNTIL TIMED RETRY': exception.McServerFailed,
})
def wrap_io(fn):
    """Decorate an I/O method so its status result becomes a value or an error.

    The decorated method is called as ``fn(self, key, *args, **kwargs)`` and
    must return a ``(value, status)`` pair. Before the call, the key is passed
    through ``self.mc_key_cb`` (when that attribute is truthy) and checked
    with ``self.check_key_size``.

    Returns:
        The ``value`` part of the pair when ``status`` is the success status.

    Raises:
        exception.McObjectNotFoundError: when ``status`` is the not-found
            status.
        The class mapped by ``_EXCEPTION_MAP`` for any other status,
        constructed with ``(key, status, server)``.
    """
    @functools.wraps(fn)
    def io_wrapper(self, key, *args, **kwargs):
        key_cb = self.mc_key_cb
        if key_cb:
            key = key_cb(key)
        self.check_key_size(key)

        value, status = fn(self, key, *args, **kwargs)
        if status == _STATUS_SUCCESS:
            return value
        if status == _STATUS_NOT_FOUND:
            raise exception.McObjectNotFoundError(key)

        # Any other status is a failure: report the server this key maps to.
        server, _ = self._mc.get_server(key)
        error_cls = _EXCEPTION_MAP[status]
        raise error_cls(key, status, server)

    return io_wrapper
class MC(Cache):
    """Cache that reads and writes objects through a ``McClient`` instance.

    ``get`` and ``put`` are wrapped by ``wrap_io``, which applies
    ``mc_key_cb`` to the key, validates the key size and turns the status
    reported by the client into a return value or an exception.
    """

    def __init__(self, conf, *args, **kwargs):
        """Set up the client, the size limits and the key callback.

        Args:
            conf: configuration object. The entries 'mc_server_list_path',
                'mc_client_config_path', 'debug_mc' and 'mc_key_cb' are read.
            *args, **kwargs: forwarded to the parent initializer. A truthy
                ``mc_key_cb`` keyword takes precedence over the value in
                ``conf``.

        Raises:
            Exception: if the resolved ``mc_key_cb`` is neither a string nor
                a callable.
        """
        server_list_path = conf['mc_server_list_path']
        client_config_path = conf['mc_client_config_path']

        # This adjusts the level of the shared module logger.
        LOG.setLevel(
            logging.DEBUG if conf.get_boolean('debug_mc') else logging.WARNING)
        self.log = LOG
        LOG.debug('init MC, server list path: %s, client config path: %s',
                  server_list_path, client_config_path)

        super(MC, self).__init__(*args, conf=conf, **kwargs)

        self._mc = McClient.GetInstance(server_list_path, client_config_path)
        # Leave room for the key and a reserved margin inside each item.
        self._max_item_size = (
            self._mc.max_item_size() - _MAX_KEY_SIZE - _ITEM_SIZE_RESERVED)
        self._max_key_size = _MAX_KEY_SIZE

        key_cb = kwargs.get('mc_key_cb') or conf.get('mc_key_cb')
        if key_cb == 'identity':
            # Keys are used unchanged.
            self.mc_key_cb = None
        elif isinstance(key_cb, str):
            # Any other string is resolved through hash.get_hash_fn
            # (presumably a hash algorithm name -- confirm against that helper).
            digest_fn = hash.get_hash_fn(key_cb)
            self.mc_key_cb = partial(hash.hexdigest, hash_fn=digest_fn)
            LOG.debug('mc: using mc_key_cb %s', key_cb)
        elif callable(key_cb):
            self.mc_key_cb = key_cb
            LOG.debug('mc: using user defined mc_key_cb')
        else:
            raise Exception("argument 'mc_key_cb' should be callable.")

    def check_key_size(self, key):
        """Validate the type and size of ``key``.

        A ``str`` key is measured by its UTF-8 encoded length, a ``bytes``
        key by its length.

        Raises:
            Exception: if ``key`` is neither ``str`` nor ``bytes``.
            exception.McKeySizeExceed: if the measured size is greater than
                ``self._max_key_size``.
        """
        if not isinstance(key, (str, bytes)):
            raise Exception(
                'mc key type is not supported: {}, value: {}'.format(type(key), key))

        raw_key = key.encode('utf-8') if isinstance(key, str) else key
        if len(raw_key) > self._max_key_size:
            raise exception.McKeySizeExceed(
                'size of key must <= {}'.format(self._max_key_size), key)

    @profile('get')
    @wrap_io
    def get(self, key, **kwargs):
        """Fetch ``key`` from the client.

        Returns the client's result unchanged; ``wrap_io`` unpacks it as a
        ``(value, status)`` pair.
        """
        return self._mc.get(key)

    @profile('put')
    @wrap_io
    def put(self, key, content, **kwargs):
        """Store ``content`` under ``key``.

        Returns ``(len(content), status)``; through ``wrap_io`` the caller
        receives the size on success.

        Raises:
            exception.McObjectSizeExceed: if ``len(content)`` is greater than
                ``self._max_item_size``.
        """
        content_size = len(content)
        limit = self._max_item_size
        if content_size > limit:
            raise exception.McObjectSizeExceed(
                key, 'size of object must <= {}, actual size: {}'.format(limit, content_size))
        return content_size, self._mc.set(key, content)
|