# -*- coding: utf-8 -*-
# pytableaux, a multi-logic proof generator.
# Copyright (C) 2014-2023 Doug Owings.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
pytableaux.proof.tableaux
^^^^^^^^^^^^^^^^^^^^^^^^^
"""
from __future__ import annotations
import operator as opr
from abc import abstractmethod
from collections import deque
from collections.abc import Set
from types import MappingProxyType as MapProxy
from typing import (TYPE_CHECKING, Any, Callable, Iterable, Iterator, Mapping,
Optional, Self, Sequence, SupportsIndex, TypeVar, final)
from ..errors import Emsg, ProofTimeoutError, check
from ..lang.collect import Argument
from ..lang.lex import Sentence
from ..logics import registry
from ..tools import (EMPTY_MAP, EMPTY_SET, SeqCover, absindex, dictns,
for_defaults, qset, qsetf, wraps)
from ..tools.events import EventEmitter
from ..tools.hybrids import SequenceSet, qset
from ..tools.linked import linqset
from ..tools.timing import Counter, StopWatch
from . import RuleMeta, TableauMeta
from .common import Branch, Node, Target
if TYPE_CHECKING:
from typing import overload
from ..logics import LogicType
from ..models import BaseModel
from ..tools import TypeInstMap
_F = TypeVar('_F', bound=Callable)
_RT = TypeVar('_RT', bound='Rule')
_T = TypeVar('_T')
_RHT = TypeVar('_RHT', bound=RuleMeta.AbstractHelper)
__all__ = (
'Rule',
'RuleGroup',
'RuleGroups',
'Tableau',
'RulesRoot')
NOARG = object()
NOGET = object()
# ----------------------------------------------
[docs]
class Rule(EventEmitter, metaclass=RuleMeta):
'Base class for a Tableau rule.'
defaults = MapProxy(dict(
is_rank_optim = True,
nolock = False))
legend: tuple[tuple[str, Any], ...]
"The rule class legend."
Helpers: Mapping[type[Rule.Helper], Any] = {}
"Helper classes mapped to their settings."
timer_names: Sequence[str] = qsetf(('search', 'apply'))
"StopWatch names to create in ``timers`` mapping."
name: str
"The rule class name."
ticking: bool = False
"Whether this is a ticking rule."
autoattrs: bool|None = None
"Whether to induce class attributes from the rule name."
branching: int = 0
"The number of additional branches created."
tableau: Tableau
"The tableau instance."
opts: Mapping
"The options."
helpers: TypeInstMap[Rule.Helper]
"Helper instances mapped by class."
timers: Mapping[str, StopWatch]
"StopWatch instances mapped by name."
history: Sequence[Target]
"The targets applied to."
state: Rule.State
"The state bit flag."
__slots__ = ('tableau', 'helpers', 'timers', 'opts', 'history', 'state')
@property
def locked(self) -> bool:
try:
return self.state.LOCKED in self.state
except AttributeError:
return False
@property
def Meta(self) -> type[LogicType.Meta]|None:
try:
return self.tableau.logic.Meta
except AttributeError:
return type(self).Meta
Meta: type[LogicType.Meta]|None
@property
def modal(self) -> bool|None:
return self.Meta and self.Meta.modal
modal: bool|None
"Whether this is a modal rule."
def __init__(self, tableau: Tableau, /, **opts):
self.state = Rule.State(0)
super().__init__(*Rule.Events)
self.tableau = tableau
self.opts = MapProxy(for_defaults(self.defaults, opts))
self.timers = {name: StopWatch() for name in self.timer_names}
self.history = SeqCover(history := deque())
self.on(Rule.Events.AFTER_APPLY, history.append)
self.helpers = {}
# Add one at a time, to support helper dependency checks.
for Helper in self.Helpers:
self.helpers[Helper] = Helper(self)
if not self.opts['nolock']:
tableau.once(Tableau.Events.AFTER_BRANCH_ADD, self.lock)
self.state |= self.state.INIT
def __getitem__(self, key: type[_RHT]) -> _RHT:
return self.helpers[key]
@abstractmethod
def _get_targets(self, branch: Branch, /) -> Iterable[Target]:
"Yield targets that the rule should apply to."
yield from EMPTY_SET
@abstractmethod
def _apply(self, target: Target, /) -> None:
"Apply the rule to a target returned from ``._get_targets()``."
raise NotImplementedError
[docs]
@abstractmethod
def example_nodes(self) -> Iterable[Node]:
"Return example nodes that would trigger the rule."
yield from EMPTY_SET
[docs]
def sentence(self, node: Node, /) -> Optional[Sentence]:
'Get the relevant sentence for the node, or ``None``.'
return node.get('sentence')
# Scoring
def group_score(self, target: Target, /) -> float:
# Called in tableau
return self.score_candidate(target) / max(1, 1 + self.branching)
# Candidate score implementation options ``is_rank_optim``
def score_candidate(self, target: Target, /) -> float:
return 0.0
[docs]
@final
def target(self, branch: Branch, /) -> Target|None:
"Get the rule target if it applies."
with self.timers['search']:
targets = self._get_targets(branch)
if targets:
if not isinstance(targets, Sequence):
targets = deque(targets)
if not targets:
return
self._extend_targets(targets)
return self._select_best_target(targets)
[docs]
@final
def apply(self, target: Target, /) -> None:
"Apply the rule to a target returned from ``.target()``."
with self.timers['apply']:
self.emit(Rule.Events.BEFORE_APPLY, target)
self._apply(target)
self.emit(Rule.Events.AFTER_APPLY, target)
self.tableau.emit(Tableau.Events.AFTER_RULE_APPLY, target)
def lock(self, *_):
if self.locked:
raise Emsg.IllegalState('Already locked')
self.helpers = MapProxy(self.helpers)
self.timers = MapProxy(self.timers)
self.state |= self.state.LOCKED
def _extend_targets(self, targets: Sequence[Target], /):
"""Augment the targets with the following keys:
- `rule`
- `is_rank_optim`
- `candidate_score`
- `total_candidates`
- `min_candidate_score`
- `max_candidate_score`
Args:
targets: The sequence of targets.
"""
is_rank_optim = self.opts['is_rank_optim']
if is_rank_optim:
scores = deque(map(self.score_candidate, targets))
max_score = max(scores)
min_score = min(scores)
else:
scores = None,
max_score = None
min_score = None
for score, target in zip(scores, targets):
target.update(
rule = self,
is_rank_optim = is_rank_optim,
total_candidates = len(targets),
candidate_score = score,
min_candidate_score = min_score,
max_candidate_score = max_score)
def _select_best_target(self, targets: Iterable[Target], /) -> Target:
'Selects the best target. Assumes targets have been extended.'
is_rank_optim = self.opts['is_rank_optim']
for target in targets:
if not is_rank_optim:
return target
if target['candidate_score'] == target['max_candidate_score']:
return target
def __setattr__(self, name, value):
if self.locked and name in __class__.__slots__:
raise Emsg.ReadOnly(self, name)
super().__setattr__(name, value)
__iter__ = None
__delattr__ = Emsg.ReadOnly.razr
def __repr__(self):
return (f'<{type(self).__name__} module:{self.__module__} '
f'applied:{len(self.history)}>')
[docs]
@classmethod
def test(cls, /, *, noassert = False):
"""Run a simple test on the rule."""
tab = Tableau()
tab.rules.append(cls)
rule = tab.rules.get(cls)
nodes = deque(rule.example_nodes())
branch = tab.branch()
branch.extend(nodes)
entry = tab.step()
tab.finish()
if not noassert:
assert len(rule.history) > 0
return dictns(
cls = cls,
rule = rule,
tableau = tab,
branch = branch,
nodes = nodes,
entry = entry)
# ----------------------------------------------
def locking(method: _F) -> _F:
'Decorator for locking RulesRoot methods after Tableau is started.'
@wraps(method)
def wrapper(self: RulesRoot, *args, **kw):
try:
if self.root.locked:
raise Emsg.IllegalState('locked')
except AttributeError:
pass
return method(self, *args, **kw)
return wrapper
[docs]
class RuleGroup(Sequence[Rule]):
"""A rule group of a Tableau's ``rules``.
This class supports the full ``Sequence`` standard interface for iterating,
subscripting, and slicing.
The :attr:`append`, :attr:`extend`, and :attr:`clear` methods provide
mutability until the instance is locked. An input value is a subclass of
:class:`Rule`, which is instantiated for the tableau before it is added to
the sequence.
Rule instances are indexed, and can be retrieved by its class or name
using the :attr:`get` method.
"""
__slots__ = ('_map', '_seq', 'name', 'root')
name: str|None
"The group name."
root: RulesRoot
_map: dict[str, Rule]
_seq: list[Rule]
def __init__(self, name: str|None, root:RulesRoot, /):
self.name = name
self.root = root
self._seq = []
self._map = {}
[docs]
@locking
def append(self, rulecls: type[Rule], /):
"""Instantiate and append a rule class.
Args:
rulecls: A :class:`Rule` class.
Raises:
ValueError: If there is a duplicate name.
TypeError: If `value` is not a subclass of :class:`Rule`.
errors.IllegalStateError: If locked.
"""
rulecls = check.subcls(rulecls, Rule)
name = rulecls.name
root = self.root
tab = root.tableau
root._checkname(name)
rule = rulecls(tab, **tab.opts)
self._seq.append(rule)
self._map[name] = rule
root._map[name] = rule
[docs]
def extend(self, classes: Iterable[type[Rule]], /):
"""Append multiple rules.
Args:
classes: An iterable of :class:`Rule` classes.
Raises:
ValueError: If there is a duplicate name.
TypeError: If an element is not a subclass of :class:`Rule`.
errors.IllegalStateError: If locked.
"""
for _ in map(self.append, classes): pass
[docs]
@locking
def clear(self):
"""Clear the rule group.
Raises:
errors.IllegalStateError: If locked.
"""
if not isinstance(self._seq, list):
raise Emsg.IllegalState('locked')
self._seq.clear()
self._map.clear()
if TYPE_CHECKING:
@overload
def get(self, ref:str, default=...) -> Rule:...
@overload
def get(self, ref:type[_RT]) -> _RT:...
[docs]
def get(self, ref: type[_RT]|str, default = NOARG, /) -> _RT:
"""Get rule instance by name or type.
Args:
ref: A :class:`Rule` class or name.
default: A value to return if rule not found.
Returns:
The rule instance, or ``default`` if it is specified and the rule was
not found.
Raises:
KeyError: If rule not found and ``default`` not passed.
TypeError: For bad ``ref`` type.
"""
if not isinstance(ref, str):
ref = check.subcls(ref, Rule).name
try:
return self._map[ref]
except KeyError:
if default is NOARG:
raise
return default
[docs]
def names(self):
'Return all the rule names.'
return self._map.keys()
@locking
def lock(self):
if isinstance(self._map, MapProxy):
raise Emsg.IllegalState('locked')
self._seq = SeqCover(self._seq)
self._map = MapProxy(self._map)
def __getitem__(self, index: SupportsIndex|slice):
return self._seq[index]
def __len__(self):
return len(self._seq)
def __contains__(self, ref):
if isinstance(ref, type): # class
ref = check.subcls(ref, Rule).name
if isinstance(ref, str): # name
return ref in self._map
if isinstance(ref, Rule): # instance
return self._map.get(ref.name, None) is ref
raise Emsg.InstCheck(ref, (str, Rule, type))
__delattr__ = Emsg.ReadOnly.razr
__setattr__ = locking(object.__setattr__)
def __repr__(self):
return f'<{type(self).__name__} name:{self.name} rules:{len(self)}>'
[docs]
class RuleGroups(Sequence[RuleGroup]):
__slots__ = ('root', '_seq', '_map')
_seq: list[RuleGroup]
_map: dict[str, RuleGroup]
root: RulesRoot
def __init__(self, root, /):
self.root = root
self._seq = []
self._map = {}
[docs]
@locking
def create(self, name: str|None = None):
'Create and return a new emtpy rule group.'
if name is not None:
self.root._checkname(name)
group = RuleGroup(name, self.root)
self._seq.append(group)
if name is not None:
self._map[name] = group
return group
[docs]
@locking
def append(self, classes: Iterable[type[Rule]], /, name: str|None = NOARG):
'Create a new group with the given rules. Raise IllegalStateError if locked.'
if name is NOARG:
name = None
self.create(name).extend(classes)
[docs]
def extend(self, groups: Iterable[Iterable[type[Rule]]]):
'Add multiple groups. Raise IllegalStateError if locked.'
for _ in map(self.append, groups): pass
[docs]
@locking
def clear(self):
'Clear the groups. Raise IllegalStateError if locked.'
for _ in map(RuleGroup.clear, self): pass
self._seq.clear()
self._map.clear()
[docs]
def get(self, name: str, default = NOARG, /) -> RuleGroup:
'Get a group by name.'
try:
return self._map[name]
except KeyError:
if default is NOARG:
raise
return default
[docs]
def names(self) -> Set[str]:
'List the named groups.'
return self._map.keys()
@locking
def lock(self):
for _ in map(RuleGroup.lock, self): pass
if not isinstance(self._seq, list):
raise Emsg.IllegalState('locked')
self._seq = SeqCover(self._seq)
def __contains__(self, item):
if isinstance(item, str): # group name
return item in self._map
return item in self._seq # group instance
if TYPE_CHECKING:
@overload
def __getitem__(self, index: SupportsIndex) -> RuleGroup: ...
@overload
def __getitem__(self, index: slice) -> list[RuleGroup]: ...
__len__ = RuleGroup.__len__
__getitem__ = RuleGroup.__getitem__
__delattr__ = Emsg.ReadOnly.razr
__setattr__ = locking(object.__setattr__)
def __repr__(self):
try:
logic = self.root.tableau.logic.Meta.name
except AttributeError:
logic = None
return (f'<{type(self).__name__} logic:{logic} groups:{len(self)} '
f'names:{list(self.names())} rules:{sum(map(len, self))}>')
[docs]
class RulesRoot(Sequence[Rule]):
'Grouped and named collection of rules for a tableau.'
__slots__ = ('_map', 'groups', 'locked', 'root', 'tableau')
groups: RuleGroups
"The rule groups sequence view."
locked: bool
root: RulesRoot
tableau: Tableau
_map: dict[str, Rule]
def __init__(self, tableau: Tableau, /):
self._map = {}
self.locked = False
self.root = self
self.tableau = tableau
self.groups = RuleGroups(self)
tableau.once(Tableau.Events.AFTER_BRANCH_ADD, self.lock)
[docs]
def append(self, rulecls: type[Rule], /, name: str|None = None):
'Add a single Rule to a new group.'
self.groups.create(name).append(rulecls)
[docs]
def extend(self, classes: Iterable[type[Rule]], /, name: str|None = None):
'Create a new group from a collection of Rule classes.'
self.groups.create(name).extend(classes)
[docs]
def clear(self):
'Clear all the rules. Raises IllegalStateError if tableau is started.'
self.groups.clear()
self._map.clear()
get = RuleGroup.get
names = RuleGroup.names
@locking
def lock(self, _ = None):
self.tableau.off(Tableau.Events.AFTER_BRANCH_ADD, self.lock)
self.groups.lock()
self._map = MapProxy(self._map)
self.locked = True
def __len__(self):
return len(self._map)
__contains__ = RuleGroup.__contains__
def __iter__(self) -> Iterator[Rule]:
for group in self.groups:
yield from group
def __reversed__(self) -> Iterator[Rule]:
for group in reversed(self.groups):
yield from reversed(group)
def __getitem__(self, index: SupportsIndex) -> Rule:
length = len(self)
index = absindex(length, index)
if 2 * index > length:
i, iterfunc, adjust, compare = length, reversed, opr.sub, opr.le
else:
i, iterfunc, adjust, compare = 0, iter, opr.add, opr.gt
for group in iterfunc(self.groups):
inext = adjust(i, len(group))
if compare(inext, index):
return group[index - i]
i = inext
# should never run.
raise TypeError # pragma: no cover
def __repr__(self):
logic = self.tableau.logic
lname = logic.Meta.name if logic else None
return (f'<{type(self).__name__} logic:{lname} '
f'groups:{len(self.groups)} rules:{len(self)}>')
__delattr__ = Emsg.ReadOnly.razr
__setattr__ = locking(object.__setattr__)
def _checkname(self, name: str, /):
'Validate a new rule or group name before it is added.'
check.inst(name, str)
if name in self.groups or name in self._map:
raise Emsg.DuplicateKey(name)
# ----------------------------------------------
[docs]
class Tableau(Sequence[Branch], EventEmitter, metaclass=TableauMeta):
'A tableau proof.'
rules: RulesRoot
"The rule instances."
opts: Mapping
"The build options."
open: SequenceSet[Branch]
"Ordered view of the open branches."
history: Sequence[Tableau.StepEntry]
"The history of rule applications."
tree: Tableau.Tree
"""A tree structure of the tableau. This is generated after the tableau
is finished. If the `build_timeout` was exceeded, the tree is `not`
built."""
stats: dict
"The stats, built after finished."
models: frozenset[BaseModel]
"""The models, built after finished if the tableau is `invalid` and the
`is_build_models` option is enabled."""
timers: Tableau.Timers
"The tableau timers."
flag: Tableau.Flag
"The :class:`Tableau.Flag` value."
defaults = MapProxy(dict(
auto_build_trunk = True,
is_group_optim = True,
is_build_models = False,
build_timeout = None,
max_steps = None))
__slots__ = (
'_argument',
'_complexities',
'_logic',
'flag',
'history',
'models',
'open',
'opts',
'rules',
'stat',
'stats',
'timers',
'tree',
'__contains__',
'__getitem__',
'__len__')
def __init__(self, logic=None, argument=None, **opts):
"""
Args:
logic: The logic name or module.
argument: The argument for the tableau.
**opts: The build options.
"""
EventEmitter.__init__(self)
self.flag = Tableau.Flag.PREMATURE
self.models = EMPTY_SET
self.stats = EMPTY_MAP
self.tree = None
self.__listen_on(
history := [],
stat := self.Stat(),
opens := linqset(),
branches := [])
self.__len__ = branches.__len__
self.__getitem__ = branches.__getitem__
self.__contains__ = stat.__contains__
self.stat = stat.query
self.history = SeqCover(history)
self.opts = self.defaults | opts
self.timers = Tableau.Timers.create()
self.rules = RulesRoot(self)
self.open = SeqCover(opens)
self._complexities: dict[Node, int] = {}
maxsteps = self.opts['max_steps']
if maxsteps is not None and maxsteps > 0:
self.flag |= self.flag.HAS_STEP_LIMIT
timeout = self.opts['build_timeout']
if timeout is not None and timeout > 0:
self.flag |= self.flag.HAS_TIME_LIMIT
if logic is not None:
self.logic = logic
if argument is not None:
self.argument = argument
@property
def id(self) -> int:
"The unique object ID of the tableau."
return id(self)
@property
def argument(self) -> Argument|None:
"""The argument of the tableau.
When setting this value, if the tableau has a logic set, and the
`auto_build_trunk` option is true (default), then the trunk is
automatically built.
"""
try:
return self._argument
except AttributeError:
pass
@property
def logic(self) -> LogicType|None:
"""The logic of the tableau.
When setting this value, if the tableau has an argument set, and the
`auto_build_trunk` option is true (default), then the trunk is
automatically built.
"""
try:
return self._logic
except AttributeError:
pass
@argument.setter
def argument(self, value):
if self.flag.STARTED in self.flag:
raise Emsg.IllegalState("Tableau already started")
self._argument = Argument(value)
if self.logic is not None and self.opts['auto_build_trunk']:
self.build_trunk()
@logic.setter
def logic(self, value):
if self.flag.STARTED in self.flag:
raise Emsg.IllegalState("Tableau already started")
self.rules.clear()
self._logic = registry(value)
Rules = self.logic.Rules
self.rules.groups.create('closure').extend(Rules.closure)
for group in Rules.groups:
self.rules.groups.create().extend(group)
if self.argument is not None and self.opts['auto_build_trunk']:
self.build_trunk()
@property
def finished(self) -> bool:
"""Whether the tableau is finished. A tableau is `finished` iff `any` of the
following conditions apply:
* The tableau is `completed`.
* The `max_steps` option is met or exceeded.
* The `build_timeout` option is exceeded.
* The :attr:`finish` method is manually invoked.
"""
return self.flag.FINISHED in self.flag
@property
def completed(self) -> bool:
"""Whether the tableau is completed. A tableau is `completed` iff all rules
that can be applied have been applied."""
return self.flag.FINISHED in self.flag and self.flag.PREMATURE not in self.flag
@property
def premature(self) -> bool:
"""Whether the tableau is finished prematurely. A tableau is `premature` iff
it is `finished` but not `completed`."""
return self.flag.FINISHED in self.flag and self.flag.PREMATURE in self.flag
@property
def valid(self) -> bool|None:
"""Whether the tableau's argument is valid (proved). A tableau with an
argument is `valid` iff it is :attr:`completed` and it has no open branches.
If the tableau is not completed, or it has no argument, the value will
be None."""
if self.completed and self.argument is not None:
return len(self.open) == 0
@property
def invalid(self) -> bool|None:
"""Whether the tableau's argument is invalid (disproved). A tableau with
an argument is `invalid` iff it is :attr:`completed` and it has at least one
open branch. If the tableau is not completed, or it has no argument,
the value will be None."""
if self.completed and self.argument is not None:
return len(self.open) > 0
@property
def current_step(self) -> int:
"""The current step number. This is the number of rule applications, plus 1
if the argument trunk is built."""
return len(self.history) + (self.flag.TRUNK_BUILT in self.flag)
if TYPE_CHECKING:
@overload
def stat(self, branch: Branch, *keys): ...
[docs]
def build(self):
'Build the tableau. Returns self.'
for _ in self.stepiter(): pass
return self
def next(self) -> Tableau.StepEntry|None:
"""Choose the next rule step to perform. Returns the StepEntry or ``None``
if no rule can be applied.
This iterates over the open branches, then over rule groups.
"""
for branch in self.open:
for group in self.rules.groups:
res = self._get_group_application(branch, group)
if res:
return res
[docs]
def step(self) -> Tableau.StepEntry|None:
"""Find, execute, and return the next rule application. If no rule can
be applied, the ``finish()`` method is called, and ``None`` is returned.
If the tableau is already finished then this is a no-op and``None`` is
returned.
.. Internally, this calls the ``next()`` method to select the
.. next step, and, if non-empty, applies the rule and appends the entry
.. to the history.
Returns (Tableau.StepEntry):
The history entry, or ``None``.
"""
if self.flag.FINISHED in self.flag:
return
entry = None
self._check_timeout()
with self.timers.build:
with StopWatch() as timer:
if not self._is_max_steps_exceeded():
entry = self.next()
if entry is None:
self.flag &= ~self.flag.PREMATURE
if entry is not None:
entry.rule.apply(entry.target)
entry.duration.inc(timer.elapsed_ms())
else:
self.finish()
return entry
[docs]
def stepiter(self) -> Iterator[Tableau.StepEntry]:
"""Returns an iterator that calls :meth:`step()` until ``None`` is returned."""
while True:
step = self.step()
if not step:
break
yield step
[docs]
def branch(self, /, parent: Branch = None) -> Branch:
"""Create a new branch on the tableau, as a copy of ``parent``, if given.
Args:
parent: The parent branch, if any.
Returns:
The new branch.
"""
if parent is None:
branch = Branch()
else:
branch = parent.copy(parent = parent)
self.add(branch)
return branch
[docs]
def add(self, /, branch: Branch) -> Self:
"""Add a new branch to the tableau. Returns self.
Args:
branch: The branch to add.
Returns:
self
"""
self.emit(next(iter(self.events)), branch)
return self
[docs]
def finish(self) -> Self:
"""Mark the tableau as finished, and perform post-build tasks, including
populating the ``tree``, ``stats``, and ``models`` properties.
When using the ``build()`` or ``step()`` methods, there is never a need
to call this method, since it is handled internally. However, this
method *is* safe to call multiple times. If the tableau is already
finished, it will be a no-op.
Returns:
self
"""
if self.flag.FINISHED in self.flag:
return self
# Mark the flag early to avoid recursion with timeout error handling.
self.flag |= self.flag.FINISHED
timeouterr = None
if self.invalid and self.opts['is_build_models'] and self.logic is not None:
with self.timers.models:
try:
self.models = frozenset(self._gen_models())
except ProofTimeoutError as err:
timeouterr = err
if self.flag.TIMED_OUT not in self.flag:
# In case of a timeout, we do `not` build the tree in order to best
# respect the timeout. In case of `max_steps` excess, however, we
# `do` build the tree.
with self.timers.tree:
self.tree = self.Tree.make(self)
self.stats = self._compute_stats()
self.emit(Tableau.Events.AFTER_FINISH, self)
if timeouterr:
raise timeouterr
return self
[docs]
def build_trunk(self) -> Self:
"""Build the trunk of the tableau. Delegates to the ``build_trunk()``
method of ``System``. This is called automatically when the
tableau has non-empty ``argument`` and ``logic`` properties and the
auto_build_trunk option is True (default).
Raises:
errors.IllegalStateError: if the trunk is already built, the
tableau is already started, there is no argument, or no
logic.
"""
if self.flag.TRUNK_BUILT in self.flag:
raise Emsg.IllegalState('Trunk already built')
if self.argument is None:
raise Emsg.IllegalState('No argument to build trunk')
if self.logic is None:
raise Emsg.IllegalState('No logic to build trunk')
if self.flag.STARTED in self.flag:
raise Emsg.IllegalState("Tableau already started")
with self.timers.trunk:
self.emit(Tableau.Events.BEFORE_TRUNK_BUILD, self)
branch = self.branch()
self.logic.System.build_trunk(branch, self.argument)
self.flag |= self.flag.TRUNK_BUILT | self.flag.STARTED
self.emit(Tableau.Events.AFTER_TRUNK_BUILD, self)
return self
[docs]
def branching_complexity(self, node: Node, /):
"""Caching method for the logic's ``System.branching_complexity()``
method. If the tableau has no logic, then ``0`` is returned.
Args:
node: The node to evaluate.
Returns:
int: The branching complexity.
"""
try:
system = self.logic.System
except AttributeError:
return 0
key = system.branching_complexity_hashable(node)
cache = self._complexities
try:
return cache[key]
except KeyError:
cache[key] = system.branching_complexity(node, self.rules)
return cache[key]
def __bool__(self):
return True
def __repr__(self):
info = dict(
id = self.id,
logic = (self.logic and self.logic.Meta.name),
len = len(self),
open = len(self.open),
step = self.current_step,
finished = self.finished)
if self.finished:
if self.premature:
info['premature'] = True
elif self.valid:
info['valid'] = True
elif self.invalid:
info['invalid'] = True
if self.argument is not None:
info['argument'] = self.argument
istr = ' '.join(f'{k}:{v}' for k, v in info.items())
return f'<{type(self).__name__} {istr}>'
def __listen_on(self, history: list, stat: Tableau.Stat, opens: linqset[Branch], branches: list[Branch]):
if len(self.events): # pragma: no cover
raise Emsg.IllegalState('Listeners already initialized')
add_event = object()
self.events.create(add_event, *Tableau.Events)
def after_close(branch: Branch):
bstat = stat[branch]
bstat[Tableau.StatKey.STEP_CLOSED] = self.current_step
bstat[Tableau.StatKey.FLAGS] |= self.flag.CLOSED
opens.remove(branch)
self.emit(Tableau.Events.AFTER_BRANCH_CLOSE, branch)
def after_node_add(node: Node, branch: Branch):
bstat = stat[branch].node(node)
bstat[Tableau.StatKey.STEP_ADDED] = node.step = self.current_step
self.emit(Tableau.Events.AFTER_NODE_ADD, node, branch)
def after_tick(node: Node, branch: Branch):
bstat = stat[branch].node(node)
bstat[Tableau.StatKey.STEP_TICKED] = self.current_step
bstat[Tableau.StatKey.FLAGS] |= self.flag.TICKED
self.emit(Tableau.Events.AFTER_NODE_TICK, node, branch)
branch_listeners = {
Branch.Events.AFTER_CLOSE : after_close,
Branch.Events.AFTER_ADD : after_node_add,
Branch.Events.AFTER_TICK : after_tick}
def add_branch(branch: Branch):
if branch in self:
raise Emsg.DuplicateValue(branch.id)
if not branch.closed:
# Append to linqset will raise duplicate value error.
opens.append(branch)
branches.append(branch)
stat[branch] = self.BranchStat({
Tableau.StatKey.STEP_ADDED : self.current_step,
Tableau.StatKey.INDEX : len(branches) - 1,
Tableau.StatKey.PARENT : branch.parent})
# For corner case of an AFTER_BRANCH_ADD callback adding a node, make
# sure we don't emit AFTER_NODE_ADD twice, so prefetch the nodes.
if branch.parent is None:
nodes = deque(branch, maxlen=len(branch))
else:
nodes = EMPTY_SET
# This means we need to start listening before we emit. There
# could be the possibility of recursion.
branch.on(branch_listeners)
self.emit(Tableau.Events.AFTER_BRANCH_ADD, branch)
if len(nodes):
for node in nodes:
after_node_add(node, branch)
self.on({add_event: add_branch})
def after_rule_apply(target: Target):
try:
history.append(target._entry)
except AttributeError:
history.append(Tableau.StepEntry(target.rule, target, Counter()))
self.flag |= self.flag.TIMING_INACCURATE
self.flag |= self.flag.STARTED
tab_listeners = {
Tableau.Events.AFTER_RULE_APPLY: after_rule_apply}
self.on(tab_listeners)
def _get_group_application(self, branch, group: Sequence[Rule], /) -> Tableau.StepEntry:
"""Find and return the next available rule application for the given open
branch and rule group.
This calls the ``rule.target(branch)`` on the rules.
If the `is_group_optim` option is `disabled`, then the first non-empty
target returned by a rule is selected. The target is updated with the
the following:
- `group_score` : None
- `total_group_targets` : 1
- `min_group_score` : None
- `is_group_optim` : False
If the `is_group_optim` option is `enabled`, then all non-empty targets
collected and passed to ``__select_optim_group_application()`` to
compute the scores and select the winner.
Returns:
A (rule, target) pair, or ``None``.
"""
is_group_optim = self.opts['is_group_optim']
results = deque(maxlen = len(group) * bool(is_group_optim))
for rule in group:
target = rule.target(branch)
if target is not None:
entry = Tableau.StepEntry(rule, target, Counter())
target._entry = entry
if not is_group_optim:
target.update(
group_score = None,
total_group_targets = 1,
min_group_score = None,
is_group_optim = False)
return entry
results.append(entry)
if results:
return self._select_optim_group_application(results)
def _select_optim_group_application(self, entries: Sequence[Tableau.StepEntry], /) -> Tableau.StepEntry:
"""Choose the highest scoring element from given entries.
This calls ``rule.group_score(target)`` on each entry to compute the
score. In case of a tie, the earliest element wins.
Before the selected result is returned, the ``target`` dict is updated
with the following:
- `group_score` : int
- `total_group_targets`: int
- `min_group_score` : int
- `is_group_optim` : True
Args:
entries: A sequence of StepEntry objects.
Returns:
A highest scoring entry.
"""
group_scores = deque(entry.rule.group_score(entry.target)
for entry in entries)
max_group_score = max(group_scores)
min_group_score = min(group_scores)
for group_score, entry in zip(group_scores, entries):
if group_score == max_group_score:
entry.target.update(
group_score = max_group_score,
total_group_targets = len(entries),
min_group_score = min_group_score,
is_group_optim = True)
return entry
def _compute_stats(self):
'Compute the stats property after the tableau is finished.'
try:
distinct_nodes = self.tree.distinct_nodes
except AttributeError:
distinct_nodes = None
timers = self.timers
return dict(
result = self._result_word(),
branches = len(self),
open_branches = len(self.open),
closed_branches = len(self) - len(self.open),
steps = len(self.history),
distinct_nodes = distinct_nodes,
rules_duration_ms = sum(
step.duration.value
for step in self.history),
build_duration_ms = timers.build.elapsed_ms(),
trunk_duration_ms = timers.trunk.elapsed_ms(),
tree_duration_ms = timers.tree.elapsed_ms(),
models_duration_ms = timers.models.elapsed_ms(),
rules_time_ms = sum(
rule.timers[name].elapsed_ms()
for rule in self.rules
for name in ('search', 'apply')))
def _check_timeout(self):
if self.flag.HAS_TIME_LIMIT not in self.flag:
return
if self.timers.build.elapsed_ms() > self.opts['build_timeout']:
self.flag |= self.flag.TIMED_OUT
self.finish()
raise Emsg.Timeout(self.opts['build_timeout'])
def _is_max_steps_exceeded(self) -> bool:
return (
self.flag.HAS_STEP_LIMIT in self.flag and
len(self.history) >= self.opts['max_steps'])
def _result_word(self) -> str:
if self.valid:
return 'Valid'
if self.invalid:
return 'Invalid'
if self.completed:
return 'Completed'
return 'Unfinished'
def _gen_models(self):
'Build models for the open branches.'
Model = self.logic.Model
for branch in self.open:
self._check_timeout()
model = Model()
model.read_branch(branch)
branch.model = model
yield model
class NodeStat(dict):
__slots__ = EMPTY_SET
Flag = TableauMeta.Flag
Key = TableauMeta.StatKey
defaults = MapProxy({
Key.FLAGS : Flag(0),
Key.STEP_ADDED : Flag(0),
Key.STEP_TICKED : None})
def __init__(self):
super().__init__(self.defaults)
class BranchStat(dict):
__slots__ = EMPTY_SET
Flag = TableauMeta.Flag
Key = TableauMeta.StatKey
defaults = MapProxy({
Key.FLAGS : Flag(0),
Key.STEP_ADDED : Flag(0),
Key.STEP_CLOSED : Flag(0),
Key.INDEX : None,
Key.PARENT : None})
def __init__(self, mapping = None, /):
super().__init__(self.defaults)
self[self.Key.NODES] = {}
if mapping is not None:
self.update(mapping)
def node(self, node, /):
'Get the stat info for the node, and create if missing.'
# Avoid using defaultdict, since it may hide problems.
try:
return self[self.Key.NODES][node]
except KeyError:
return self[self.Key.NODES].setdefault(node, Tableau.NodeStat())
def view(self):
return {k: self[k] for k in self.defaults}
class Stat(dict[Branch, BranchStat]):
__slots__ = EMPTY_SET
Key = TableauMeta.StatKey
def query(self, branch: Branch, *keys):
# Lookup options:
# - branch
# - branch, key
# - branch, node
# - branch, node, key
Key = self.Key
stat = self[branch]
kit = iter(keys)
try:
key = next(kit)
except StopIteration:
# branch
return stat.view()
if isinstance(key, Node):
# branch, node
stat = stat[Key.NODES][key]
try:
key = Key(next(kit))
except StopIteration:
return MapProxy(stat)
else:
# branch, node, key
stat = stat[key]
try:
next(kit)
except StopIteration:
# literal value
return stat
raise ValueError('Too many keys to lookup')
key = Key(key)
try:
# branch, key
stat = stat[key]
next(kit)
except StopIteration:
if key is Key.NODES:
raise NotImplementedError('Full nodes view not supported')
# literal value
return stat
raise ValueError('Too many keys to lookup')
[docs]
class Tree:
'Recursive tree structure representation of a tableau.'
root: bool = False
nodes: list[Node]
"The nodes on this structure."
ticksteps: list[int|None]
"The ticked steps list."
children: list[Tableau.Tree]
"The child structures."
leaf: bool = False
"Whether this is a terminal (childless) structure."
closed: bool = False
"Whether this is a terminal structure that is closed."
open: bool = False
"Whether this is a terminal structure that is open."
left: int = None
"The pre-ordered tree left value."
right: int = None
"The pre-ordered tree right value."
descendant_node_count: int = 0
"The total node count of all descendants."
structure_node_count: int = 0
"The node count plus descendant node count."
depth: int = 0
"The depth of this structure (ancestor structure count)."
has_open: bool = False
"Whether this structure or a descendant is open."
has_closed: bool = False
"Whether this structure or a descendant is closed."
closed_step: Optional[int] = None
"If closed, the step number at which it closed."
step: int = None
"The step number at which this structure first appears."
width: int = 0
"The number of descendant terminal structures, or 1."
balanced_line_width: float = 0.0
"""0.5x the width of the first child structure, plus 0.5x the
width of the last child structure (if distinct from the first),
plus the sum of the widths of the other (distinct) children.
"""
balanced_line_margin: float = 0.0
"""0.5x the width of the first child structure divided by the
width of this structure.
"""
branch_id: Optional[int] = None
"The branch id, only set for leaves"
model_id: Optional[int] = None
"The model id, if exists, only set for leaves"
is_only_branch: bool = False
"Whether this is the one and only branch"
branch_step: int = None
"The step at which the branch was added"
def __init__(self):
self.nodes = []
self.ticksteps = []
self.children = []
self.id = id(self)
@classmethod
def make(cls, tab: Tableau):
return cls._build(tab, tab)
@classmethod
def _build(cls, tab: Tableau, branches: Sequence[Branch], depth=0, memo=None,/) -> Tableau.Tree:
StatKey = Tableau.StatKey
tree = cls()
if memo is None:
memo = dict(pos=1, depth=0, distinct_nodes=0, root=tree)
tree.root = True
else:
memo['pos'] += 1
tree.depth = memo['depth']
tree.left = memo['pos']
while True:
# Each branch's node at depth.
nodes = qset()
specimen = None
for branch in branches:
if len(branch) <= depth:
# Consider only branches with a node at depth.
continue
if specimen is None:
specimen = branch
nodes.add(branch[depth])
if tab.flag.CLOSED in tab.stat(branch, StatKey.FLAGS):
tree.has_closed = True
else:
tree.has_open = True
if len(nodes) != 1:
# There is *not* a singular node shared by all branches at depth.
break
# There is one node shared by all branches at depth, thus the
# branches are equivalent up to this depth.
branch = specimen
node, = nodes
tree.nodes.append(node)
tree.ticksteps.append(tab.stat(branch, node, StatKey.STEP_TICKED))
step_added = tab.stat(branch, node, StatKey.STEP_ADDED)
if tree.step is None or step_added < tree.step:
tree.step = step_added
depth += 1
memo['distinct_nodes'] += len(tree.nodes)
if len(branches) == 1:
# Finalize leaf attributes.
cls._build_leaf(tab, tree, branches[0], memo)
else:
# Build child structures for each distinct node at node_depth.
memo['depth'] += 1
cls._build_branches(tab, tree, branches, nodes, depth, memo)
memo['depth'] -= 1
tree.structure_node_count = tree.descendant_node_count + len(tree.nodes)
memo['pos'] += 1
tree.right = memo['pos']
# tree.id = hash((tab.id, tree.left, tree.right))
if memo['root'] is tree:
tree.distinct_nodes = memo['distinct_nodes']
return tree
@classmethod
def _build_leaf(cls, tab: Tableau, tree: Tableau.Tree, branch: Branch, memo: dict, /):
'Finalize attributes for leaf structure.'
StatKey = Tableau.StatKey
tree.closed = tab.flag.CLOSED in tab.stat(branch, StatKey.FLAGS)
tree.open = not tree.closed
if tree.closed:
tree.closed_step = tab.stat(branch, StatKey.STEP_CLOSED)
tree.has_closed = True
else:
tree.has_open = True
tree.width = 1
tree.leaf = True
tree.branch_id = branch.id
if branch.model is not None:
tree.model_id = branch.model.id
if memo['depth'] == 0:
tree.is_only_branch = True
@classmethod
def _build_branches(cls, tab: Tableau, tree: Tableau.Tree, branches: Sequence[Branch], nodes: Set[Node], depth: int, memo: dict, /):
'Build child structures for each distinct node.'
# Widths of first, middle, last
widths = [0] * 3
for i, node in enumerate(nodes):
# recurse
next_branches = deque(b for b in branches if b[depth] == node)
child = cls._build(tab, next_branches, depth, memo)
tree.descendant_node_count = len(child.nodes) + child.descendant_node_count
tree.width += child.width
tree.children.append(child)
if i == 0:
# first node
tree.branch_step = child.step
widths[0] = child.width / 2
elif i == len(nodes) - 1:
# last node
widths[2] = child.width / 2
else:
widths[1] += child.width
tree.branch_step = min(tree.branch_step, child.step)
if tree.width > 0:
tree.balanced_line_width = sum(widths) / tree.width
tree.balanced_line_margin = widths[0] / tree.width
else:
tree.balanced_line_width = tree.balanced_line_margin = 0