'''
A small, in-development experiment-logging library designed to take advantage of
YAML, JSON-Schema, HDF5-SWMR, and web-based visualization software.
'''
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import importlib
from inspect import cleandoc
import io
import json
from pathlib import Path
import re
import shutil
from textwrap import indent
from time import sleep
from typing import GenericMeta, List
import bottle
import cbor2
import h5py as h5
import jsonschema
import numpy as np
from ruamel import yaml
from toolz import dissoc, keyfilter, merge, valfilter, valmap
__all__ = [
'Namespace', 'Configurable',
'Scope', 'resolve', 'identify', 'create', 'describe',
'Command', 'Record', 'cli', 'require', 'serve']
################################################################################
# Attribute-access-supporting dictionaries
################################################################################
class Namespace(dict):
    '''
    A `dict` that supports accessing items as attributes.

    Reading a missing attribute raises `AttributeError` (rather than the
    `KeyError` a plain item lookup would raise), so `hasattr` and
    three-argument `getattr` work as expected.
    '''

    def __getattr__(self, name):
        # Only called when regular attribute lookup fails, so `dict` methods
        # such as `items` are never shadowed by entries.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None

    __setattr__ = dict.__setitem__
def _namespacify(obj):
    'Recursively wrap every `dict` (including those inside `list`s) in a `Namespace`.'
    if isinstance(obj, list):
        return [_namespacify(item) for item in obj]
    if isinstance(obj, dict):
        return Namespace({key: _namespacify(val) for key, val in obj.items()})
    # Anything else is returned untouched.
    return obj
################################################################################
# Serialization/deserialization
################################################################################
# Stack of active scopes; `Scope.__enter__` appends to it, so the innermost
# scope is the last element.
_scopes = []


def _flat_scope():
    'Collapse the scope stack into one `dict`; later (inner) scopes win.'
    flat = {}
    for scope in _scopes:
        flat.update(scope)
    return flat
class Scope(dict):
    '''
    A `dict` context that makes its entries available to `resolve` when active.

    If multiple scopes are active, the innermost scope (the one entered
    most recently) takes precedence.
    '''

    def __enter__(self):
        _scopes.append(self)
        # Return the scope so that `with Scope(...) as scope:` binds it.
        return self

    def __exit__(self, *_):
        # Remove the innermost occurrence of *this object*. `list.remove`
        # compares by equality and removes the first match, so it could drop
        # a different, equal-valued scope further out on the stack.
        for index in range(len(_scopes) - 1, -1, -1):
            if _scopes[index] is self:
                del _scopes[index]
                return
        raise ValueError('scope is not active')
def resolve(sym):
    '''
    Search the scope stack and module path for an object.

    Active scopes are searched innermost-first. If no scope defines `sym`,
    it is interpreted as "{module_name}|{qualified_name}": the module is
    imported and the (possibly dotted) name is looked up inside it.

    Raises `KeyError` if the symbol cannot be resolved either way.
    '''
    for scope in reversed(_scopes):
        if sym in scope:
            return scope[sym]
    try:
        mod_name, qual_name = sym.split('|')
        obj = importlib.import_module(mod_name)
        # Walk dotted names so that nested classes ("Outer.Inner"), which is
        # what `__qualname__` yields for them, can be resolved too.
        for attr in qual_name.split('.'):
            obj = getattr(obj, attr)
        return obj
    except Exception as err:
        # `Exception` (not a bare `except`) lets KeyboardInterrupt and
        # SystemExit propagate instead of being reported as a missing symbol.
        raise KeyError(
            f'"{sym}" is not present in the '
            'CommandGraph scope stack.') from err
def identify(obj):
    '''
    Search the scope stack and module path for an object's name.

    Returns the symbol bound to `obj` (by identity) in the innermost scope
    that contains it, or "{module}|{qualname}" when no scope does.
    '''
    for scope in _scopes[::-1]:
        for symbol, candidate in scope.items():
            if candidate is obj:
                return symbol
    # Not in any scope: fall back to the importable module path.
    return f'{obj.__module__}|{obj.__qualname__}'
