Data Model¶
This section documents some of DistKV’s server-internal classes.
This module contains DistKV’s basic data model.
TODO: message chains should be refactored to arrays: much lower overhead.
-
class
distkv.model.
Node
(name, tick=None, cache=None, create=True)¶ Represents one DistKV participant.
-
seen
(tick, entry=None, local=False)¶ An event with this tick is in the entry’s chain.
Parameters: - tick – The event affecting the given entry.
- entry – The entry affected by this event.
- local – The message was not broadcast, thus do not assume that other nodes saw this.
-
is_deleted
(tick)¶ Check whether this tick has been marked as deleted.
-
mark_deleted
(tick)¶ The data for this tick will be deleted.
Parameters: tick – The event that caused the deletion.
-
clear_deleted
(tick)¶ The data for this tick are definitely gone (deleted).
-
purge_deleted
(r: range_set.RangeSet)¶ All entries in this rangeset are deleted.
This is a shortcut for calling
clear_deleted()
on each item.
-
supersede
(tick)¶ The event with this tick is no longer in the referred entry’s chain. This happens when an entry is updated.
Parameters: tick – The event that once affected the given entry.
-
report_known
(r: range_set.RangeSet, local=False)¶ Some node said that these entries may have been superseded.
Parameters: - r – The RangeSet thus marked.
- local – The message was not broadcast, thus do not assume that other nodes saw this.
-
report_missing
(r: range_set.RangeSet)¶ Some node doesn’t know about these ticks.
Remember this: we may need to broadcast either their content or the fact that these ticks have been superseded.
-
report_deleted
(r: range_set.RangeSet, server)¶ This range has been reported as deleted.
Parameters: - range (RangeSet) – the range that’s gone.
- add (dict) – store additional vanished items. Nodename -> RangeSet
-
local_known
¶ Values I have seen – they either exist or I know they’ve been superseded
-
local_deleted
¶ Values I know to have vanished
-
local_missing
¶ Values I have not seen, the inverse of
local_known()
-
remote_missing
¶ Values from this node which somebody else has not seen
-
-
class
distkv.model.
NodeSet
(encoded=None, cache=None)¶ Represents a dict (nodename -> RangeSet).
-
class
distkv.model.
NodeEvent
(node: distkv.model.Node, tick: Optional[int] = None, prev: Optional[NodeEvent] = None, check_dup=True)¶ Represents any event originating at a node.
Parameters: - node – The node thus affected
- tick – Counter, timestamp, whatever
- prev – The previous event, if any
-
equals
(other)¶ Check whether these chains are equal. Used for ping comparisons.
The last two items may be missing from either chain.
-
find
(node)¶ Return the position of a node in this chain. Zero if the first entry matches.
Returns
None
if not present.
-
filter
(node, server=None)¶ Return an event chain without the given node.
If the node is not in the chain, the result is not a copy.
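The chain operations described for find() and filter() can be pictured with a small stand-in. This is a minimal sketch under stated assumptions, not DistKV's implementation: `ChainLink` is a hypothetical class, nodes are plain strings, and the `server` argument is omitted.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ChainLink:
    """Hypothetical stand-in for a NodeEvent: one (node, tick) link plus its predecessor."""

    node: str
    tick: int
    prev: Optional["ChainLink"] = None

    def find(self, node: str) -> Optional[int]:
        """Return the position of `node` in the chain (0 = this link), or None."""
        pos, link = 0, self
        while link is not None:
            if link.node == node:
                return pos
            pos, link = pos + 1, link.prev
        return None

    def filter(self, node: str) -> Optional["ChainLink"]:
        """Return the chain without `node`; reuse the chain unchanged if it is absent."""
        if self.find(node) is None:
            return self  # nothing to remove, so no copy is made
        rest = self.prev.filter(node) if self.prev is not None else None
        if self.node == node:
            return rest
        return ChainLink(self.node, self.tick, rest)


# A three-link chain: "c" is the most recent event, "a" the oldest.
chain = ChainLink("c", 7, ChainLink("b", 4, ChainLink("a", 2)))
without_b = chain.filter("b")
```

Filtering a node that is not in the chain returns the very same object, which mirrors the "not a copy" remark above.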
-
attach
(prev: Optional[NodeEvent] = None, server=None)¶ Copy this node, if necessary, and attach a filtered prev chain to it
-
class
distkv.model.
UpdateEvent
(event: distkv.model.NodeEvent, entry: Entry, new_value, old_value=<class 'distkv.util.NotGiven'>, tock=None)¶ Represents an event which updates something.
-
class
distkv.model.
Entry
(name: str, parent: Entry, tock=None)¶ This class represents one key/value pair
-
follow_acl
(*path, create=True, nulls_ok=False, acl=None, acl_key=None)¶ Follow this path.
If create is True (default), unknown nodes are silently created. Otherwise they cause a KeyError. If None, assume create=True but only check the ACLs.
If nulls_ok is False (default), None is not allowed as a path element. If 2, it is allowed anywhere; if True, only as the first element.
If acl is not None, then acl_key is the ACL letter to check for. acl must be an ACLFinder created from the root of the ACL in question.
The ACL key ‘W’ is special: it checks ‘c’ if the node is new, else ‘w’.
Returns a (node, acl) tuple.
-
follow
(*a, **kw)¶ As
follow_acl()
, but only returns the node.
-
mark_deleted
(server)¶ This entry has been deleted.
Returns: the entry’s chain.
-
purge_deleted
()¶ Call
Node.clear_deleted()
on each link in this entry’s chain.
-
await
set_data
(event: distkv.model.NodeEvent, data: Any, local: bool = False, server=None, tock=None)¶ This entry is updated by that event.
Parameters: - event – The NodeEvent to base the update on.
- data (Any) – whatever the node should contain. Use distkv.util.NotGiven to delete.
- local (bool) – Flag whether the event should be forwarded to watchers.
Returns: The UpdateEvent that has been generated and applied.
-
await
apply
(evt: distkv.model.UpdateEvent, local: bool = False, server=None, root=None)¶ Apply this UpdateEvent to me.
Also, forward to watchers (unless
local
is set).
-
await
walk
(proc, acl=None, max_depth=-1, min_depth=0, _depth=0)¶ Call coroutine proc on this node and all its children.
If acl (must be an ACLStepper) is given, proc is called with the acl as second argument.
If proc raises StopAsyncIteration, chop this subtree.
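The pruning behaviour of walk() can be illustrated with a toy tree. The following is a sketch only: `TreeNode` is a hypothetical stand-in for an entry, the `acl` argument is left out, and the handling of `min_depth` and `max_depth` is an assumption about their meaning rather than a copy of DistKV's logic.

```python
import asyncio


class TreeNode:
    """Hypothetical stand-in for an Entry: a name plus child nodes."""

    def __init__(self, name, *children):
        self.name = name
        self.children = list(children)

    async def walk(self, proc, max_depth=-1, min_depth=0, _depth=0):
        """Call `proc` on this node and its descendants, depth-first."""
        if min_depth <= _depth:
            try:
                await proc(self)
            except StopAsyncIteration:
                return  # proc asked us to skip everything below this node
        if max_depth == _depth:
            return  # depth limit reached (assumed meaning of max_depth)
        for child in self.children:
            await child.walk(proc, max_depth, min_depth, _depth + 1)


async def demo():
    tree = TreeNode("root", TreeNode("a", TreeNode("a1")), TreeNode("b"))
    seen = []

    async def proc(node):
        seen.append(node.name)
        if node.name == "a":
            raise StopAsyncIteration  # do not descend below "a"

    await tree.walk(proc)
    return seen


visited = asyncio.run(demo())
```

Here the child "a1" is never visited because proc raised StopAsyncIteration on its parent.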
-
serialize
(chop_path=0, nchain=2, conv=None)¶ Serialize this entry for msgpack.
Parameters: - chop_path – If <0, do not return the entry’s path. Otherwise, do, but remove the first N entries.
- nchain – how many change events to include.
-
await
updated
(event: distkv.model.UpdateEvent)¶ Send an event to the watchers of this node and of all its parents.
-
-
class
distkv.model.
Watcher
(root: distkv.model.Entry)¶ This helper class is used as an async context manager plus async iterator. It reports all updates to an entry (or its children).
If a watcher terminates, sending to its channel has blocked. The receiver needs to take appropriate re-syncing action.
ACLs¶
ACL checks are performed by ACLFinder
. This class
collects all relevant ACL entries for any given (sub)path, sorted by
depth-first specificity. This basically means that you collect all ACLs
that could possibly match a path and sort them; the +
and #
wildcards get sorted last. Then the system picks the first entry that
actually has a value.
For example, if you have a path a b c d e f g
and ACLs a
b # g
and a # d e f g
, the first ACL will match because b
is
more specific than #
, even though the second ACL is longer and thus
could be regarded as being more specific. However, the current rule is more
stable when used with complex ACLs and thus more secure.
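The selection rule described above can be sketched as follows. This is an illustration, not the ACLFinder code: the helper names are hypothetical, and it assumes that + matches exactly one path element, that # matches zero or more elements, and that + sorts before #.

```python
# Assumed ranking: literal elements first, then "+", then "#".
WILDCARD_RANK = {"+": 1, "#": 2}


def specificity(pattern):
    """Sort key: compare patterns element by element, literals before wildcards."""
    return tuple(WILDCARD_RANK.get(elem, 0) for elem in pattern)


def matches(pattern, path):
    """Check whether `pattern` (a tuple of elements) matches `path` completely."""
    if not pattern:
        return not path
    head, rest = pattern[0], pattern[1:]
    if head == "#":
        # Let "#" swallow 0..len(path) elements and try the remainder each time.
        return any(matches(rest, path[i:]) for i in range(len(path) + 1))
    if not path:
        return False
    if head == "+" or head == path[0]:
        return matches(rest, path[1:])
    return False


def pick_acl(acls, path):
    """Return (pattern, value) of the most specific matching ACL that has a value."""
    for pattern in sorted(acls, key=specificity):
        if acls[pattern] is not None and matches(pattern, path):
            return pattern, acls[pattern]
    return None


path = tuple("a b c d e f g".split())
acls = {
    ("a", "#", "d", "e", "f", "g"): "rw",
    ("a", "b", "#", "g"): "r",
}
chosen = pick_acl(acls, path)
```

Both patterns match the path, but a b # g wins because its second element is a literal while the other pattern already uses a wildcard there.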
-
class
distkv.types.
ACLFinder
(acl, blocked=None)¶ A NodeFinder which expects ACL strings as elements
Helper methods and classes¶
This module contains various helper functions and classes.
-
distkv.util.
yprint
(data, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, compact=False)¶ Standard code to write a YAML record.
Parameters: - data – The data to write.
- stream – the file to write to, defaults to stdout.
- compact – Write single lines if possible. default False.
-
distkv.util.
yformat
(data, compact=None)¶ Return data as a multi-line YAML string.
Parameters: - data – The data to write.
- compact – Write single lines if possible. default False.
-
class
distkv.util.
NotGiven
¶ Placeholder value for ‘no data’ or ‘deleted’.
-
distkv.util.
combine_dict
(*d, cls=<class 'dict'>) → dict¶ Returns a dict with all keys+values of all dict arguments. The first found value wins.
This recurses if values are dicts.
Parameters: cls (type) – a class to instantiate the result with. Default: dict. Often used: attrdict
.
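The merge rule described for combine_dict ("first found value wins", recursing into dicts) can be sketched as below. `combine_sketch` is a hypothetical re-implementation for illustration; how the real function treats a key that holds a dict in one argument and a plain value in another is not stated here, so this sketch simply ignores the non-dict values in that case.

```python
def combine_sketch(*dicts, cls=dict):
    """Merge dicts: the first value found for a key wins; nested dicts merge recursively."""
    # Collect every value seen for each key, in argument order.
    seen = {}
    for d in dicts:
        for key, value in d.items():
            seen.setdefault(key, []).append(value)

    result = cls()
    for key, values in seen.items():
        if isinstance(values[0], dict):
            # Recurse over all dict values for this key (assumption: skip non-dicts).
            nested = [v for v in values if isinstance(v, dict)]
            result[key] = combine_sketch(*nested, cls=cls)
        else:
            result[key] = values[0]
    return result


merged = combine_sketch(
    {"a": 1, "sub": {"x": 1}},
    {"a": 2, "b": 3, "sub": {"x": 9, "y": 2}},
)
```

A typical use of this pattern is layering a user configuration over defaults: pass the user's dict first so that its values take precedence.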
-
class
distkv.util.
attrdict
¶ A dictionary which can be accessed via attributes, for convenience
-
class
distkv.util.
PathShortener
(prefix)¶ This class shortens path entries so that the initial components that are equal to the last-used path (or the original base) are skipped.
It is illegal to path-shorten messages whose path does not start with the initial prefix.
Example: The sequence
a b
a b c d
a b c e f
a b c e g h
a b c i
a b j
is shortened to
0
0 c d
1 e f
2 g h
1 i
0 j
where the initial number is the passed-in depth, assuming the PathShortener is initialized with ('a','b').
Usage:
>>> d = PathShortener(['a','b'])
>>> d({'path': 'a b c d'.split()})
{'depth':0, 'path':['c','d']}
>>> d({'path': 'a b c e f'.split()})
{'depth':1, 'path':['e','f']}
etc.
Note that the input dict is modified in-place.
-
class
distkv.util.
PathLongener
(prefix=())¶ This reverts the operation of a PathShortener. You need to pass the same prefix in.
Calling a PathLongener with a dict without depth or path attributes is a no-op.
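The shortening and lengthening steps can be sketched as a matched pair. The classes below are hypothetical re-implementations written from the description above, not the DistKV code; returning the message dict is a convenience added here, since the documented behaviour is in-place modification.

```python
class ShortenerSketch:
    """Strip the prefix and whatever leading part equals the previous path."""

    def __init__(self, prefix):
        self.prefix = list(prefix)
        self.last = []  # previous path, without the prefix

    def __call__(self, msg):
        path = list(msg["path"])
        n = len(self.prefix)
        if path[:n] != self.prefix:
            raise ValueError("path does not start with the prefix")
        tail = path[n:]
        depth = 0
        limit = min(len(tail), len(self.last))
        while depth < limit and tail[depth] == self.last[depth]:
            depth += 1
        self.last = tail
        msg["depth"] = depth
        msg["path"] = tail[depth:]
        return msg


class LongenerSketch:
    """Undo ShortenerSketch; needs the same prefix."""

    def __init__(self, prefix=()):
        self.base = len(prefix)
        self.last = list(prefix)  # previous full path, including the prefix

    def __call__(self, msg):
        if "depth" not in msg or "path" not in msg:
            return msg  # nothing to expand
        depth = msg.pop("depth")
        full = self.last[: self.base + depth] + list(msg["path"])
        self.last = full
        msg["path"] = full
        return msg


paths = ["a b", "a b c d", "a b c e f", "a b c e g h", "a b c i", "a b j"]
shorten = ShortenerSketch(("a", "b"))
lengthen = LongenerSketch(("a", "b"))
shortened, restored = [], []
for p in paths:
    msg = shorten({"path": p.split()})
    shortened.append((msg["depth"], msg["path"]))
    restored.append(lengthen(msg)["path"])
```

Because both sides keep the previous path as state, messages must be lengthened in the same order in which they were shortened.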
-
class
distkv.util.
MsgReader
(*a, buflen=4096, **kw)¶ Read a stream of messages (encoded with MsgPack) from a file.
Usage:
async with MsgReader(path="/tmp/msgs.pack") as f:
    async for msg in f:
        process(msg)
Parameters: Exactly one of path and stream must be used.
-
class
distkv.util.
MsgWriter
(*a, buflen=65536, **kw)¶ Write a stream of messages to a file (encoded with MsgPack).
Usage:
async with MsgWriter("/tmp/msgs.pack") as f:
    for msg in some_source_of_messages():  # or "async for"
        f(msg)
Parameters: Exactly one of path and stream must be used.
The stream is buffered. Call distkv.util.MsgWriter.flush() to flush the buffer.
-
await
flush
()¶ Flush the buffer.
-
-
distkv.util.
gen_ssl
(ctx: Union[bool, ssl.SSLContext, Dict[str, str]] = False, server: bool = True) → Optional[ssl.SSLContext]¶ Generate a SSL config from the given context.
Parameters: - ctx – either a Bool (ssl yes/no) or a dict with “key” and “cert” entries.
- server – a flag whether to behave as a server.
-
distkv.util.
split_one
(p, kw)¶ Split ‘p’ and add to dict ‘kw’.
-
distkv.util.
make_proc
(code, vars, *path, use_async=False)¶ Compile this code block to a procedure.
Parameters: - code – the code block to execute
- vars – variable names to pass into the code
- path – the location where the code is stored
Returns: the procedure to call. All keyval arguments will be in the local dict.
-
distkv.util.
make_module
(code, *path)¶ Compile this code block to something module-ish.
Parameters: - code – the code block to execute
- path – the location where the code is / shall be stored
Returns: the procedure to call. All keyval arguments will be in the local dict.
-
class
distkv.util.
Cache
(size, attr='_cache_pos')¶ A quick-and-dirty cache that keeps the last N entries of anything in memory so that ref and WeakValueDictionary don’t lose them.
Entries get refreshed when they’re in the last third of the cache; as they’re not removed, the actual cache size might only be 2/3rd of SIZE.
-
resize
(size)¶ Change the size of this cache.
-
-
distkv.util.
NotGiven
¶ This object marks the absence of information where simply not using the data element or keyword at all would be inconvenient.
For instance, without such a marker, def fn(value=NotGiven, **kw) would have to be written as def fn(**kw) and test 'value' in kw, or use an exception. The problem is that value would then not show up in the function’s signature.
With NotGiven you can simply test value is (or is not) NotGiven.
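The sentinel pattern described above can be sketched in a few lines. To stay self-contained, the example defines its own local `NotGiven` class as a stand-in for distkv.util.NotGiven; `describe` is a hypothetical function.

```python
class NotGiven:
    """Local stand-in for distkv.util.NotGiven; the class itself serves as the marker."""


def describe(value=NotGiven):
    """Distinguish 'argument omitted' from 'argument explicitly set to None'."""
    if value is NotGiven:
        return "no value passed"
    return f"value={value!r}"
```

Unlike a default of None, the marker lets callers pass None as a real value while the parameter still appears in the function's signature.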
This module’s job is to run code, or rather to keep it running.
-
exception
distkv.runner.
NotSelected
¶ This node has not been selected for a very long time. Something is amiss.
-
class
distkv.runner.
RunnerEntry
(*a, **k)¶ An entry representing some hopefully-running code.
The code will run some time after target has passed. On success, it will run again repeat seconds later (if >0). On error, it will run delay seconds later (if >0), multiplied by 2**backoff.
The code runs with these additional keywords:
_entry: this object
_client: the DistKV client connection
_info: a queue to send events to the task. A message of None signals that the queue was overflowing and no further messages will be delivered.
Messages are defined in distkv.actor.
-
await
run_at
(t: float)¶ Next run at this time.
-
should_start
()¶ Tell whether this job might want to be started.
Returns: False: No, it’s running (or has run and doesn’t restart).
n>0: wait for n seconds before thinking again.
n<0: should have been started n seconds ago, do something!
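The timing rules described for RunnerEntry can be made concrete with two small helpers. Both are hypothetical and written only from the text above: `retry_wait` assumes that backoff is an integer counter starting at zero, and `describe_start` merely interprets the three kinds of result that should_start() is documented to return.

```python
def retry_wait(delay: float, backoff: int) -> float:
    """Seconds to wait after an error: `delay` scaled by 2**backoff."""
    return delay * 2 ** backoff


def describe_start(result) -> str:
    """Interpret a should_start()-style result (False, n > 0, or n < 0)."""
    if result is False:  # identity check first, because False == 0 in Python
        return "running or finished; nothing to do"
    if result > 0:
        return f"check again in {result} seconds"
    return f"overdue by {-result} seconds; start now"


# Waiting times after repeated failures with delay=10.
waits = [retry_wait(10, n) for n in range(4)]
```

Testing for False by identity matters here, since a numeric result of zero would otherwise be mistaken for "do not start".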
-
-
class
distkv.runner.
RunnerNode
(*a, **k)¶ Represents all nodes in this runner group.
This is used for load balancing and such. TODO.
-
class
distkv.runner.
StateEntry
(parent, name=None)¶ This is the actual state associated with a RunnerEntry. It must only be managed by the node that actually runs the code.
-
result
¶ alias of
distkv.util.NotGiven
-
-
class
distkv.runner.
StateRoot
(client, *path, need_wait=False, cfg=None)¶ Base class for handling the state of entries.
This is separate from the RunnerRoot hierarchy because the latter may be changed by anybody while this subtree may only be affected by the actual runner. Otherwise we get interesting race conditions.
-
await
kill_stale_nodes
(names)¶ States with node names in the “names” set are stale. Kill them.
-
-
class
distkv.runner.
AnyRunnerRoot
(*a, err=None, code=None, **kw)¶ This class represents the root of a code runner. Its job is to start (and periodically restart, if required) the entry points stored under it.
-
max_age
¶ Timeout after which we really should have gotten another go
-
await
find_stale_nodes
(cur)¶ Find stale nodes (i.e. last seen < cur) and clean them.
-
-
class
distkv.runner.
RunnerNodeEntry
(parent, name=None)¶ Sub-node so that a SingleRunnerRoot runs only its local nodes.
-
class
distkv.runner.
SingleRunnerRoot
(*a, node=None, **kw)¶ This class represents the root of a code runner. Its job is to start (and periodically restart, if required) the entry points stored under it.
While AnyRunnerRoot tries to ensure that the code in question runs on any cluster member, this class runs tasks on a single node. The code is able to check whether any and/or all of the cluster’s main nodes are reachable; this way, the code can default to local operation if connectivity is lost.
Local data (dict):
Parameters: cores (tuple) – list of nodes whose reachability may determine whether the code uses local/emergency/??? mode.
Config file:
-
await
notify_active
()¶ Notify all running jobs that there’s a change in active status
-
await
run_starting
()¶ Hook to set the local root
-
max_age
¶ Timeout after which we really should have gotten another ping
-
This module implements an asyncserf.actor.Actor
which works on top of
a DistKV client.
-
class
distkv.actor.
ClientActor
(client, name, prefix=None, cfg=None, tg=None)¶ This is an Actor which works on top of a DistKV client.
-
class
distkv.actor.
ActorState
¶ abstract base class for states
-
distkv.actor.
packer
()¶ Packer.pack(self, obj)
-
distkv.actor.
unpacker
()¶ unpackb(packed, object_hook=None, list_hook=None, bool use_list=True, bool raw=True, bool strict_map_key=False, encoding=None, unicode_errors=None, object_pairs_hook=None, ext_hook=ExtType, Py_ssize_t max_str_len=-1, Py_ssize_t max_bin_len=-1, Py_ssize_t max_array_len=-1, Py_ssize_t max_map_len=-1, Py_ssize_t max_ext_len=-1)
Unpack packed_bytes to object. Returns an unpacked object.
Raises ExtraData when packed contains extra bytes. Raises ValueError when packed is incomplete. Raises FormatError when packed is not valid msgpack. Raises StackError when packed is too deeply nested. Other exceptions can be raised during unpacking.
See Unpacker for options.
max_xxx_len options are configured automatically from len(packed).