Data Model

This section documents some of DistKV’s server-internal classes.

This module contains DistKV’s basic data model.

TODO: message chains should be refactored to arrays: much lower overhead.

class distkv.model.Node(name, tick=None, cache=None, create=True)

Represents one DistKV participant.

seen(tick, entry=None, local=False)

An event with this tick is in the entry’s chain.

Parameters:
  • tick – The event affecting the given entry.
  • entry – The entry affected by this event.
  • local – The message was not broadcast, thus do not assume that other nodes saw this.
is_deleted(tick)

Check whether this tick has been marked as deleted.

mark_deleted(tick)

The data for this tick will be deleted.

Parameters:
  • tick – The event that caused the deletion.
clear_deleted(tick)

The data for this tick are definitely gone (deleted).

purge_deleted(r: range_set.RangeSet)

All entries in this rangeset are deleted.

This is a shortcut for calling clear_deleted() on each item.

supersede(tick)

The event with this tick is no longer in the referred entry’s chain. This happens when an entry is updated.

Parameters:
  • tick – The event that once affected the given entry.
report_known(r: range_set.RangeSet, local=False)

Some node said that these entries may have been superseded.

Parameters:
  • r – The RangeSet thus marked.
  • local – The message was not broadcast, thus do not assume that other nodes saw this.
report_missing(r: range_set.RangeSet)

Some node doesn’t know about these ticks.

Remember that: we may need to broadcast either their content, or the fact that these ticks have been superseded.

report_deleted(r: range_set.RangeSet, server)

This range has been reported as deleted.

Parameters:
  • r (RangeSet) – the range that’s gone.
  • add (dict) – store additional vanished items. Nodename -> RangeSet
local_known

Values I have seen – they either exist or I know they’ve been superseded

local_deleted

Values I know to have vanished

local_missing

Values I have not seen, the inverse of local_known

remote_missing

Values from this node which somebody else has not seen
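The bookkeeping described for Node can be pictured with a toy model. The sketch below is not DistKV code: plain Python sets stand in for RangeSet, the class name TickTrackerSketch is hypothetical, and the rule that "missing" covers every tick from 1 up to the highest known one is an assumption made for illustration.

```python
class TickTrackerSketch:
    """Toy stand-in for per-node tick bookkeeping (sets replace RangeSet)."""

    def __init__(self):
        self.known = set()    # ticks seen: they exist or were superseded
        self.deleted = set()  # ticks whose data is marked as deleted

    def seen(self, tick):
        self.known.add(tick)

    def mark_deleted(self, tick):
        self.deleted.add(tick)

    def is_deleted(self, tick):
        return tick in self.deleted

    def clear_deleted(self, tick):
        self.deleted.discard(tick)

    def purge_deleted(self, ticks):
        # Shortcut for calling clear_deleted() on each item.
        for tick in ticks:
            self.clear_deleted(tick)

    @property
    def missing(self):
        # Assumption: ticks are counted from 1, so anything below the
        # highest known tick that was never seen counts as missing.
        if not self.known:
            return set()
        return set(range(1, max(self.known) + 1)) - self.known


tracker = TickTrackerSketch()
for tick in (1, 2, 5):
    tracker.seen(tick)
tracker.mark_deleted(2)
missing_before = tracker.missing
deleted_before = tracker.is_deleted(2)
tracker.purge_deleted([2])
deleted_after = tracker.is_deleted(2)
```

The real class keeps these sets as ranges, which stays compact when ticks are mostly contiguous.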

class distkv.model.NodeSet(encoded=None, cache=None)

Represents a dict (nodename -> RangeSet).

class distkv.model.NodeEvent(node: distkv.model.Node, tick: Optional[int] = None, prev: Optional[NodeEvent] = None, check_dup=True)

Represents any event originating at a node.

Parameters:
  • node – The node thus affected
  • tick – Counter, timestamp, whatever
  • prev – The previous event, if any
equals(other)

Check whether these chains are equal. Used for ping comparisons.

The last two items may be missing from either chain.

find(node)

Return the position of a node in this chain. Zero if the first entry matches.

Returns None if not present.

filter(node, server=None)

Return an event chain without the given node.

If the node is not in the chain, the result is not a copy.

attach(prev: Optional[NodeEvent] = None, server=None)

Copy this node, if necessary, and attach a filtered prev chain to it
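The semantics of find() and filter() can be sketched with a small linked chain. This is an illustration only: EventSketch is a hypothetical stand-in that stores node names as strings, and it leaves out the server argument and duplicate checking.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class EventSketch:
    node: str
    tick: int
    prev: Optional["EventSketch"] = None

    def find(self, node):
        """Position of `node` in the chain (0 = first entry), or None."""
        pos, event = 0, self
        while event is not None:
            if event.node == node:
                return pos
            pos += 1
            event = event.prev
        return None

    def filter(self, node):
        """Return a chain without `node`; no copy if it is not present."""
        if self.find(node) is None:
            return self
        rest = self.prev.filter(node) if self.prev is not None else None
        if self.node == node:
            return rest
        return EventSketch(self.node, self.tick, rest)


tail = EventSketch("a", 9)
chain = EventSketch("c", 7, EventSketch("b", 4, tail))
without_b = chain.filter("b")
```

Here chain.filter("x") returns chain itself, while without_b is a new head whose prev is the untouched tail.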

class distkv.model.UpdateEvent(event: distkv.model.NodeEvent, entry: Entry, new_value, old_value=<class 'distkv.util.NotGiven'>, tock=None)

Represents an event which updates something.

class distkv.model.Entry(name: str, parent: Entry, tock=None)

This class represents one key/value pair

SUBTYPE

alias of Entry

follow_acl(*path, create=True, nulls_ok=False, acl=None, acl_key=None)

Follow this path.

If create is True (default), unknown nodes are silently created. Otherwise they cause a KeyError. If None, assume create=True but only check the ACLs.

If nulls_ok is False (default), None is not allowed as a path element. If 2, it is allowed anywhere; if True, only as the first element.

If acl is not None, then acl_key is the ACL letter to check for. acl must be an ACLFinder created from the root of the ACL in question.

The ACL key ‘W’ is special: it checks ‘c’ if the node is new, else ‘w’.

Returns a (node, acl) tuple.

follow(*a, **kw)

As follow_acl(), but only returns the node.
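The create behaviour of follow() can be sketched on a minimal tree. TreeNodeSketch is a hypothetical stand-in, and the sketch deliberately omits ACL checks, the nulls_ok handling and the create=None mode.

```python
class TreeNodeSketch:
    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        self.children = {}

    def follow(self, *path, create=True):
        """Walk down `path`; create unknown nodes or raise KeyError."""
        node = self
        for name in path:
            child = node.children.get(name)
            if child is None:
                if not create:
                    raise KeyError(name)
                child = TreeNodeSketch(name, parent=node)
                node.children[name] = child
            node = child
        return node


root = TreeNodeSketch(None)
created = root.follow("a", "b")                  # silently creates a and b
found = root.follow("a", "b", create=False)      # same node, nothing created
try:
    root.follow("a", "x", create=False)
    missing_raises = False
except KeyError:
    missing_raises = True
```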

mark_deleted(server)

This entry has been deleted.

Returns:

The entry’s chain.

purge_deleted()

Call Node.clear_deleted() on each link in this entry’s chain.

await set_data(event: distkv.model.NodeEvent, data: Any, local: bool = False, server=None, tock=None)

This entry is updated by that event.

Parameters:
  • event – The NodeEvent to base the update on.
  • data (Any) – whatever the node should contain. Use distkv.util.NotGiven to delete.
  • local (bool) – Flag whether the event should be forwarded to watchers.
Returns:

The UpdateEvent that has been generated and applied.

await apply(evt: distkv.model.UpdateEvent, local: bool = False, server=None, root=None)

Apply this UpdateEvent to me.

Also, forward to watchers (unless local is set).

await walk(proc, acl=None, max_depth=-1, min_depth=0, _depth=0)

Call coroutine proc on this node and all its children.

If acl (must be an ACLStepper) is given, proc is called with the acl as second argument.

If proc raises StopAsyncIteration, chop this subtree.
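How raising StopAsyncIteration chops a subtree can be sketched as follows. WalkNodeSketch is a hypothetical stand-in without ACL support, and the way max_depth and min_depth are interpreted here is an assumption based on the parameter names.

```python
import asyncio


class WalkNodeSketch:
    def __init__(self, name):
        self.name = name
        self.children = []

    def add(self, name):
        child = WalkNodeSketch(name)
        self.children.append(child)
        return child

    async def walk(self, proc, max_depth=-1, min_depth=0, _depth=0):
        if min_depth <= _depth:
            try:
                await proc(self)
            except StopAsyncIteration:
                return  # chop: skip everything below this node
        if max_depth == _depth:
            return
        for child in list(self.children):
            await child.walk(proc, max_depth, min_depth, _depth + 1)


async def demo_walk(**kw):
    root = WalkNodeSketch("root")
    a = root.add("a")
    a.add("a1")
    root.add("b")
    visited = []

    async def proc(node):
        visited.append(node.name)
        if node.name == "a":
            raise StopAsyncIteration  # do not descend below "a"

    await root.walk(proc, **kw)
    return visited


visited = asyncio.run(demo_walk())
top_only = asyncio.run(demo_walk(max_depth=0))
```

In this run the node a1 is never visited because its parent chopped the subtree.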

serialize(chop_path=0, nchain=2, conv=None)

Serialize this entry for msgpack.

Parameters:
  • chop_path – If <0, do not return the entry’s path. Otherwise, do, but remove the first N entries.
  • nchain – how many change events to include.
await updated(event: distkv.model.UpdateEvent)

Send an event to the watchers of this node and of all its parents.

class distkv.model.Watcher(root: distkv.model.Entry)

This helper class is used as an async context manager plus async iterator. It reports all updates to an entry (or its children).

If a watcher terminates, sending to its channel has blocked. The receiver needs to take appropriate re-syncing action.
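The combination of async context manager, async iterator and termination on a blocked channel might look roughly like this. Everything in the sketch is hypothetical (the class WatcherSketch, its send() method, the listeners list and the bounded asyncio.Queue); it only illustrates the pattern, not DistKV's actual channel handling.

```python
import asyncio


class WatcherSketch:
    def __init__(self, listeners, maxsize=2):
        self._listeners = listeners          # hypothetical registry on the entry
        self._queue = asyncio.Queue(maxsize)
        self.overflowed = False

    async def __aenter__(self):
        self._listeners.append(self)
        return self

    async def __aexit__(self, *exc):
        self._listeners.remove(self)

    def send(self, event):
        # Called by the sender, which must never block on a slow watcher.
        if self.overflowed:
            return
        try:
            self._queue.put_nowait(event)
        except asyncio.QueueFull:
            self.overflowed = True

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.overflowed and self._queue.empty():
            raise StopAsyncIteration  # receiver has to re-sync now
        return await self._queue.get()


async def demo_watch():
    listeners, received = [], []
    async with WatcherSketch(listeners, maxsize=2) as watcher:
        for n in range(4):                   # two more than the queue holds
            for listener in listeners:
                listener.send(n)
        async for event in watcher:
            received.append(event)
    return received, watcher.overflowed, listeners


received, overflowed, remaining = asyncio.run(demo_watch())
```

After the loop ends, the receiver knows it has lost events and would need to re-read the subtree it was watching.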

ACLs

ACL checks are performed by ACLFinder. This class collects all relevant ACL entries for any given (sub)path, sorted by depth-first specificity: all ACLs that could possibly match a path are collected and sorted, with the + and # wildcards sorted last. The system then picks the first entry that actually has a value.

For example, if you have a path a b c d e f g and ACLs a b # g and a # d e f g, the first ACL will match because b is more specific than #, even though the second ACL is longer and thus could be regarded as being more specific. However, the current rule is more stable when used with complex ACLs and thus more secure.
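The ordering rule can be sketched as a sort over matching patterns. This is not the ACLFinder implementation: the helper names are hypothetical, and the sketch assumes that + matches exactly one path element, that # matches zero or more, and that + sorts before #.

```python
def matches(pattern, path):
    """Wildcard match; '+' is one element, '#' any number (assumed >= 0)."""
    if not pattern:
        return not path
    head, rest = pattern[0], pattern[1:]
    if head == "#":
        return any(matches(rest, path[i:]) for i in range(len(path) + 1))
    if not path:
        return False
    return (head == "+" or head == path[0]) and matches(rest, path[1:])


_RANK = {"+": 1, "#": 2}  # literals rank 0, so wildcards sort last


def specificity(pattern):
    # Compared element by element from the root: depth-first specificity.
    return tuple(_RANK.get(element, 0) for element in pattern)


def pick_acl(acls, path):
    """Return the most specific matching (pattern, value) that has a value."""
    candidates = sorted((p for p in acls if matches(p, path)), key=specificity)
    for pattern in candidates:
        if acls[pattern] is not None:
            return pattern, acls[pattern]
    return None


path = tuple("a b c d e f g".split())
acls = {
    tuple("a b # g".split()): "r",
    tuple("a # d e f g".split()): "rw",
}
chosen = pick_acl(acls, path)
```

With these two ACLs, chosen is the shorter pattern a b # g, because the literal b at the second position outranks #.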

class distkv.types.ACLFinder(acl, blocked=None)

A NodeFinder which expects ACL strings as elements

Helper methods and classes

This module contains various helper functions and classes.

distkv.util.yprint(data, stream=sys.stdout, compact=False)

Standard code to write a YAML record.

Parameters:
  • data – The data to write.
  • stream – the file to write to, defaults to stdout.
  • compact – Write single lines if possible. default False.
distkv.util.yformat(data, compact=None)

Return data as a multi-line YAML string.

Parameters:
  • data – The data to format.
  • compact – Write single lines if possible. default False.
class distkv.util.NotGiven

Placeholder value for ‘no data’ or ‘deleted’.

distkv.util.combine_dict(*d, cls=<class 'dict'>) → dict

Returns a dict with all keys+values of all dict arguments. The first found value wins.

This recurses if values are dicts.

Parameters:
  • cls (type) – a class to instantiate the result with. Default: dict. Often used: attrdict.
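The "first found value wins" rule with recursion into nested dicts can be sketched like this. combine_dict_sketch is a hypothetical re-implementation for illustration; in particular, silently skipping non-dict values when the first value for a key is a dict is an assumption.

```python
from collections.abc import Mapping


def combine_dict_sketch(*dicts, cls=dict):
    # Gather every value seen for each key, in argument order.
    collected = {}
    for d in dicts:
        for key, value in d.items():
            collected.setdefault(key, []).append(value)

    result = cls()
    for key, values in collected.items():
        if isinstance(values[0], Mapping):
            # Recurse so that nested keys are merged rather than replaced.
            nested = [v for v in values if isinstance(v, Mapping)]
            result[key] = combine_dict_sketch(*nested, cls=cls)
        else:
            result[key] = values[0]  # the first found value wins
    return result


overrides = {"port": 8000, "log": {"level": "debug"}}
defaults = {"port": 1234, "host": "localhost", "log": {"level": "info", "file": None}}
merged = combine_dict_sketch(overrides, defaults)
```

Passing the more specific dict first, as above, is the usual way to layer overrides on top of defaults.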
class distkv.util.attrdict

A dictionary which can be accessed via attributes, for convenience
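A dictionary of this kind is commonly built as a small dict subclass. The sketch below shows one such way under the hypothetical name AttrDictSketch; it is not necessarily how distkv.util.attrdict is written.

```python
class AttrDictSketch(dict):
    """dict whose keys can also be read and written as attributes."""

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        self[name] = value

    def __delattr__(self, name):
        try:
            del self[name]
        except KeyError:
            raise AttributeError(name) from None


cfg = AttrDictSketch(host="localhost")
cfg.port = 1234
```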

class distkv.util.PathShortener(prefix)

This class shortens path entries so that the initial components that are equal to the last-used path (or the original base) are skipped.

It is illegal to path-shorten messages whose path does not start with the initial prefix.

Example: The sequence

a b
a b c d
a b c e f
a b c e g h
a b c i
a b j

is shortened to

0
0 c d
1 e f
2 g h
1 i
0 j

where the initial number is the passed-in depth, assuming the PathShortener is initialized with ('a','b').

Usage:

>>> d = PathShortener(['a','b'])
>>> d({'path': 'a b c d'.split()})
{'depth':0, 'path':['c','d']}
>>> d({'path': 'a b c e f'.split()})
{'depth':1, 'path':['e','f']}

etc.

Note that the input dict is modified in-place.
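The shortening rule can be sketched in a few lines. PathShortenerSketch is a hypothetical re-implementation for illustration: it remembers the last path (without the prefix) and reports how many leading elements of the new path are unchanged. Raising ValueError on a wrong prefix is an assumption.

```python
class PathShortenerSketch:
    def __init__(self, prefix):
        self.prefix = list(prefix)
        self.last = []  # last-used path, without the prefix

    def __call__(self, msg):
        path = list(msg["path"])
        n = len(self.prefix)
        if path[:n] != self.prefix:
            raise ValueError("path does not start with the prefix")
        path = path[n:]
        # Count how many leading elements equal the previous path.
        depth = 0
        for old, new in zip(self.last, path):
            if old != new:
                break
            depth += 1
        self.last = path
        # The input dict is modified in place.
        msg["depth"] = depth
        msg["path"] = path[depth:]


shorten = PathShortenerSketch(["a", "b"])
lines = []
for text in ["a b", "a b c d", "a b c e f", "a b c e g h", "a b c i", "a b j"]:
    msg = {"path": text.split()}
    shorten(msg)
    lines.append(" ".join([str(msg["depth"])] + msg["path"]))
```

Running the sketch over the example sequence yields the shortened lines listed above, one per message.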

class distkv.util.PathLongener(prefix=())

This reverts the operation of a PathShortener. You need to pass the same prefix in.

Calling a PathLongener with a dict without depth or path attributes is a no-op.
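The reverse operation can be sketched the same way. PathLongenerSketch is hypothetical; it keeps the last full path and re-attaches the first depth elements after the prefix to each incoming shortened path.

```python
class PathLongenerSketch:
    def __init__(self, prefix=()):
        self.prefix = tuple(prefix)
        self.last = self.prefix  # last full path, including the prefix

    def __call__(self, msg):
        if "depth" not in msg or "path" not in msg:
            return  # nothing to expand: a no-op
        depth = msg.pop("depth")
        full = self.last[: len(self.prefix) + depth] + tuple(msg["path"])
        self.last = full
        msg["path"] = full


longen = PathLongenerSketch(("a", "b"))
shortened = [(0, []), (0, ["c", "d"]), (1, ["e", "f"]),
             (2, ["g", "h"]), (1, ["i"]), (0, ["j"])]
expanded = []
for depth, path in shortened:
    msg = {"depth": depth, "path": path}
    longen(msg)
    expanded.append(" ".join(msg["path"]))

untouched = {"value": 42}
longen(untouched)
```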

class distkv.util.MsgReader(*a, buflen=4096, **kw)

Read a stream of messages (encoded with MsgPack) from a file.

Usage:

async with MsgReader(path="/tmp/msgs.pack") as f:
    async for msg in f:
        process(msg)
Parameters:
  • buflen (int) – The read buffer size. Defaults to 4k.
  • path (str) – the file to read from.
  • stream – the stream to read from.

Exactly one of path and stream must be used.

class distkv.util.MsgWriter(*a, buflen=65536, **kw)

Write a stream of messages to a file (encoded with MsgPack).

Usage:

async with MsgWriter("/tmp/msgs.pack") as f:
    for msg in some_source_of_messages():  # or "async for"
        f(msg)
Parameters:
  • buflen (int) – The buffer size. Defaults to 64k.
  • path (str) – the file to write to.
  • stream – the stream to write to.

Exactly one of path and stream must be used.

The stream is buffered. Call distkv.util.MsgWriter.flush() to flush the buffer.

await flush()

Flush the buffer.

distkv.util.gen_ssl(ctx: Union[bool, ssl.SSLContext, Dict[str, str]] = False, server: bool = True) → Optional[ssl.SSLContext]

Generate an SSL config from the given context.

Parameters:
  • ctx – either a bool (SSL yes/no), an existing ssl.SSLContext, or a dict with “key” and “cert” entries.
  • server – a flag whether to behave as a server.
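One plausible reading of this signature, using only the standard ssl module, is sketched below. The function name gen_ssl_sketch and the exact mapping (True gives a default context, an existing SSLContext is passed through, the server flag selects the purpose) are assumptions, not the library's confirmed behaviour.

```python
import ssl
from typing import Dict, Optional, Union


def gen_ssl_sketch(
    ctx: Union[bool, ssl.SSLContext, Dict[str, str]] = False,
    server: bool = True,
) -> Optional[ssl.SSLContext]:
    if not ctx:
        return None  # SSL disabled
    if isinstance(ctx, ssl.SSLContext):
        return ctx  # already configured by the caller
    if ctx is True:
        ctx = {}
    # A server authenticates clients; a client authenticates the server.
    purpose = ssl.Purpose.CLIENT_AUTH if server else ssl.Purpose.SERVER_AUTH
    result = ssl.create_default_context(purpose=purpose)
    if "key" in ctx:
        result.load_cert_chain(ctx["cert"], ctx["key"])
    return result


disabled = gen_ssl_sketch(False)
client_ctx = gen_ssl_sketch(True, server=False)
passed_through = gen_ssl_sketch(client_ctx)
```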
distkv.util.split_one(p, kw)

Split ‘p’ and add to dict ‘kw’.

distkv.util.make_proc(code, vars, *path, use_async=False)

Compile this code block to a procedure.

Parameters:
  • code – the code block to execute
  • vars – variable names to pass into the code
  • path – the location where the code is stored
Returns:

the procedure to call. All keyval arguments will be in the local dict.
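Compiling a code block into a callable can be done by wrapping it in a function definition and executing that. The sketch below shows this general technique under the hypothetical name make_proc_sketch; it ignores use_async, and using the joined path as the file name for tracebacks is an assumption.

```python
import textwrap


def make_proc_sketch(code, variables, *path):
    # Named variables become parameters; anything else lands in **kw.
    args = ", ".join(list(variables) + ["**kw"])
    source = "def _proc({}):\n{}\n".format(args, textwrap.indent(code, "    "))
    namespace = {}
    filename = "/".join(path) or "<code>"
    exec(compile(source, filename, "exec"), namespace)
    return namespace["_proc"]


add = make_proc_sketch(
    "total = a + b\nreturn total * kw.get('scale', 1)",
    ["a", "b"],
    "demo", "add",
)
plain = add(a=1, b=2)
scaled = add(a=1, b=2, scale=10)
```

Only trusted code should be compiled this way, since exec runs it with full interpreter privileges.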

distkv.util.make_module(code, *path)

Compile this code block to something module-ish.

Parameters:
  • code – the code block to execute
  • path – the location where the code is / shall be stored
Returns:

the procedure to call. All keyval arguments will be in the local dict.

class distkv.util.Cache(size, attr='_cache_pos')

A quick-and-dirty cache that keeps the last N entries of anything in memory so that ref and WeakValueDictionary don’t lose them.

Entries get refreshed when they’re in the last third of the cache; as they’re not removed, the actual cache size might only be two thirds of size.

resize(size)

Change the size of this cache.
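The basic idea, holding strong references to the most recent N objects so that weak references to them stay alive, can be sketched with a bounded deque. KeepLastSketch and its keep() method are hypothetical names, and the sketch leaves out the refresh rule described above.

```python
import weakref
from collections import deque


class KeepLastSketch:
    def __init__(self, size):
        self._recent = deque(maxlen=size)

    def keep(self, entry):
        # When the deque is full, the oldest strong reference drops out.
        self._recent.append(entry)

    def resize(self, size):
        # Keeps the newest entries if the cache shrinks.
        self._recent = deque(self._recent, maxlen=size)

    def __len__(self):
        return len(self._recent)


class Item:
    """Plain objects can be weakly referenced; built-ins like int cannot."""


index = weakref.WeakValueDictionary()
cache = KeepLastSketch(2)
for n in range(3):
    item = Item()
    index[n] = item
    cache.keep(item)
del item  # from here on, only the cache holds strong references
```

Items 1 and 2 remain reachable through index because the cache still references them; item 0 is no longer protected and may be collected.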

distkv.util.NotGiven

This object marks the absence of information where simply not using the data element or keyword at all would be inconvenient.

For instance, without it you could not write def fn(value=NotGiven, **kw); you’d need to accept only **kw and test 'value' in kw, or use an exception. The problem is that value would then not show up in the function’s signature.

With NotGiven you can simply test value is (or is not) NotGiven.
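The pattern can be sketched with a local stand-in sentinel. NotGivenSketch and describe() are hypothetical names; the point is that None stays usable as real data while the parameter remains visible in the signature.

```python
import inspect


class NotGivenSketch:
    """Stand-in sentinel: the class itself is the marker."""


def describe(value=NotGivenSketch):
    if value is NotGivenSketch:
        return "no value passed"
    return "value is {!r}".format(value)


without_arg = describe()
with_none = describe(None)
parameters = list(inspect.signature(describe).parameters)
```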

This module’s job is to run code and to keep it running.

exception distkv.runner.NotSelected

This node has not been selected for a very long time. Something is amiss.

class distkv.runner.RunnerEntry(*a, **k)

An entry representing some hopefully-running code.

The code will run some time after target has passed. On success, it will run again repeat seconds later (if >0). On error, it will run delay seconds later (if >0), multiplied by 2**backoff.

Parameters:
  • code (list) – pointer to the code that’s to be started.
  • data (dict) – additional data for the code.
  • delay (float) – time before restarting the job on error
  • repeat (float) – time before restarting on success
  • target (float) – time the job should be started at

The code runs with these additional keywords:

  • _entry – this object
  • _client – the DistKV client connection
  • _info – a queue to send events to the task. A message of None signals that the queue was overflowing and no further messages will be delivered.

Messages are defined in distkv.actor.
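The restart timing described for RunnerEntry amounts to a small calculation. The helper below is a hypothetical sketch: the name next_start_sketch and the choice to return None when no restart is scheduled are assumptions.

```python
def next_start_sketch(now, ok, repeat=0.0, delay=0.0, backoff=0):
    """Return the time of the next run, or None if the job is not restarted."""
    if ok:
        # Success: run again `repeat` seconds later, if repeat > 0.
        return now + repeat if repeat > 0 else None
    # Error: wait `delay` seconds, multiplied by 2**backoff, if delay > 0.
    return now + delay * 2 ** backoff if delay > 0 else None


after_success = next_start_sketch(100.0, ok=True, repeat=60)
after_third_error = next_start_sketch(100.0, ok=False, delay=5, backoff=3)
one_shot = next_start_sketch(100.0, ok=True)
```

With delay=5 and backoff=3 the job waits 5 * 2**3 = 40 seconds before the next attempt.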

await run_at(t: float)

Next run at this time.

should_start()

Tell whether this job might want to be started.

Returns:
  • False – No, it’s running (or has run and doesn’t restart).
  • n>0 – wait for n seconds before thinking again.
  • n<0 – should have been started n seconds ago, do something!
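Because False compares equal to 0 in Python, a caller has to test for False by identity before looking at the sign. The helper below is a hypothetical sketch of such a caller; treating a result of exactly 0 as "start now" is an assumption.

```python
def interpret_should_start(result):
    """Map a should_start()-style result to an action name."""
    if result is False:
        return "leave alone"   # running, or finished without restart
    if result > 0:
        return "wait"          # check again in `result` seconds
    return "start now"         # overdue (or due right now, by assumption)


actions = [interpret_should_start(r) for r in (False, 2.5, -1.0, 0)]
```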
class distkv.runner.RunnerNode(*a, **k)

Represents all nodes in this runner group.

This is used for load balancing and such. TODO.

class distkv.runner.StateEntry(parent, name=None)

This is the actual state associated with a RunnerEntry. It must only be managed by the node that actually runs the code.

Parameters:
  • started (float) – timestamp when the job was last started
  • stopped (float) – timestamp when the job last terminated
  • result (Any) – the code’s return value
  • node (str) – the node running this code
  • backoff (float) – on error, the multiplier to apply to the restart timeout
result

alias of distkv.util.NotGiven

class distkv.runner.StateRoot(client, *path, need_wait=False, cfg=None)

Base class for handling the state of entries.

This is separate from the RunnerRoot hierarchy because the latter may be changed by anybody while this subtree may only be affected by the actual runner. Otherwise we get interesting race conditions.

await kill_stale_nodes(names)

States with node names in the “names” set are stale. Kill them.

class distkv.runner.AnyRunnerRoot(*a, err=None, code=None, **kw)

This class represents the root of a code runner. Its job is to start (and periodically restart, if required) the entry points stored under it.

max_age

Timeout after which we really should have gotten another go

await find_stale_nodes(cur)

Find stale nodes (i.e. last seen < cur) and clean them.

class distkv.runner.RunnerNodeEntry(parent, name=None)

Sub-node so that a SingleRunnerRoot runs only its local nodes.

class distkv.runner.SingleRunnerRoot(*a, node=None, **kw)

This class represents the root of a code runner. Its job is to start (and periodically restart, if required) the entry points stored under it.

While AnyRunnerRoot tries to ensure that the code in question runs on any cluster member, this class runs tasks on a single node. The code is able to check whether any and/or all of the cluster’s main nodes are reachable; this way, the code can default to local operation if connectivity is lost.

Local data (dict):

Parameters:
  • cores (tuple) – list of nodes whose reachability may determine whether the code uses local/emergency/??? mode.

Config file:

Parameters:
  • path (tuple) – the location this entry is stored at. Defaults to ('.distkv', 'process').
  • name (str) – this runner’s name. Defaults to the client’s name plus the name stored in the root node, if any.
  • actor (dict) – the configuration for the underlying actor. See asyncserf.actor for details.
await notify_active()

Notify all running jobs that there’s a change in active status

await run_starting()

Hook to set the local root

max_age

Timeout after which we really should have gotten another ping

This module implements an asyncserf.actor.Actor which works on top of a DistKV client.

class distkv.actor.ClientActor(client, name, prefix=None, cfg=None, tg=None)

This is an Actor which works on top of a DistKV client.

class distkv.actor.ActorState

abstract base class for states

distkv.actor.packer()

Packer.pack(self, obj)

distkv.actor.unpacker()

unpackb(packed, object_hook=None, list_hook=None, bool use_list=True, bool raw=True, bool strict_map_key=False, encoding=None, unicode_errors=None, object_pairs_hook=None, ext_hook=ExtType, Py_ssize_t max_str_len=-1, Py_ssize_t max_bin_len=-1, Py_ssize_t max_array_len=-1, Py_ssize_t max_map_len=-1, Py_ssize_t max_ext_len=-1)

Unpack packed to an object. Returns an unpacked object.

Raises ExtraData when packed contains extra bytes. Raises ValueError when packed is incomplete. Raises FormatError when packed is not valid msgpack. Raises StackError when packed is nested too deeply. Other exceptions can be raised during unpacking.

See Unpacker for options.

max_xxx_len options are configured automatically from len(packed).