def create(spec):
    '''
    Instantiate a configurable object from a specification.

    A specification is a `dict` with

    - a "type" field; the innermost `Scope` symbol corresponding to the
      object's type, or "{module_name}|{type_name}", if none exist.
    - other fields corresponding to the object's configuration properties.
    '''
    factory = resolve(spec['type'])
    # Everything except "type" is forwarded as keyword configuration.
    conf = {key: val for key, val in dict(spec).items() if key != 'type'}
    return factory(**conf)
def describe(obj):
    '''
    Generate the specification for a configurable object.

    A specification is a `dict` with

    - a "type" field; the innermost `Scope` symbol corresponding to the
      object's type, or "{module_name}|{type_name}", if none exist.
    - other fields corresponding to the object's configuration properties.

    Objects without a `conf` attribute are described by their type alone.
    '''
    type_sym = identify(type(obj))
    conf = getattr(obj, 'conf', {})
    return Namespace(type=type_sym, **conf)
################################################################################
# JSON-Schema generation
################################################################################
def _schema_from_type(t):
if t == bool:
return dict(type='boolean')
elif t == int:
return dict(type='integer')
elif t == float:
return dict(type='number')
elif t == str:
return dict(type='string')
elif type(t) == GenericMeta and t.__base__ == List:
return dict(type='array', items=_schema_from_type(t.__args__[0]))
else:
raise ValueError(f'Type "{t}" can\'t be mapped to a schema.')
def _schema_from_prop_spec(prop_spec):
if not isinstance(prop_spec, tuple):
prop_spec = (prop_spec,)
schema = {}
for e in prop_spec:
if isinstance(e, type):
schema.update(_schema_from_type(e))
elif isinstance(e, list):
schema['default'] = e[0]
elif isinstance(e, str):
schema['description'] = e
elif isinstance(e, dict):
schema.update(e)
return schema
def _conf_schema(type_):
prop_specs = keyfilter(re.compile('!(^__)').match, vars(type_.Conf))
prop_schemas = valmap(_schema_from_prop_spec, prop_specs)
required = [*valfilter(lambda v: 'default' not in v, prop_schemas)]
return dict(type='object', properties=prop_schemas)
def _spec_schema(type_):
    'Return the configuration schema of `type_` with "type" pinned to its symbol.'
    spec_schema = _conf_schema(type_)
    type_constraint = {'const': identify(type_)}
    spec_schema['properties']['type'] = type_constraint
    return spec_schema
def _update_refs(schema):
    '''
    Return a copy of `schema` in which every "$ref" target symbol is replaced
    by the symbol `identify` currently reports for the referenced object.
    '''
    if isinstance(schema, list):
        return [_update_refs(item) for item in schema]
    if not isinstance(schema, dict):
        return schema
    if '$ref' not in schema:
        return {key: _update_refs(val) for key, val in schema.items()}
    # A reference node: strip the prefix, look the object up, and re-identify.
    prefix = '#/definitions/'
    target = resolve(schema['$ref'][len(prefix):])
    return {'$ref': prefix + identify(target)}
def _command_schema():
    '''
    Build a schema that accepts the specification of any `Command` subclass
    in the flattened scope; all scope entries are listed under "definitions".
    '''
    definitions = _update_refs({
        sym: _spec_schema(val) for sym, val in _flat_scope().items()})
    command_refs = []
    for sym, val in _flat_scope().items():
        if issubclass(val, Command):
            command_refs.append({'$ref': f'#/definitions/{sym}'})
    return dict(definitions=definitions, oneOf=command_refs)
################################################################################
# Configurable objects
################################################################################
class ConfigurableMeta(type):
    'Metaclass that attaches `conf_schema` and `spec_schema` to each class it creates.'

    def __init__(cls, *args, **kwargs):
        type_id = f'{cls.__module__}|{cls.__qualname__}'
        cls.conf_schema = _conf_schema(cls)
        cls.spec_schema = {'$ref': '#/definitions/' + type_id}
