Source code for riakcached.client

__all__ = ["RiakClient"]

import json
import Queue
import threading

import urllib3

from riakcached import exceptions


[docs]class RiakClient(object): """ """ __slots__ = [ "_serializers", "_deserializers", "_pool", "_timeout", "bucket", "url", ] def __init__(self, bucket, url="http://127.0.0.1:8098", timeout=2, auto_connect=True): """ """ self.bucket = bucket self.url = url.strip("/") self._timeout = timeout self._serializers = { "application/json": json.dumps, } self._deserializers = { "application/json": json.loads, } if auto_connect: self._connect()
[docs] def add_serializer(self, content_type, serializer): """ """ content_type = content_type.lower() self._serializers[content_type] = serializer
[docs] def add_deserializer(self, content_type, deserializer): """ """ content_type = content_type.lower() self._deserializers[content_type] = deserializer
[docs] def serialize(self, data, content_type): """ """ serializer = self._serializers.get(content_type, str) return serializer(data)
[docs] def deserialize(self, data, content_type): """ """ deserializer = self._deserializers.get(content_type, str) return deserializer(data)
[docs] def close(self): """ """ if self._pool: self._pool.close()
[docs] def get(self, key, counter=False): """ """ url = "%s/buckets/%s/keys/%s" % (self.url, self.bucket, key) if counter: url = "%s/buckets/%s/counters/%s" % (self.url, self.bucket, key) response = self._request(method="GET", url=url) if response.status == 400: raise exceptions.RiakcachedBadRequest(response.data) elif response.status == 503: raise exceptions.RiakcachedServiceUnavailable(response.data) if response.status not in (200, 300, 304): return None return self.deserialize(response.data, response.getheader("content-type"))
[docs] def get_many(self, keys): """ """ def worker(key, results): results.put((key, self.get(key))) args = [[key] for key in keys] results = self._many(worker, args) results = dict((key, value) for key, value in results.iteritems() if value is not None) return results or None
[docs] def set(self, key, value, content_type="text/plain"): """ """ value = self.serialize(value, content_type) response = self._request( method="POST", url="%s/buckets/%s/keys/%s" % (self.url, self.bucket, key), body=value, headers={ "Content-Type": content_type, }, ) if response.status == 400: raise exceptions.RiakcachedBadRequest(response.data) elif response.status == 412: raise exceptions.RiakcachedPreconditionFailed(response.data) return response.status in (200, 201, 204, 300)
[docs] def set_many(self, values): """ """ def worker(key, value, results): results.put((key, self.set(key, value))) args = [list(data) for data in values.items()] return self._many(worker, args)
[docs] def delete(self, key): """ """ response = self._request( method="DELETE", url="%s/buckets/%s/keys/%s" % (self.url, self.bucket, key), ) if response.status == 400: raise exceptions.RiakcachedBadRequest(response.data) return response.status in (204, 404)
[docs] def delete_many(self, keys): """ """ def worker(key, results): results.put((key, self.delete(key))) args = [[key] for key in keys] return self._many(worker, args)
[docs] def stats(self): """ """ response = self._request( method="GET", url="%s/stats" % self.url, ) if response.status == 200: return self.deserialize(response.data, "application/json") return None
[docs] def props(self): """ """ response = self._request( method="GET", url="%s/buckets/%s/props" % (self.url, self.bucket), ) if response.status == 200: return json.loads(response.data) return None
[docs] def set_props(self, props): """ """ response = self._request( method="PUT", url="%s/buckets/%s/props" % (self.url, self.bucket), body=self.serialize(props, "application/json"), headers={ "Content-Type": "application/json", } ) return response.status == 200
[docs] def keys(self): """ """ response = self._request( method="GET", url="%s/buckets/%s/keys?keys=true" % (self.url, self.bucket), ) if response.status == 200: return self.deserialize(response.data, "application/json") return None
[docs] def ping(self): """ """ response = self._request( method="GET", url="%s/ping" % self.url, ) return response.status == 200
[docs] def incr(self, key, value=1): """ """ response = self._request( method="POST", url="%s/buckets/%s/counters/%s" % (self.url, self.bucket, key), body=str(value), ) if response.status == 409: raise exceptions.RiakcachedConflict(response.data) elif response.status == 400: raise exceptions.RiakcachedBadRequest(response.data) return response.status in (200, 201, 204, 300)
def _connect(self): self._pool = urllib3.connection_from_url(self.url) def _request(self, method, url, body=None, headers=None): try: return self._pool.urlopen( method=method, url=url, body=body, headers=headers, timeout=self._timeout, redirect=False, ) except urllib3.exceptions.TimeoutError, e: raise exceptions.RiakcachedTimeout(e.message) except urllib3.exceptions.HTTPError, e: raise exceptions.RiakcachedConnectionError(e.message) def _many(self, target, args_list): workers = [] worker_results = Queue.Queue() for args in args_list: args.append(worker_results) worker = threading.Thread(target=target, args=args) worker.daemon = True worker.start() workers.append(worker) for worker in workers: worker.join() results = {} while not worker_results.empty(): key, value = worker_results.get() results[key] = value return results

Table Of Contents

This Page