#!/usr/bin/env python
# encoding: utf-8
"""
This module defines the main ``GramFuzzer`` class through
which rules are defined and rules are randomly generated.
"""
from collections import deque
import copy
import gc
import os
import six
import sys
import gramfuzz.errors as errors
import gramfuzz.rand as rand
import gramfuzz.utils as utils
__version__ = "v1.4.0"
[docs]class GramFuzzer(object):
"""
``GramFuzzer`` is a singleton class that is used to
hold rule definitions and to generate grammar rules from a specific
category at random.
"""
defs = {}
"""Rule definitions by category. E.g. ::
{
"category": {
"rule1": [<Rule1Def1>, <Rule1Def2>],
"rule2": [<Rule2Def1>, <Rule2Def2>],
...
}
}
"""
no_prunes = {}
"""Rules that have specifically asked not to be pruned,
even if the rule can't be reached.
"""
cat_groups = {}
"""Used to store where rules were defined in. E.g. if a rule
``A`` was defined using the category ``alphabet_rules`` in a file
called ``test_rules.py``, it would show up
in ``cat_groups`` as: ::
{
"alphabet_rules": {
"test_rules": ["A"]
}
}
This lets the user specify probabilities/priorities for rules coming
from certain grammar files
"""
__instance__ = None
[docs] @classmethod
def instance(cls):
"""Return the singleton instance of the ``GramFuzzer``
"""
if cls.__instance__ is None:
cls()
return cls.__instance__
[docs] def __init__(self, debug=False):
"""Create a new ``GramFuzzer`` instance
"""
GramFuzzer.__instance__ = self
self.debug = debug
self.defs = {}
self.no_prunes = {}
self.cat_groups = {}
self.cat_group_defaults = {}
# used during rule generation to keep track of things
# that can be reverted if things go wrong
self._staged_defs = None
# the last-defined preferred category-group names
self._last_prefs = None
# the last-defined rule definition names derived from the ``_last_prefs``
self._last_pref_keys = None
# a simple flag to tell if data needs to be auto processed or not
self._rules_processed = False
[docs] def load_grammar(self, path):
"""Load a grammar file (python file containing grammar definitions) by
file path. When loaded, the global variable ``GRAMFUZZER`` will be set
within the module. This is not always needed, but can be useful.
:param str path: The path to the grammar file
"""
if not os.path.exists(path):
raise Exception("path does not exist: {!r}".format(path))
# this will let grammars reference eachother with relative
# imports.
#
# E.g.:
# grams/
# gram1.py
# gram2.py
#
# gram1.py can do "import gram2" to require rules in gram2.py to
# be loaded
grammar_path = os.path.dirname(path)
if grammar_path not in sys.path:
sys.path.append(grammar_path)
with open(path, "r") as f:
data = f.read()
code = compile(data, path, "exec")
locals_ = {"GRAMFUZZER": self, "__file__": path}
exec(code, locals_, locals_)
if "TOP_CAT" in locals_:
cat_group = os.path.basename(path).replace(".py", "")
self.set_cat_group_top_level_cat(cat_group, locals_["TOP_CAT"])
[docs] def set_max_recursion(self, level):
"""Set the maximum reference-recursion depth (not the Python system maximum stack
recursion level). This controls how many levels deep of nested references are allowed
before gramfuzz attempts to generate the shortest (reference-wise) rules possible.
:param int level: The new maximum reference level
"""
import gramfuzz.fields
gramfuzz.fields.Ref.max_recursion = level
[docs] def preprocess_rules(self):
"""Calculate shortest reference-paths of each rule (and Or field),
and prune all unreachable rules.
"""
to_prune = self._find_shortest_paths()
self._prune_rules(to_prune)
self._rules_processed = True
def _find_shortest_paths(self):
leaf_rules = deque()
non_leaf_rules = deque()
rule_ref_lengths = {}
# first find all rule definitions that *don't* have
# any references - these are the leaf nodes
for cat in self.defs.keys():
for rule_name, rules in six.iteritems(self.defs.get(cat, {})):
for rule in rules:
refs = self._collect_refs(rule)
if len(refs) == 0:
leaf_rules.append(rule)
if not hasattr(rule, "name"):
rule_ref_lengths[cat + "-:-" + str(rule)] = (0, [rule])
else:
rule_ref_lengths[cat + "-:-" + rule.name] = (0, [rule])
else:
if not hasattr(rule, "unprocessed_refs"):
rule.unresolved_refs = deque()
rule.unresolved_refs.extend(refs)
non_leaf_rules.append((cat, rule))
# for every referenced rule, determine how many steps away
# from a leaf node it is
post_process = deque()
unprocessed_count = 0
while len(non_leaf_rules) > 0:
# this means we've looped twice over everything
# without being able to process the remaining nodes
if unprocessed_count // len(non_leaf_rules) == 2:
break
cat, curr_rule = non_leaf_rules.popleft()
ref_length = self._process_shortest_ref(cat, curr_rule, rule_ref_lengths)
if ref_length is None:
non_leaf_rules.append((cat, curr_rule))
unprocessed_count += 1
continue
unprocessed_count = 0
ref_key = cat + "-:-" + curr_rule.name
if ref_key not in rule_ref_lengths or ref_length < rule_ref_lengths[ref_key][0]:
rule_ref_lengths[ref_key] = (ref_length, [curr_rule])
elif ref_length == rule_ref_lengths[ref_key][0]:
rule_ref_lengths[ref_key][1].append(curr_rule)
post_process.append((cat, curr_rule))
self._assign_or_shortest_vals(post_process, rule_ref_lengths)
if self.debug:
for cat, rule in non_leaf_rules:
if hasattr(rule, "unresolved_refs"):
for ref in rule.unresolved_refs:
if rule_ref_lengths.get(cat + "-:-" + ref.refname, None) is not None:
continue
print("Pruning rule {!r} due to unresolvable reference: {!r}".format(
rule.name,
ref.refname,
))
else:
print("Pruning rule {!r}".format(rule.name))
# these should be pruned
return non_leaf_rules
def _prune_rules(self, non_leaf_rules):
for cat,rule in non_leaf_rules:
if cat in self.no_prunes and rule.name in self.no_prunes[cat]:
if self.debug:
print("Should prune {!r}, but no_prune = True".format(
rule.name,
))
continue
rule_list = self.defs.get(cat, {}).get(rule.name, [])
rule_list.remove(rule)
if len(rule_list) == 0:
del self.defs.get(cat, {})[rule.name]
def _assign_or_shortest_vals(self, fields, rule_ref_lengths):
for cat,field in fields:
self._process_shortest_ref(cat, field, rule_ref_lengths, assign_or=True)
def _process_shortest_ref(self, cat, field, rule_ref_lengths, assign_or=False):
import gramfuzz.fields as fields
# strings, ints, hard-coded non-gramfuzz values
if not isinstance(field, fields.Field) or getattr(field, "shortest_is_nothing", False):
return 0
# return None if it can't be determined yet
if isinstance(field, fields.Or):
min_ref = 0xffffff
min_vals = []
for idx, val in enumerate(field.values):
val_ref = self._process_shortest_ref(
cat,
val,
rule_ref_lengths,
assign_or = assign_or,
)
if val_ref is None:
continue
elif val_ref < min_ref:
min_ref = val_ref
min_vals = [(val, idx)]
elif val_ref == min_ref:
min_vals.append((val, idx))
if min_ref == 0xffffff:
return None
if assign_or:
# WeightedOr needs to know the indices of the values that are
# shortest to be able to use the correct weights
field.shortest_vals = [x[0] for x in min_vals]
field.shortest_indices = [x[1] for x in min_vals]
return min_ref
# these all be all refs from Ands, Joins, etc, while ignoring
# any optional fields (since those will be ignored with shortest=True
# set on a call to build()).
#
# Also for Defs, Ands, Joins, etc, we'll be returning the maximum
# ref length, since every reference *must* be generated
if hasattr(field, "values"):
max_ref_length = -1
for val in field.values:
ref_val_length = self._process_shortest_ref(
cat,
val,
rule_ref_lengths,
assign_or = assign_or,
)
if ref_val_length is None:
return None
if ref_val_length > max_ref_length:
max_ref_length = ref_val_length
if max_ref_length == -1:
return None
return max_ref_length
if isinstance(field, fields.Ref):
ref_key = field.cat + "-:-" + field.refname
ref_val = rule_ref_lengths.get(ref_key, (-1, []))[0]
if ref_val == -1:
return None
ref_rule_val = rule_ref_lengths[ref_key][1][0]
# if the referenced value is a native python type,
# don't increment the reference value
#
# E.g. If it's a Ref("string"), and Def("string") doesn't contain
# any references, don't increment the value
if ref_val == 0 and len(self._collect_refs(ref_rule_val)) == 0:
return 0
# add one if the reference value is more than 0
return ref_val + 1
return None
def _collect_refs(self, item_val, acc=None, no_opt=False):
if acc is None:
acc = deque()
from gramfuzz.fields import Opt
if no_opt and (isinstance(item_val, Opt) or item_val.shortest_is_nothing):
return acc
from gramfuzz.fields import Ref
if isinstance(item_val, Ref):
acc.append(item_val)
if hasattr(item_val, "values"):
for val in item_val.values:
self._collect_refs(val, acc)
return acc
# --------------------------------------
# public, but intended for internal use
# --------------------------------------
[docs] def add_definition(self, cat, def_name, def_val, no_prune=False, gram_file="default"):
"""Add a new rule definition named ``def_name`` having value ``def_value`` to
the category ``cat``.
:param str cat: The category to add the rule to
:param str def_name: The name of the rule definition
:param def_val: The value of the rule definition
:param bool no_prune: If the rule should explicitly *NOT*
be pruned even if it has been determined to be unreachable (default=``False``)
:param str gram_file: The file the rule was defined in (default=``"default"``).
"""
self._rules_processed = False
self.add_to_cat_group(cat, gram_file, def_name)
if no_prune:
self.no_prunes.setdefault(cat, {}).setdefault(def_name, True)
if self._staged_defs is not None:
# if we're tracking changes during rule generation, add any new rules
# to _staged_defs so they can be reverted if something goes wrong
self._staged_defs.append((cat, def_name, def_val))
else:
self.defs.setdefault(cat, {}).setdefault(def_name, deque()).append(def_val)
[docs] def set_cat_group_top_level_cat(self, cat_group, top_level_cat):
"""Set the default category when generating data from the grammars defined
in cat group. *Note* a cat group is usually just the basename of the grammar
file, minus the ``.py``.
:param str cat_group: The category group to set the default top-level cat for
:param str top_level_cat: The top-level (default) category of the cat group
"""
self.cat_group_defaults[cat_group] = top_level_cat
[docs] def add_to_cat_group(self, cat, cat_group, def_name):
"""Associate the provided rule definition name ``def_name`` with the
category group ``cat_group`` in the category ``cat``.
:param str cat: The category the rule definition was declared in
:param str cat_group: The group within the category the rule belongs to
:param str def_name: The name of the rule definition
"""
self.cat_groups.setdefault(cat, {}).setdefault(cat_group, deque()).append(def_name)
[docs] def get_ref(self, cat, refname):
"""Return one of the rules in the category ``cat`` with the name
``refname``. If multiple rule defintions exist for the defintion name
``refname``, use :any:`gramfuzz.rand` to choose a rule at random.
:param str cat: The category to look for the rule in.
:param str refname: The name of the rule definition. If the rule definition's name is
``"*"``, then a rule name will be chosen at random from within the category ``cat``.
:returns: gramfuzz.fields.Def
"""
if cat not in self.defs:
raise errors.GramFuzzError("referenced definition category ({!r}) not defined".format(cat))
if refname == "*":
refname = rand.choice(list(self.defs[cat].keys()))
if refname not in self.defs[cat]:
raise errors.GramFuzzError("referenced definition ({!r}) not defined".format(refname))
return rand.choice(self.defs[cat][refname])
[docs] def gen(self, num, cat=None, cat_group=None, preferred=None, preferred_ratio=0.5, max_recursion=None, auto_process=True):
"""Generate ``num`` rules from category ``cat``, optionally specifying
preferred category groups ``preferred`` that should be preferred at
probability ``preferred_ratio`` over other randomly-chosen rule definitions.
:param int num: The number of rules to generate
:param str cat: The name of the category to generate ``num`` rules from
:param str cat_group: The category group (ie python file) to generate rules from. This
was added specifically to make it easier to generate data based on the name
of the file the grammar was defined in, and is intended to work with the
``TOP_CAT`` values that may be defined in a loaded grammar file.
:param list preferred: A list of preferred category groups to generate rules from
:param float preferred_ratio: The percent probability that the preferred
groups will be chosen over randomly choosen rule definitions from category ``cat``.
:param int max_recursion: The maximum amount to allow references to recurse
:param bool auto_process: Whether rules should be automatically pruned and
shortest reference paths determined. See :any:`gramfuzz.GramFuzzer.preprocess_rules`
for what would automatically be done.
"""
import gramfuzz.fields
gramfuzz.fields.REF_LEVEL = 1
if cat is None and cat_group is None:
raise gramfuzz.errors.GramFuzzError("cat and cat_group are None, one must be set")
if cat is None and cat_group is not None:
if cat_group not in self.cat_group_defaults:
raise gramfuzz.errors.GramFuzzError(
"cat_group {!r} did not define a TOP_CAT variable"
)
cat = self.cat_group_defaults[cat_group]
if not isinstance(cat, six.string_types):
raise gramfuzz.errors.GramFuzzError(
"cat_group {!r}'s TOP_CAT variable was not a string"
)
if auto_process and self._rules_processed == False:
self.preprocess_rules()
if max_recursion is not None:
self.set_max_recursion(max_recursion)
if preferred is None:
preferred = []
res = deque()
cat_defs = self.defs[cat]
# optimizations
_res_append = res.append
_res_extend = res.extend
_choice = rand.choice
_maybe = rand.maybe
_val = utils.val
keys = list(self.defs[cat].keys())
self._last_pref_keys = self._get_pref_keys(cat, preferred)
# be sure to set this *after* fetching the pref keys (above^)
self._last_prefs = preferred
total_errors = deque()
total_gend = 0
while total_gend < num:
# use a rule definition from one of the preferred category
# groups
if len(self._last_pref_keys) > 0 and _maybe(preferred_ratio):
rand_key = _choice(self._last_pref_keys)
if rand_key not in cat_defs:
# TODO this means it was removed / pruned b/c it was unreachable??
# TODO look into this more
rand_key = _choice(keys)
# else just choose a key at random from the category
else:
rand_key = _choice(keys)
# pruning failed, this rule is not defined/reachable
if rand_key not in cat_defs:
continue
v = _choice(cat_defs[rand_key])
# not used directly by GramFuzzer, but could be useful
# to subclasses of GramFuzzer
info = {}
pre = deque()
self.pre_revert(info)
val_res = None
try:
val_res = _val(v, pre)
except errors.GramFuzzError as e:
raise
#total_errors.append(e)
#self.revert(info)
#continue
except RuntimeError as e:
print("RUNTIME ERROR")
self.revert(info)
continue
if val_res is not None:
_res_extend(pre)
_res_append(val_res)
total_gend += 1
self.post_revert(cat, res, total_gend, num, info)
return res
[docs] def pre_revert(self, info=None):
"""Signal to begin saving any changes that might need to be reverted
"""
self._staged_defs = deque()
[docs] def post_revert(self, cat, res, total_num, num, info):
"""Commit any staged rule definition changes (rule generation went
smoothly).
"""
if self._staged_defs is None:
return
for cat,def_name,def_value in self._staged_defs:
self.defs.setdefault(cat, {}).setdefault(def_name, deque()).append(def_value)
self._staged_defs = None
[docs] def revert(self, info=None):
"""Revert after a single def errored during generate (throw away all
staged rule definition changes)
"""
self._staged_defs = None
def _get_pref_keys(self, cat, preferred):
if preferred == self._last_prefs:
pref_keys = self._last_pref_keys
else:
pref_keys = deque()
for pref in preferred:
if pref in self.cat_groups[cat]:
pref_keys.extend(self.cat_groups[cat][pref])
elif pref in self.defs[cat]:
pref_keys.append(pref)
return pref_keys