class Configurable(metaclass=ConfigurableMeta):
    '''
    An object that can be constructed from JSON-object-like structures.

    A JSON-object-like structure is a string-keyed `dict` composed of
    arbitrarily nested `bool`, `int`, `float`, `str`, `NoneType`, `list`, and
    string-keyed `dict` instances.

    An object's configuration should be passed to its constructor as a set of
    keyword arguments; it is stored, converted to `Namespace`s, in `self.conf`.
    '''

    class Conf:
        '''
        Override this to specify a configuration schema.

        Members of `Conf` are interpreted in the following way:

        - The member's name corresponds to the expected property's name.
        - A `type` value specifies the property's expected type.
        - A single-element `list` value specifies the property's default value.
        - A `str` value specifies the property's docstring.
        - A `dict` value specifies raw JSON-Schema constraints.
        - A `tuple` value may specify any combination of the above.

        Examples:

        .. code-block:: python

            class Person(cg.Configurable):
                class Conf:
                    name = str, 'a long-winded pointer'
                    age = int, [0], 'solar rotation count'
                    shoe_size = 'European standard as of 2018-08-17'
        '''
        pass

    def __init__(self, **conf):
        # "type" is reserved: specifications use it to name the object's class.
        type_is_free = 'type' not in conf
        assert type_is_free, (
            '"type" can\'t be used as a key in configurations')
        self.conf = _namespacify(conf)

    @property
    def spec(self):
        '''
        Return the object's specification.

        A specification is a `dict` with

        - a "type" field; the innermost `Scope` symbol corresponding to the
          object's type, or "{module_name}|{type_name}", if none exist.
        - other fields corresponding to the object's configuration properties.

        (This is equivalent to ``cg.describe(self)``.)
        '''
        return describe(self)
################################################################################
# Commands
################################################################################
class Command(Configurable):
    '''
    An operation that creates a `Record`.

    To specify the output path, define an `output_path` property or define
    `output_path` as a class-level format string to be resolved with the
    command configuration.

    Override `run` to do something useful.
    '''

    def __init__(self, **conf):
        super().__init__(**conf)
        template = getattr(type(self), 'output_path', None)
        if isinstance(template, str):
            # The configuration is passed both positionally (so existing
            # '{0[key]}' templates keep working) and as keywords (so plain
            # '{key}' placeholders resolve).
            self.output_path = Path(template.format(conf, **conf))
        self.output = Record(self.output_path)

    def __del__(self):
        # `__init__` may have failed before `output_path` became available.
        output_path = getattr(self, 'output_path', None)
        if output_path is None:
            return
        try:
            if self.status == 'running':
                (Path(output_path)/'_cmd-status.yaml').write_text('stopped')
        except OSError:
            # Best effort: the output directory may already be gone.
            pass

    def run(self):
        'Override this, writing the output of the command to `cmd.output`.'

    @property
    def status(self):
        '"running", "done", "stopped", or "unbegun".'
        try:
            dst = Path(self.output_path)
            spec = yaml.safe_load((dst/'_cmd-spec.yaml').read_text())
            status = yaml.safe_load((dst/'_cmd-status.yaml').read_text())
            # A stored spec that differs from this command's spec means the
            # directory holds the output of some other command.
            return status if spec == describe(self) else 'unbegun'
        except FileNotFoundError:
            return 'unbegun'

    def __call__(self):
        '''
        Execute the command.

        Performs the following operations:

        - Ensures that the output directory exists **and clears it**.
        - Writes the configuration to `{cmd.output_path}/_cmd-spec.yaml`.
        - Writes "running" to `{cmd.output_path}/_cmd-status.yaml`.
        - Calls `cmd.run`.
        - Writes "done" to `{cmd.output_path}/_cmd-status.yaml`, or "stopped"
          if `cmd.run` raised (the exception is re-raised).
        '''
        dst = Path(self.output_path)
        shutil.rmtree(dst, ignore_errors=True)
        dst.mkdir(parents=True, exist_ok=True)
        # The spec is serialized as JSON; `status` reads it back with the
        # YAML loader.
        spec_str = json.dumps(dict(describe(self)))
        (dst/'_cmd-spec.yaml').write_text(spec_str)
        status_file = dst/'_cmd-status.yaml'
        status_file.write_text('running')
        try:
            self.run()
        except BaseException:
            # Don't leave the record marked as "running" after a failure.
            status_file.write_text('stopped')
            raise
        status_file.write_text('done')
