Source code for riakcached.clients

__all__ = ["RiakClient", "ThreadedRiakClient"]

import json
import Queue
import threading

from riakcached import exceptions
from riakcached.pools import Urllib3Pool


[docs]class RiakClient(object): """A Memcache like client to the Riak HTTP Interface """ __slots__ = [ "_serializers", "_deserializers", "base_url", "bucket", "pool", ] def __init__(self, bucket, pool=None): """Constructor for a new :class:`riakcached.clients.RiakClient` Pool - if no pool is provided then a default :class:`riakcached.pools.Urllib3Pool` is used :param bucket: The name of the Riak bucket to use :type bucket: str :param pool: The :class:`riakcached.pools.Pool` to use for requests :type pool: :class:`riakcached.pools.Pool` """ if pool is None: self.pool = Urllib3Pool() else: self.pool = pool self.bucket = bucket self.base_url = self.pool.url.rstrip("/") self._serializers = { "application/json": json.dumps, } self._deserializers = { "application/json": json.loads, }
[docs] def add_serializer(self, content_type, serializer): """Add a content-type serializer to the client The `serializer` function should have the following definition:: def serializer(data): return do_something(data) and should return a `str` Example:: def base64_serializer(data): return base64.b64encode(data) client.add_serializer("application/base64", base64_serializer) :param content_type: the content-type to associate `serializer` with :type content_type: str :param serializer: the serializer function to use with `content_type` :type serializer: function """ content_type = content_type.lower() self._serializers[content_type] = serializer
[docs] def add_deserializer(self, content_type, deserializer): """Add a content-type deserializer to the client The `deserializer` function should have the following definition:: def deserializer(data): return undo_something(data) Example:: def base64_deserializer(data): return base64.b64decode(data) client.add_deserializer("application/base64", base64_deserializer) :param content_type: the content-type to associate `deserializer` with :type content_type: str :param deserializer: the deserializer function to use with `content_type` :type deserializer: function """ content_type = content_type.lower() self._deserializers[content_type] = deserializer
[docs] def serialize(self, data, content_type): """Serialize the provided `data` to `content_type` This method will lookup the registered serializer for the provided Content-Type (defaults to str(data)) and passes `data` through the serializer. :param data: the data to serialize :type data: object :param content_type: the desired Content-Type for the provided `data` :type content_type: str :returns: str - the serialized data """ serializer = self._serializers.get(content_type, str) return serializer(data)
[docs] def deserialize(self, data, content_type): """Deserialize the provided `data` from `content_type` This method will lookup the registered deserializer for the provided Content-Type (defaults to str(data)) and passes `data` through the deserializer. :param data: the data to deserialize :type data: str :param content_type: the Content-Type to deserialize `data` from :type content_type: str :returns: object - whatever the deserializer returns """ deserializer = self._deserializers.get(content_type, str) return deserializer(data)
[docs] def get(self, key, counter=False): """Get the value of the key from the client's `bucket` :param key: the key to get from the bucket :type key: str :param counter: whether or not the `key` is a counter :type counter: bool :returns: object - the deserialized value of `key` :returns: None - if the call was not successful or the key was not found :raises: :class:`riakcached.exceptions.RiakcachedBadRequest` :raises: :class:`riakcached.exceptions.RiakcachedServiceUnavailable` """ url = "%s/buckets/%s/keys/%s" % (self.base_url, self.bucket, key) if counter: url = "%s/buckets/%s/counters/%s" % (self.base_url, self.bucket, key) status, data, headers = self.pool.request(method="GET", url=url) if status == 400: raise exceptions.RiakcachedBadRequest(data) elif status == 503: raise exceptions.RiakcachedServiceUnavailable(data) if status not in (200, 300, 304): return None return self.deserialize(data, headers.get("content-type", "text/plain"))
[docs] def get_many(self, keys): """Get the value of multiple keys at once from the client's `bucket` :param keys: the list of keys to get :type keys: list :returns: dict - the keys are the keys provided and the values are the results from calls to :func:`get`, except keys whose values are `None` are not included in the result :raises: :class:`riakcached.exceptions.RiakcachedBadRequest` :raises: :class:`riakcached.exceptions.RiakcachedServiceUnavailable` """ results = dict((key, self.get(key)) for key in keys) return dict((key, value) for key, value in results.iteritems() if value is not None)
[docs] def set(self, key, value, content_type="text/plain"): """Set the value of a key for the client's `bucket` :param key: the key to set the value for :type key: str :param value: the value to set, this will get serialized for the `content_type` :type value: object :param content_type: the Content-Type for `value` :type content_type: str :returns: bool - True if the call is successful, False otherwise :raises: :class:`riakcached.exceptions.RiakcachedBadRequest` :raises: :class:`riakcached.exceptions.RiakcachedPreconditionFailed` """ value = self.serialize(value, content_type) status, data, _ = self.pool.request( method="POST", url="%s/buckets/%s/keys/%s" % (self.base_url, self.bucket, key), body=value, headers={ "Content-Type": content_type, }, ) if status == 400: raise exceptions.RiakcachedBadRequest(data) elif status == 412: raise exceptions.RiakcachedPreconditionFailed(data) return status in (200, 201, 204, 300)
[docs] def set_many(self, values, content_type="text/plain"): """Set the value of multiple keys at once for the client's `bucket` :param values: the key -> value pairings for the keys to set :type values: dict :param content_type: the Content-Type for all of the values provided :type content_type: str :returns: dict - the keys are the keys provided and the values are True or False from the calls to :func:`set` :raises: :class:`riakcached.exceptions.RiakcachedBadRequest` :raises: :class:`riakcached.exceptions.RiakcachedPreconditionFailed` """ return dict( (key, self.set(key, value, content_type)) for key, value in values.iteritems() )
[docs] def delete(self, key): """Delete the provided key from the client's `bucket` :param key: the key to delete :type key: str :returns: bool - True if the key was removed, False otherwise :raises: :class:`riakcached.exceptions.RiakcachedBadRequest` """ status, data, _ = self.pool.request( method="DELETE", url="%s/buckets/%s/keys/%s" % (self.base_url, self.bucket, key), ) if status == 400: raise exceptions.RiakcachedBadRequest(data) return status in (204, 404)
[docs] def delete_many(self, keys): """Delete multiple keys at once from the client's `bucket` :param keys: list of `str` keys to delete :type keys: list :returns: dict - the keys are the keys provided and the values are True or False from the calls to :func:`delete` :raises: :class:`riakcached.exceptions.RiakcachedBadRequest` """ return dict((key, self.delete(key)) for key in keys)
[docs] def stats(self): """Get the server stats :returns: dict - the stats from the server :returns: None - when the call is not successful """ status, data, _ = self.pool.request( method="GET", url="%s/stats" % self.base_url, ) if status == 200: return self.deserialize(data, "application/json") return None
[docs] def props(self): """Get the properties for the client's `bucket` :returns: dict - the `bucket`'s set properties :returns: None - when the call is not successful """ status, data, _ = self.pool.request( method="GET", url="%s/buckets/%s/props" % (self.base_url, self.bucket), ) if status == 200: return json.loads(data) return None
[docs] def set_props(self, props): """Set the properties for the client's `bucket` :param props: the properties to set :type props: dict :returns: bool - True if it is successful otherwise False """ status, _, _ = self.pool.request( method="PUT", url="%s/buckets/%s/props" % (self.base_url, self.bucket), body=self.serialize(props, "application/json"), headers={ "Content-Type": "application/json", } ) return status == 200
[docs] def keys(self): """Get a list of all keys :returns: list - list of keys on the server :returns: None - when the call is not successful """ status, data, _ = self.pool.request( method="GET", url="%s/buckets/%s/keys?keys=true" % (self.base_url, self.bucket), ) if status == 200: return self.deserialize(data, "application/json") return None
[docs] def ping(self): """Ping the server to ensure it is up :returns: bool - True if it is successful, False otherwise """ status, _, _ = self.pool.request( method="GET", url="%s/ping" % self.base_url, ) return status == 200
[docs] def incr(self, key, value=1): """Increment the counter with the provided key :param key: the counter to increment :type key: str :param value: how much to increment by :type value: int :returns: bool - True/False whether or not it was successful :raises: :class:`riakcached.exceptions.RiakcachedConflict` :raises: :class:`riakcached.exceptions.RiakcachedBadRequest` """ status, data, _ = self.pool.request( method="POST", url="%s/buckets/%s/counters/%s" % (self.base_url, self.bucket, key), body=str(value), ) if status == 409: raise exceptions.RiakcachedConflict(data) elif status == 400: raise exceptions.RiakcachedBadRequest(data) return status in (200, 201, 204, 300)
[docs]class ThreadedRiakClient(RiakClient): """A threaded version of :class:`riakcached.clients.RiakClient` The threaded version uses threads to try to parallelize the {set,get,delete}_many method calls """ def _many(self, target, args_list): workers = [] worker_results = Queue.Queue() for args in args_list: args.append(worker_results) worker = threading.Thread(target=target, args=args) worker.daemon = True worker.start() workers.append(worker) for worker in workers: worker.join() results = {} while not worker_results.empty(): key, value = worker_results.get() results[key] = value return results
[docs] def delete_many(self, keys): """Delete multiple keys at once from the client's `bucket` :param keys: list of `str` keys to delete :type keys: list :returns: dict - the keys are the keys provided and the values are True or False from the calls to :func:`delete` :raises: :class:`riakcached.exceptions.RiakcachedBadRequest` """ def worker(key, results): results.put((key, self.delete(key))) args = [[key] for key in keys] return self._many(worker, args)
[docs] def set_many(self, values): """Set the value of multiple keys at once for the client's `bucket` :param values: the key -> value pairings for the keys to set :type values: dict :param content_type: the Content-Type for all of the values provided :type content_type: str :returns: dict - the keys are the keys provided and the values are True or False from the calls to :func:`set` :raises: :class:`riakcached.exceptions.RiakcachedBadRequest` :raises: :class:`riakcached.exceptions.RiakcachedPreconditionFailed` """ def worker(key, value, results): results.put((key, self.set(key, value))) args = [list(data) for data in values.items()] return self._many(worker, args)
[docs] def get_many(self, keys): """Get the value of multiple keys at once from the client's `bucket` :param keys: the list of keys to get :type keys: list :returns: dict - the keys are the keys provided and the values are the results from calls to :func:`get`, except keys whose values are `None` are not included in the result :raises: :class:`riakcached.exceptions.RiakcachedBadRequest` :raises: :class:`riakcached.exceptions.RiakcachedServiceUnavailable` """ def worker(key, results): results.put((key, self.get(key))) args = [[key] for key in keys] results = self._many(worker, args) results = dict((key, value) for key, value in results.iteritems() if value is not None) return results or None

Table Of Contents

This Page