Skip to content

Commit

Permalink
[driver] add reflex
Browse files Browse the repository at this point in the history
  • Loading branch information
silveryfu committed Apr 12, 2021
1 parent f7a9425 commit f756a61
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 68 deletions.
2 changes: 1 addition & 1 deletion runtime/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def reconcile(meta, *args, **kwargs):
_, resp, e = util.check_gen_and_patch_spec(g, v, r, n, ns,
spec, gen=gen)
if e is not None:
if e.status == util.GEN_OUTDATED:
if e.status == util.DriverError.GEN_OUTDATED:
# retry s.t. the diff object contains the past changes
# TBD(@kopf) non-zero delay fix
raise kopf.TemporaryError(e, delay=0)
Expand Down
45 changes: 45 additions & 0 deletions runtime/driver/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Event filters"""


def always(*args, **kwargs):
_, _ = args, kwargs
return True


def has_diff(_, diff, path, *args, **kwargs) -> bool:
_, _ = args, kwargs
changed_paths = {(".",): True}
# TBD: add shared diff for all handlers
# TBD: support incremental diff

for op, path_, old, new in diff:
# on create
if old is None and len(path_) == 0:
changed_paths.update(_from_model(new))
else:
changed_paths.update(_from_path_tuple(path_))

if path in changed_paths or len(diff) == 0:
return True
return False


def _from_model(d: dict):
result = dict()
to_visit = [[d.get("spec", {}), []]]
for n, prefix in to_visit:
result[tuple(prefix)] = True
if type(n) is not dict:
continue
for _k, _v in n.items():
to_visit.append([_v, prefix + [_k]])
return result


def _from_path_tuple(p: tuple):
# expand a path tuple to dict of paths
return {
# skip "spec"
p[1:_i + 1]: True
for _i in range(len(p))
}
45 changes: 3 additions & 42 deletions runtime/driver/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from collections import OrderedDict

import util
import filter
from reconcile import rc

"""Filters."""
Expand Down Expand Up @@ -131,46 +132,6 @@ def _attr(fn, path=".", prio=0):
# TBD: join multiple path to allow multiple decorators per handler
_path = tuple(_path)

# TBD: add shared diff for all handlers
def has_diff(_, diff, *args, **kwargs) -> bool:
_, _ = args, kwargs
changed_paths = {(".",): True}
# TBD: add support for incremental diff

for op, path_, old, new in diff:
# on create
if old is None and len(path_) == 0:
changed_paths.update(_from_model(new))
else:
changed_paths.update(_from_path_tuple(path_))
# print("debug:", _path, changed_paths, diff)
if _path in changed_paths or len(diff) == 0:
return True
return False

# TBD(ZH's point): allow user define lambda filter
# maybe @on.cond(Callable)?? Akin to the "push-down"
# filter to database

def _from_model(d: dict):
result = dict()
to_visit = [[d.get("spec", {}), []]]
for n, prefix in to_visit:
result[tuple(prefix)] = True
if type(n) is not dict:
continue
for _k, _v in n.items():
to_visit.append([_v, prefix + [_k]])
return result

def _from_path_tuple(p: tuple):
# expand a path tuple to dict of paths
return {
# skip "spec"
p[1:_i + 1]: True
for _i in range(len(p))
}

sig = inspect.signature(fn)

# allow the handler declaration to omit arguments
Expand All @@ -184,7 +145,7 @@ def _from_path_tuple(p: tuple):
kwarg_filter.update({"subview": p})
args[p] = None

for p in ["proc_view", "pv", "cur", "parent"]:
for p in ["proc_view", "pv", "cur", "parent", "root"]:
if p in sig.parameters:
kwarg_filter.update({"proc_view": p})
args[p] = None
Expand Down Expand Up @@ -267,5 +228,5 @@ def wrapper_fn(subview, proc_view, view,

rc.add(handler=wrapper_fn,
priority=prio,
condition=has_diff,
condition=filter.has_diff,
path=_path)
27 changes: 27 additions & 0 deletions runtime/driver/processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Logic and policy processors"""

import pyjq
import time
from view import ModelView


def jq(policy: str):
# preproc macro
_macros = {
"$time": str(time.time()),
# ...
}

for _m, _v in _macros.items():
policy = policy.replace(_m, _v)

def fn(proc_view, *args, **kwargs):
_, _ = args, kwargs
with ModelView(proc_view) as mv:
mv.update(pyjq.one(policy, mv))

return fn


def py(policy: str):
return NotImplemented
145 changes: 124 additions & 21 deletions runtime/driver/reconcile.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import os
import sys
import typing
import traceback

from collections import defaultdict
from collections import OrderedDict

import util
import filter
import processor


class HandlerType:
BUILTIN = 1
REFLEX = 2


class __Reconciler:
Expand All @@ -21,45 +29,136 @@ def __init__(self):
self.g = os.environ["GROUP"]
self.v = os.environ["VERSION"]
self.r = os.environ["PLURAL"]
self.n = os.environ["NAME"]
self.ns = os.environ["NAMESPACE"]
self.skip_gen = -1

# list of handlers keyed by the priority;
# TBD use bisect if no other purpose
self._prio_handler = defaultdict(list)
# handler info (e.g., priority) are used to
# generate the self.handlers upon handler updates;
# each of form (fn, condition, view_path, priority, type)
# keyed by handler's name (fn.__name__)
self._handler_info = OrderedDict()
self._handler_info_updated = True