def require(cmd):
    '''
    Ensure that a command has started and block until it is finished.

    `cmd` may be a `Command` instance, a specification `dict`, or a `str`
    holding a YAML-encoded specification. A string that does not decode to a
    mapping is treated as a bare type symbol with an empty configuration.

    Returns the `Record` at the command's output path.
    '''
    if isinstance(cmd, str):
        # Passing the raw string to `create` would fail: it indexes the
        # specification with "type".
        cmd = yaml.safe_load(cmd)
        if isinstance(cmd, str):
            cmd = {'type': cmd}
    if isinstance(cmd, dict):
        cmd = create(cmd)
    if cmd.status in {'unbegun', 'stopped'}:
        cmd()
    # Poll until whoever is running the command reports completion.
    while cmd.status == 'running':
        sleep(0.01)
    return Record(cmd.output_path)
################################################################################
# Command records
################################################################################
class _HDF5Entry:
    '''
    A single array stored in the "data" dataset of its own HDF5 file.

    `put` writes the array in one go; `append` grows it along the first axis
    and keeps the file open in SWMR mode between calls.
    '''

    def __init__(self, path):
        self.path = path
        # Dataset handle kept open by `append`; `None` until first used.
        self.dset = None

    def _close(self):
        'Close the file held open by `append`, if any.'
        if self.dset is not None:
            self.dset.file.close()
            self.dset = None

    def get(self):
        'Read and return the whole dataset.'
        # The context manager closes the handle once the data is in memory.
        with h5.File(self.path, 'r', libver='latest', swmr=True) as f:
            return f['data'][()]

    def put(self, val):
        'Store `val` as the "data" dataset of the file.'
        val = np.asarray(val)
        # Drop any handle left over from `append` so that later appends do
        # not keep writing through it.
        self._close()
        # The mode is explicit because h5py's default is not writable in
        # every version.
        with h5.File(self.path, 'a', libver='latest') as f:
            f.create_dataset('data', data=val)

    def append(self, val):
        'Append `val` as a new row along the first axis of the dataset.'
        val = np.asarray(val)
        if self.dset is None:
            f = h5.File(self.path, 'a', libver='latest')
            self.dset = f.require_dataset(
                name='data', shape=None, maxshape=(None, *val.shape),
                dtype=val.dtype, data=np.empty((0, *val.shape), val.dtype),
                chunks=(int(np.ceil(2**12 / val.size)), *val.shape))
            # Left open on purpose: readers follow the file via SWMR.
            f.swmr_mode = True
        self.dset.resize(self.dset.len() + 1, 0)
        self.dset[-1] = val
        self.dset.flush()
