123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597 |
- """
- Digress testing core.
- """
- from digress.errors import SkippedTestError, DisabledTestError, NoSuchTestError, \
- FailedTestError, AlreadyRunError, SCMError, \
- ComparisonError
- from digress.constants import *
- from digress.cli import dispatchable
- import inspect
- import operator
- import os
- import json
- import textwrap
- from shutil import rmtree
- from time import time
- from functools import wraps
- from itertools import izip_longest
- from hashlib import sha1
- class depends(object):
- """
- Dependency decorator for a test.
- """
- def __init__(self, *test_names):
- self.test_names = test_names
- def __call__(self, func):
- func.digress_depends = self.test_names
- return func
- class _skipped(object):
- """
- Internal skipped decorator.
- """
- def __init__(self, reason=""):
- self._reason = reason
- def __call__(self, func):
- @wraps(func)
- def _closure(*args):
- raise SkippedTestError(self._reason)
- return _closure
- class disabled(object):
- """
- Disable a test, with reason.
- """
- def __init__(self, reason=""):
- self._reason = reason
- def __call__(self, func):
- @wraps(func)
- def _closure(*args):
- raise DisabledTestError(self._reason)
- return _closure
- class comparer(object):
- """
- Set the comparer for a test.
- """
- def __init__(self, comparer_):
- self._comparer = comparer_
- def __call__(self, func):
- func.digress_comparer = self._comparer
- return func
- class Fixture(object):
- cases = []
- scm = None
- flush_before = False
- def _skip_case(self, case, depend):
- for name, meth in inspect.getmembers(case):
- if name[:5] == "test_":
- setattr(
- case,
- name,
- _skipped("failed dependency: case %s" % depend)(meth)
- )
- def _run_case(self, case, results):
- if case.__name__ in results:
- raise AlreadyRunError
- for depend in case.depends:
- if depend.__name__ in results and results[depend.__name__]["status"] != CASE_PASS:
- self._skip_case(case, depend.__name__)
- try:
- result = self._run_case(depend, results)
- except AlreadyRunError:
- continue
- if result["status"] != CASE_PASS:
- self._skip_case(case, depend.__name__)
- result = case().run()
- results[case.__name__] = result
- return result
- @dispatchable
- def flush(self, revision=None):
- """
- Flush any cached results. Takes a revision for an optional argument.
- """
- if not revision:
- print "Flushing all cached results...",
- try:
- rmtree(".digress_%s" % self.__class__.__name__)
- except Exception, e:
- print "failed: %s" % e
- else:
- print "done."
- else:
- try:
- rev = self.scm.rev_parse(revision)
- except SCMError, e:
- print e
- else:
- print "Flushing cached results for %s..." % rev,
- try:
- rmtree(os.path.join(".digress_%s" % self.__class__.__name__, rev))
- except Exception, e:
- print "failed: %s" % e
- else:
- print "done."
- @dispatchable
- def run(self, revision=None):
- """
- Run the fixture for a specified revision.
- Takes a revision for an argument.
- """
- oldrev = None
- oldbranch = None
- dirty = False
- try:
- dirty = self.scm.dirty()
- # if the tree is clean, then we don't need to make an exception
- if not dirty and revision is None: revision = "HEAD"
- if revision:
- oldrev = self.scm.current_rev()
- oldbranch = self.scm.current_branch()
- if dirty:
- self.scm.stash()
- self.scm.checkout(revision)
- rev = self.scm.current_rev()
- self.datastore = os.path.join(".digress_%s" % self.__class__.__name__, rev)
- if os.path.isdir(self.datastore):
- if self.flush_before:
- self.flush(rev)
- else:
- os.makedirs(self.datastore)
- else:
- rev = "(dirty working tree)"
- self.datastore = None
- print "Running fixture %s on revision %s...\n" % (self.__class__.__name__, rev)
- results = {}
- for case in self.cases:
- try:
- self._run_case(case, results)
- except AlreadyRunError:
- continue
- total_time = reduce(operator.add, filter(
- None,
- [
- result["time"] for result in results.values()
- ]
- ), 0)
- overall_status = (
- CASE_FAIL in [ result["status"] for result in results.values() ]
- ) and FIXTURE_FAIL or FIXTURE_PASS
- print "Fixture %s in %.4f.\n" % (
- (overall_status == FIXTURE_PASS) and "passed" or "failed",
- total_time
- )
- return { "cases" : results, "time" : total_time, "status" : overall_status, "revision" : rev }
- finally:
- if oldrev:
- self.scm.checkout(oldrev)
- if oldbranch:
- self.scm.checkout(oldbranch)
- if dirty:
- self.scm.unstash()
- @dispatchable
- def bisect(self, good_rev, bad_rev=None):
- """
- Perform a bisection between two revisions.
- First argument is the good revision, second is the bad revision, which
- defaults to the current revision.
- """
- if not bad_rev: bad_rev = self.scm.current_rev()
- dirty = False
- # get a set of results for the good revision
- good_result = self.run(good_rev)
- good_rev = good_result["revision"]
- try:
- dirty = self.scm.dirty()
- if dirty:
- self.scm.stash()
- self.scm.bisect("start")
- self.scm.bisect("bad", bad_rev)
- self.scm.bisect("good", good_rev)
- bisecting = True
- isbad = False
- while bisecting:
- results = self.run(self.scm.current_rev())
- revision = results["revision"]
- # perform comparisons
- # FIXME: this just uses a lot of self.compare
- for case_name, case_result in good_result["cases"].iteritems():
- case = filter(lambda case: case.__name__ == case_name, self.cases)[0]
- for test_name, test_result in case_result["tests"].iteritems():
- test = filter(
- lambda pair: pair[0] == "test_%s" % test_name,
- inspect.getmembers(case)
- )[0][1]
- other_result = results["cases"][case_name]["tests"][test_name]
- if other_result["status"] == TEST_FAIL and case_result["status"] != TEST_FAIL:
- print "Revision %s failed %s.%s." % (revision, case_name, test_name)
- isbad = True
- break
- elif hasattr(test, "digress_comparer"):
- try:
- test.digress_comparer(test_result["value"], other_result["value"])
- except ComparisonError, e:
- print "%s differs: %s" % (test_name, e)
- isbad = True
- break
- if isbad:
- output = self.scm.bisect("bad", revision)
- print "Marking revision %s as bad." % revision
- else:
- output = self.scm.bisect("good", revision)
- print "Marking revision %s as good." % revision
- if output.split("\n")[0].endswith("is the first bad commit"):
- print "\nBisection complete.\n"
- print output
- bisecting = False
- print ""
- except SCMError, e:
- print e
- finally:
- self.scm.bisect("reset")
- if dirty:
- self.scm.unstash()
- @dispatchable
- def multicompare(self, rev_a=None, rev_b=None, mode="waterfall"):
- """
- Generate a comparison of tests.
- Takes three optional arguments, from which revision, to which revision,
- and the method of display (defaults to vertical "waterfall", also
- accepts "river" for horizontal display)
- """
- if not rev_a: rev_a = self.scm.current_rev()
- if not rev_b: rev_b = self.scm.current_rev()
- revisions = self.scm.revisions(rev_a, rev_b)
- results = []
- for revision in revisions:
- results.append(self.run(revision))
- test_names = reduce(operator.add, [
- [
- (case_name, test_name)
- for
- test_name, test_result
- in
- case_result["tests"].iteritems()
- ]
- for
- case_name, case_result
- in
- results[0]["cases"].iteritems()
- ], [])
- MAXLEN = 20
- colfmt = "| %s "
- table = []
- if mode not in ("waterfall", "river"):
- mode = "waterfall"
- print "Unknown multicompare mode specified, defaulting to %s." % mode
- if mode == "waterfall":
- header = [ "Test" ]
- for result in results:
- header.append(result["revision"])
- table.append(header)
- for test_name in test_names:
- row_data = [ ".".join(test_name) ]
- for result in results:
- test_result = result["cases"][test_name[0]]["tests"][test_name[1]]
- if test_result["status"] != TEST_PASS:
- value = "did not pass: %s" % (test_result["value"])
- else:
- value = "%s (%.4f)" % (test_result["value"], test_result["time"])
- row_data.append(value)
- table.append(row_data)
- elif mode == "river":
- header = [ "Revision" ]
- for test_name in test_names:
- header.append(".".join(test_name))
- table.append(header)
- for result in results:
- row_data = [ result["revision"] ]
- for case_name, case_result in result["cases"].iteritems():
- for test_name, test_result in case_result["tests"].iteritems():
- if test_result["status"] != TEST_PASS:
- value = "did not pass: %s" % (test_result["value"])
- else:
- value = "%s (%.4f)" % (test_result["value"], test_result["time"])
- row_data.append(value)
- table.append(row_data)
- breaker = "=" * (len(colfmt % "".center(MAXLEN)) * len(table[0]) + 1)
- print breaker
- for row in table:
- for row_stuff in izip_longest(*[
- textwrap.wrap(col, MAXLEN, break_on_hyphens=False) for col in row
- ], fillvalue=""):
- row_output = ""
- for col in row_stuff:
- row_output += colfmt % col.ljust(MAXLEN)
- row_output += "|"
- print row_output
- print breaker
- @dispatchable
- def compare(self, rev_a, rev_b=None):
- """
- Compare two revisions directly.
- Takes two arguments, second is optional and implies current revision.
- """
- results_a = self.run(rev_a)
- results_b = self.run(rev_b)
- for case_name, case_result in results_a["cases"].iteritems():
- case = filter(lambda case: case.__name__ == case_name, self.cases)[0]
- header = "Comparison of case %s" % case_name
- print header
- print "=" * len(header)
- for test_name, test_result in case_result["tests"].iteritems():
- test = filter(
- lambda pair: pair[0] == "test_%s" % test_name,
- inspect.getmembers(case)
- )[0][1]
- other_result = results_b["cases"][case_name]["tests"][test_name]
- if test_result["status"] != TEST_PASS or other_result["status"] != TEST_PASS:
- print "%s cannot be compared as one of the revisions have not passed it." % test_name
- elif hasattr(test, "digress_comparer"):
- try:
- test.digress_comparer(test_result["value"], other_result["value"])
- except ComparisonError, e:
- print "%s differs: %s" % (test_name, e)
- else:
- print "%s does not differ." % test_name
- else:
- print "%s has no comparer and therefore cannot be compared." % test_name
- print ""
- @dispatchable
- def list(self):
- """
- List all available test cases, excluding dependencies.
- """
- print "\nAvailable Test Cases"
- print "===================="
- for case in self.cases:
- print case.__name__
- def register_case(self, case):
- case.fixture = self
- self.cases.append(case)
- class Case(object):
- depends = []
- fixture = None
- def _get_test_by_name(self, test_name):
- if not hasattr(self, "test_%s" % test_name):
- raise NoSuchTestError(test_name)
- return getattr(self, "test_%s" % test_name)
- def _run_test(self, test, results):
- test_name = test.__name__[5:]
- if test_name in results:
- raise AlreadyRunError
- if hasattr(test, "digress_depends"):
- for depend in test.digress_depends:
- if depend in results and results[depend]["status"] != TEST_PASS:
- test = _skipped("failed dependency: %s" % depend)(test)
- dependtest = self._get_test_by_name(depend)
- try:
- result = self._run_test(dependtest, results)
- except AlreadyRunError:
- continue
- if result["status"] != TEST_PASS:
- test = _skipped("failed dependency: %s" % depend)(test)
- start_time = time()
- run_time = None
- print "Running test %s..." % test_name,
- try:
- if not self.datastore:
- # XXX: this smells funny
- raise IOError
- with open(os.path.join(
- self.datastore,
- "%s.json" % sha1(test_name).hexdigest()
- ), "r") as f:
- result = json.load(f)
- value = str(result["value"])
- if result["status"] == TEST_DISABLED:
- status = "disabled"
- elif result["status"] == TEST_SKIPPED:
- status = "skipped"
- elif result["status"] == TEST_FAIL:
- status = "failed"
- elif result["status"] == TEST_PASS:
- status = "passed"
- value = "%s (in %.4f)" % (
- result["value"] or "(no result)",
- result["time"]
- )
- else:
- status = "???"
- print "%s (cached): %s" % (status, value)
- except IOError:
- try:
- value = test()
- except DisabledTestError, e:
- print "disabled: %s" % e
- status = TEST_DISABLED
- value = str(e)
- except SkippedTestError, e:
- print "skipped: %s" % e
- status = TEST_SKIPPED
- value = str(e)
- except FailedTestError, e:
- print "failed: %s" % e
- status = TEST_FAIL
- value = str(e)
- except Exception, e:
- print "failed with exception: %s" % e
- status = TEST_FAIL
- value = str(e)
- else:
- run_time = time() - start_time
- print "passed: %s (in %.4f)" % (
- value or "(no result)",
- run_time
- )
- status = TEST_PASS
- result = { "status" : status, "value" : value, "time" : run_time }
- if self.datastore:
- with open(os.path.join(
- self.datastore,
- "%s.json" % sha1(test_name).hexdigest()
- ), "w") as f:
- json.dump(result, f)
- results[test_name] = result
- return result
- def run(self):
- print "Running case %s..." % self.__class__.__name__
- if self.fixture.datastore:
- self.datastore = os.path.join(
- self.fixture.datastore,
- sha1(self.__class__.__name__).hexdigest()
- )
- if not os.path.isdir(self.datastore):
- os.makedirs(self.datastore)
- else:
- self.datastore = None
- results = {}
- for name, meth in inspect.getmembers(self):
- if name[:5] == "test_":
- try:
- self._run_test(meth, results)
- except AlreadyRunError:
- continue
- total_time = reduce(operator.add, filter(
- None, [
- result["time"] for result in results.values()
- ]
- ), 0)
- overall_status = (
- TEST_FAIL in [ result["status"] for result in results.values() ]
- ) and CASE_FAIL or CASE_PASS
- print "Case %s in %.4f.\n" % (
- (overall_status == FIXTURE_PASS) and "passed" or "failed",
- total_time
- )
- return { "tests" : results, "time" : total_time, "status" : overall_status }
|