def run(self, spec, old, diff, *args, **kwargs):
spec = dict(spec)
proc_spec = dict(spec)

self._update_handler_info(spec, diff)
self._compile_handler()

for fn, cond, path, _ in self.handlers:
if cond(proc_spec, diff, *args, **kwargs):
sub_spec = safe_lookup(proc_spec, path)
if cond(proc_spec, diff, path, *args, **kwargs):
# handler edits the spec object
try:
fn(subview=sub_spec, proc_view=proc_spec,
# TBD allow subview to be a forest
fn(subview=safe_lookup(proc_spec, path),
proc_view=proc_spec,
view=spec, old_view=old,
mount=proc_spec.get("mount", {}), obs=proc_spec.get("obs", {}),
back_prop=get_back_prop(diff), diff=diff)
mount=proc_spec.get("mount", {}),
obs=proc_spec.get("obs", {}),
back_prop=get_back_prop(diff),
diff=diff,
)
except Exception as e:
print(f"reconcile error: {e}")
print(traceback.format_exc())
# TBD: expose driver status on model, e.g., obs.reason/or some debug attribute
return proc_spec
# TBD: detect changes and add to diff
# TBD: reason/debug operator

return proc_spec

def add(self, handler: typing.Callable,
condition: typing.Callable,
priority: int,
path: tuple = ()):

# XXX support reflex API and dynamically add handler
# XXX each handler should be registered only once but allow chained conditions
self._prio_handler[priority].append((handler, condition, path, priority))
for p in sorted(self._prio_handler.keys()):
# skip negative priority
if p < 0:
path: tuple = (),
typ=HandlerType.BUILTIN):

n = handler.__name__

if n in self._handler_info:
# TBD perhaps use deterministic name
n = n + util.uuid_str()

self._handler_info[n] = {
"fn": handler,
"condition": condition,
"view_path": path,
"priority": priority,
"type": typ,
}

def _update_handler_info(self, spec, diff):
# check whether there is a reflex change
_changed = not all(len(_path) < 2 or _path[1] != "reflex"
for _, _path, _, _ in diff)

if _changed:
reflexes = util.deep_get(spec, "reflex", {})

# trim reflexes
to_remove = list()
for n, info in self._handler_info.items():
if info["type"] == HandlerType.BUILTIN:
continue
if n not in reflexes:
to_remove.append(n)

for n in to_remove:
self._handler_info.pop(n, {})

# update handlers
for n, r in reflexes.items():
info = self._handler_info.get(n, {
"fn": do_nothing,
"condition": filter.always, # TBD conditioned reflex
"view_path": ".",
"priority": 0,
"type": HandlerType.REFLEX,
})

patch = dict()
if "policy" in r:
patch.update({
"fn": self._new_reflex(r["policy"],
r.get("processor", "py"))
})

if "priority" in r:
patch.update({
"priority": r["priority"]
})

info.update(patch)
self._handler_info[n] = info

self._handler_info_updated = True

@staticmethod
def _new_reflex(logic, proc="py"):
if logic is None:
return do_nothing
if proc == "py":
return processor.py(logic)
if proc == "jq":
return processor.jq(logic)
...

def _compile_handler(self):
if not self._handler_info_updated:
return

self.handlers = list()
for _, hi in self._handler_info.items():
# treat negative priority as disabled
if hi["priority"] < 0:
continue
self.handlers += self._prio_handler[p]
self.handlers.append((hi["fn"], hi["condition"],
hi["view_path"], hi["priority"]))

# sort by priority
self.handlers = sorted(self.handlers, key=lambda x: x[3])
self._handler_info_updated = False


def safe_lookup(d: dict, path: tuple):
Expand All @@ -71,6 +170,10 @@ def safe_lookup(d: dict, path: tuple):
return d


def do_nothing(*args, **kwargs):
_, _ = args, kwargs


def get_back_prop(diff):
bp = list()
for op, path, old, new in diff:
Expand Down
4 changes: 3 additions & 1 deletion runtime/driver/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#kopf==0.28.03
kopf
kubernetes
pyyaml
inflection
pyjq
python-box
Empty file added runtime/driver/test/__init__.py
Empty file.
45 changes: 45 additions & 0 deletions runtime/driver/test/processor_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import time
import yaml
import os
import sys
import pprint as pp

_dir = os.path.dirname(os.path.realpath(__file__))
_parent_dir = os.path.dirname(_dir)
sys.path.insert(0, _parent_dir)

from processor import jq

test_yaml = f"""
control:
brightness:
intent: 0.8
status: 0
mode:
intent: sleep
status: sleep
mount:
mock.digi.dev/v1/lamps:
default/lamp-test:
spec:
control:
power:
intent: "on"
mock.digi.dev/v1/motionsensors:
default/motionsensor-test:
spec:
obs:
last_triggered_time: {time.time()}
reflex:
motion-mode:
policy: 'if $time - ."motionsensor-test".obs.last_triggered_time <= 600
then .root.control.mode.intent = "work" else . end'
priority: 1
"""

if __name__ == '__main__':
v = yaml.load(test_yaml, Loader=yaml.FullLoader)
start = time.time()
jq(v["reflex"]["motion-mode"]["policy"])(v)
print(f"took {time.time() - start}s")
pp.pprint(v)
Loading

0 comments on commit f756a61

Please sign in to comment.