class Record:
    '''
    A record of the execution of a `Command`.

    A `Record` is an array-friendly view of a directory. It also supports
    reading command metadata (stored in "_cmd-spec.yaml" and "_cmd-status.yaml").

    Keys are '/'-separated paths relative to the record's directory:

    - a directory is exposed as a nested `Record`,
    - a "{key}.h5" file is exposed as the array `key`,
    - any other file is exposed as its `Path`.

    Names starting with "_" are hidden from iteration.
    '''

    def __init__(self, path):
        self.path = Path(path)
        # Maps (kind, name) to the `_HDF5Entry` or sub-`Record` for that name.
        self._cache = {}

    def _get_entry(self, key):
        # Keys are normalized to `str` so that `Path` and `str` spellings of
        # the same name share one cached object.
        cache_key = ('entry', str(key))
        if cache_key not in self._cache:
            self._cache[cache_key] = _HDF5Entry(self.path/f'{key}.h5')
        return self._cache[cache_key]

    def _get_record(self, key):
        cache_key = ('record', str(key))
        if cache_key not in self._cache:
            self._cache[cache_key] = Record(self.path/key)
        return self._cache[cache_key]

    def _forget(self, key):
        self._cache.pop(('entry', str(key)), None)
        self._cache.pop(('record', str(key)), None)

    def __len__(self):
        return len([p for p in self.path.iterdir()
                    if not p.name.startswith('_')])

    def __contains__(self, key):
        path = self.path/key
        # `exists` has to be *called*; the bare bound method is always truthy.
        return path.exists() or (
            path.suffix == '' and
            path.with_suffix('.h5').is_file())

    def __iter__(self):
        for p in self.path.iterdir():
            if not p.name.startswith('_'):
                if p.suffix == '.h5':
                    yield p.name[:-3]
                else:
                    yield p.name

    def __getitem__(self, key):
        key = Path(key)
        path = self.path/key
        if len(key.parts) > 1:  # Forward to a subrecord
            subrec = self._get_record(key.parts[0])
            return subrec['/'.join(key.parts[1:])]
        elif path.is_dir():  # Return a subrecord
            return self._get_record(key)
        elif path.with_suffix('.h5').is_file():  # Return an array
            return self._get_entry(key).get()
        elif path.is_file():  # Return a file path
            return path
        else:
            raise FileNotFoundError(str(path))

    def __setitem__(self, key, val):
        key = Path(key)
        path = self.path/key
        assert key.suffix == '', 'Can\'t write to encoded files'
        if len(key.parts) > 1:  # Forward to a subrecord
            subrec = self._get_record(key.parts[0])
            subrec['/'.join(key.parts[1:])] = val
        elif isinstance(val, (dict, Record)):  # Forward to all subrecords
            for subkey, subval in val.items():
                self[f'{key}/{subkey}'] = subval
        else:  # Write an array
            path.parent.mkdir(parents=True, exist_ok=True)
            if path.with_suffix('.h5').exists():
                path.with_suffix('.h5').unlink()
            self._get_entry(key).put(val)

    def __delitem__(self, key):
        key = Path(key)
        path = self.path/key
        if len(key.parts) > 1:  # Forward to a subrecord
            subrec = self._get_record(key.parts[0])
            del subrec['/'.join(key.parts[1:])]
        elif path.is_dir():  # Delete a subrecord
            rec = self._get_record(key)
            # Snapshot the keys: deleting changes the directory being listed.
            for k in list(rec):
                del rec[k]
            shutil.rmtree(path, True)
        elif path.with_suffix('.h5').is_file():  # Delete an array
            path.with_suffix('.h5').unlink()
        elif path.is_file():  # Delete a non-array file
            path.unlink()
        else:
            raise FileNotFoundError(str(path))
        self._forget(key)
        # NOTE(review): presumably meant to remove the directory once it is
        # empty; a link count of zero is not expected for a directory that
        # still exists. Kept as is; confirm the intent.
        if self.path.stat().st_nlink == 0:
            self.path.rmdir()

    def append(self, key, val):
        'Append `val` (or, for a mapping, each of its values) to the array at `key`.'
        key = Path(key)
        path = self.path/key
        assert key.suffix == '', 'Can\'t write to encoded files'
        if len(key.parts) > 1:  # Forward to a subrecord
            subrec = self._get_record(key.parts[0])
            subrec.append('/'.join(key.parts[1:]), val)
        elif isinstance(val, (dict, Record)):  # Forward to all subrecords
            for subkey, subval in val.items():
                self.append(f'{key}/{subkey}', subval)
        else:  # Append to an array
            path.parent.mkdir(parents=True, exist_ok=True)
            self._get_entry(key).append(val)

    @property
    def cmd_status(self):
        'The stored command status, or "unbegun" if the metadata is missing.'
        try:
            spec = yaml.safe_load((self.path/'_cmd-spec.yaml').read_text())
            status = yaml.safe_load((self.path/'_cmd-status.yaml').read_text())
            # `cmd_spec` re-reads the same file, so the two only differ if
            # the file changes between the reads.
            return status if spec == self.cmd_spec else 'unbegun'
        except FileNotFoundError:
            return 'unbegun'

    @property
    def cmd_spec(self):
        'The stored command specification, or `None` if there is none.'
        try:
            return yaml.safe_load(
                (self.path/'_cmd-spec.yaml')
                .read_text())
        except FileNotFoundError:
            return None

    def keys(self):
        'Iterate over the top-level keys.'
        yield from self

    def values(self):
        'Iterate over the values of the top-level keys.'
        for k in self.keys():
            yield self[k]

    def items(self):
        'Iterate over top-level `(key, value)` pairs.'
        for k in self.keys():
            yield k, self[k]

    def flat_keys(self):
        'Iterate over the keys of all non-directory entries, at any depth.'
        # `rglob('*')` visits files as well as directories; the directories
        # are skipped so that only leaf entries are reported.
        for p in self.path.rglob('*'):
            if p.is_dir():
                continue
            p_rel = p.relative_to(self.path)
            if any(part.startswith('_') for part in p_rel.parts):
                continue
            if p_rel.suffix == '.h5':
                yield p_rel.with_suffix('').as_posix()
            else:
                yield p_rel.as_posix()

    def flat_values(self):
        'Iterate over the values of all non-directory entries, at any depth.'
        for k in self.flat_keys():
            yield self[k]

    def flat_items(self):
        'Iterate over `(key, value)` pairs of all non-directory entries.'
        for k in self.flat_keys():
            yield k, self[k]
################################################################################
# Command-line interface
################################################################################
def _doc(obj):
return cleandoc(obj.__doc__ or '')
def _ind_a(text):
return indent(text, ' ', lambda _: True)
def _ind_b(text):
return indent(text, '│ ', lambda _: True)
def _cmd_desc(name, cmd):
    'Render the help entry of one command: its docstring and property schemas.'
    props = cmd.conf_schema['properties']
    props_yaml = yaml.safe_dump(props, allow_unicode=True)
    body = _doc(cmd) + '\n' + 'conf-schema:\n' + _ind_b(props_yaml)
    return name + ':\n' + _ind_b(body)
def _cmd_dict_desc():
    'Render the help text listing every `Command` class in the current scope.'
    # Scope values are classes, so they are matched with `issubclass`; an
    # `isinstance` test against `Command` never matches a class.
    descs = [
        _cmd_desc(name, val)
        for name, val in _flat_scope().items()
        if isinstance(val, type) and issubclass(val, Command)]
    return 'commands:\n' + _ind_a('\n'.join(descs))
def cli():
    '''
    Run a command-line interface derived from the current scope stack.

    The single positional argument is either a YAML-encoded command
    specification or the path to a YAML file containing one. If exactly one
    `Command` class is in scope, the "type" field may be omitted. The
    specification is validated against the command schema before it is
    instantiated and executed.
    '''
    parser = ArgumentParser(
        formatter_class=RawDescriptionHelpFormatter,
        epilog=_cmd_dict_desc())
    parser.add_argument('cmd_spec', help=(
        'the command specification, in YAML format, or '
        'the path to a YAML configuration file'))
    args = parser.parse_args()
    cmd_spec = yaml.safe_load(args.cmd_spec)
    if isinstance(cmd_spec, str):
        # A plain string is taken to be a path to the actual specification.
        with open(cmd_spec) as f:
            cmd_spec = yaml.safe_load(f)
    if not isinstance(cmd_spec, dict):
        parser.error('the command specification must be a YAML mapping')
    # Scope values are classes, hence `issubclass` rather than `isinstance`.
    cmds = {
        sym: val for sym, val in _flat_scope().items()
        if isinstance(val, type) and issubclass(val, Command)}
    if len(cmds) == 1 and 'type' not in cmd_spec:
        cmd_spec['type'] = next(iter(cmds))
    jsonschema.validate(cmd_spec, _command_schema())
    create(cmd_spec)()
################################################################################
# Web API
################################################################################
_web_dtypes = dict(
bool='uint8',
uint8='uint8',
uint16='uint16',
uint32='uint32',
uint64='uint32',
int8='int8',
int16='int16',
int32='int32',
int64='int32',
float16='float32',
float32='float32',
float64='float64',
float96='float64',
float128='float64')
def _response(obj):
    'Wrap `obj`, serialized with `cbor2`, in a CORS-enabled HTTP response.'
    # NOTE(review): the body is CBOR but the declared Content-Type is msgpack;
    # confirm what clients expect before changing either.
    headers = {'Content-Type': 'application/msgpack',
               'Access-Control-Allow-Origin': '*'}
    payload = io.BytesIO(cbor2.dumps(obj))
    return bottle.HTTPResponse(body=payload, headers=headers)
def serve(rec_path, port=3000):
    '''
    Start a server providing access to the records in a directory.

    Routes (all relative to `rec_path`):

    - `GET {record}/_entry-names`: the names of the entries in a record.
    - `GET {record}/_cmd-info`: the type, docstring, configuration, and
      status of the command that produced a record (or null if unknown).
    - `GET {entry}`: an array entry, or, for ids with a file suffix, the
      file itself.

    The server listens on localhost at `port` and blocks until stopped.
    '''
    root = Record(rec_path)
    app = bottle.default_app()

    def require_record_dir(rec_id):
        # Ids come straight from the URL: refuse to step outside the root
        # and report anything that is not a directory as "not found".
        if '..' in Path(rec_id).parts or not (root.path/rec_id).is_dir():
            raise bottle.HTTPError(404)

    @app.route('/<:re:.*>', method='OPTIONS')
    def options(*_):
        return bottle.HTTPResponse(headers={
            'Allow': 'OPTIONS, GET, HEAD',
            'Access-Control-Allow-Headers': '*',
            'Access-Control-Allow-Origin': '*'})

    @app.get('/_entry-names')
    @app.get('/<rec_id:path>/_entry-names')
    def entry_names(rec_id=''):
        require_record_dir(rec_id)
        return _response(list(root[rec_id]))

    @app.get('/_cmd-info')
    @app.get('/<rec_id:path>/_cmd-info')
    def cmd_info(rec_id=''):
        require_record_dir(rec_id)
        rec = root[rec_id]
        spec = rec.cmd_spec
        if spec is None:
            return _response(None)
        try:
            desc = _doc(resolve(spec['type']))
        except KeyError:
            # The command type is not known to this process; still report
            # the rest of the metadata.
            desc = ''
        return _response(
            {'type': spec['type'],
             'desc': desc,
             'conf': {k: v for k, v in spec.items() if k != 'type'},
             'status': rec.cmd_status})

    @app.get('/<ent_id:path>')
    def entry(ent_id):
        if Path(ent_id).suffix != '':
            return bottle.static_file(ent_id, root=str(root.path))
        if '..' in Path(ent_id).parts:
            raise bottle.HTTPError(404)
        try:
            ent = root[ent_id]
        except FileNotFoundError:
            raise bottle.HTTPError(404)
        # Directories (sub-records) and plain files are not array entries.
        if not isinstance(ent, (np.ndarray, np.generic)):
            raise bottle.HTTPError(404)
        if ent.dtype.kind in ['U', 'S']:
            return _response(ent.astype('U').tolist())
        ent = ent.astype(_web_dtypes[ent.dtype.name])
        return _response({'$type': 'array',
                          'data': ent.data.tobytes(),
                          'dtype': ent.dtype.name,
                          'shape': ent.shape})

    app.run(host='localhost', port=port)