Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions tests/test_testresult.py
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,148 @@ def test_empty_detail_status_correct(self):
log._events,
)

def test_subtest_failure(self):
# Test that addSubTest collects failures and reports them in stopTest

# Create a mock subtest that mimics unittest's _SubTest
class MockSubTest:
def __init__(self, parent, description):
self._parent = parent
self._description = description

def id(self):
return f"{self._parent.id()} {self._description}"

def _subDescription(self):
return self._description

log = LoggingStreamResult()
result = ExtendedToStreamDecorator(log)
result.startTestRun()
now = datetime.datetime.now(utc)
result.time(now)
result.startTest(self)

# Simulate a failing subtest
subtest = MockSubTest(self, "(i=1)")
try:
raise AssertionError("subtest failed")
except AssertionError:
err = sys.exc_info()
result.addSubTest(self, subtest, err)

result.stopTest(self)
result.stopTestRun()

# Filter events to check structure
test_id = self.id()
events = log._events

# Should have: startTestRun, inprogress, traceback attachment, fail, stopTestRun
self.assertEqual(events[0], ("startTestRun",))
self.assertEqual(events[1].test_id, test_id)
self.assertEqual(events[1].test_status, "inprogress")

# The traceback attachment for the subtest
self.assertEqual(events[2].test_id, test_id)
self.assertEqual(events[2].file_name, "traceback (i=1)")
self.assertIn(b"AssertionError: subtest failed", events[2].file_bytes)

# The final fail status
self.assertEqual(events[3].test_id, test_id)
self.assertEqual(events[3].test_status, "fail")

self.assertEqual(events[4], ("stopTestRun",))

def test_subtest_success_no_events(self):
# Test that successful subtests don't generate events
class MockSubTest:
def __init__(self, parent, description):
self._parent = parent
self._description = description

def id(self):
return f"{self._parent.id()} {self._description}"

def _subDescription(self):
return self._description

log = LoggingStreamResult()
result = ExtendedToStreamDecorator(log)
result.startTestRun()
now = datetime.datetime.now(utc)
result.time(now)
result.startTest(self)

# Simulate a passing subtest (err=None)
subtest = MockSubTest(self, "(i=0)")
result.addSubTest(self, subtest, None)

# Simulate the success callback that unittest sends when all subtests pass
result.addSuccess(self)
result.stopTest(self)
result.stopTestRun()

test_id = self.id()
events = log._events

# Should have: startTestRun, inprogress, success, stopTestRun
# No subtest-specific events since it passed
self.assertEqual(events[0], ("startTestRun",))
self.assertEqual(events[1].test_id, test_id)
self.assertEqual(events[1].test_status, "inprogress")
self.assertEqual(events[2].test_id, test_id)
self.assertEqual(events[2].test_status, "success")
self.assertEqual(events[3], ("stopTestRun",))

def test_multiple_subtest_failures(self):
# Test that multiple subtest failures are all reported
class MockSubTest:
def __init__(self, parent, description):
self._parent = parent
self._description = description

def id(self):
return f"{self._parent.id()} {self._description}"

def _subDescription(self):
return self._description

log = LoggingStreamResult()
result = ExtendedToStreamDecorator(log)
result.startTestRun()
now = datetime.datetime.now(utc)
result.time(now)
result.startTest(self)

# Simulate two failing subtests
for i in [1, 2]:
subtest = MockSubTest(self, f"(i={i})")
try:
raise AssertionError(f"subtest {i} failed")
except AssertionError:
err = sys.exc_info()
result.addSubTest(self, subtest, err)

result.stopTest(self)
result.stopTestRun()

events = log._events

# Should have: startTestRun, inprogress, 2x traceback, fail, stopTestRun
self.assertEqual(events[0], ("startTestRun",))
self.assertEqual(events[1].test_status, "inprogress")

# Two traceback attachments
self.assertEqual(events[2].file_name, "traceback (i=1)")
self.assertIn(b"subtest 1 failed", events[2].file_bytes)
self.assertEqual(events[3].file_name, "traceback (i=2)")
self.assertIn(b"subtest 2 failed", events[3].file_bytes)

# Final fail status
self.assertEqual(events[4].test_status, "fail")
self.assertEqual(events[5], ("stopTestRun",))


class TestResourcedToStreamDecorator(TestCase):
def setUp(self):
Expand Down
61 changes: 61 additions & 0 deletions testtools/testresult/real.py
Original file line number Diff line number Diff line change
Expand Up @@ -2226,6 +2226,7 @@ def __init__(self, decorated: StreamResult) -> None:
self._started = False
self._tags: TagContext | None = None
self.__now: datetime.datetime | None = None
self._subtest_failures: list[tuple[unittest.TestCase, ExcInfo]] = []

def _get_failfast(self) -> bool:
return len(self.targets) == 2
Expand All @@ -2245,11 +2246,43 @@ def startTest(self, test: unittest.TestCase) -> None:
self.startTestRun()
self.status(test_id=test.id(), test_status="inprogress", timestamp=self._now())
self._tags = TagContext(self._tags)
self._subtest_failures = []

def stopTest(self, test: unittest.TestCase) -> None:
# NOTE: In Python 3.12.1 skipped tests may not call startTest()
if self._tags is not None:
self._tags = self._tags.parent
# If any subtests failed, emit the failure details and a fail status
# for the parent test. When all subtests pass, unittest calls
# addSuccess for the parent, so we don't need to emit a status here.
if self._subtest_failures:
test_id = test.id()
now = self._now()
# Emit traceback for each failed subtest as a file attachment
for subtest, err in self._subtest_failures:
# Use subtest description to create unique attachment name
subtest_desc = subtest._subDescription() # type: ignore[attr-defined]
attachment_name = f"traceback {subtest_desc}"
content = TracebackContent(err, subtest)
mime_type = repr(content.content_type)
file_bytes = b"".join(content.iter_bytes())
self.status(
file_name=attachment_name,
file_bytes=file_bytes,
eof=True,
mime_type=mime_type,
test_id=test_id,
timestamp=now,
)
# Emit final fail status for the parent test
self.status(
test_id=test_id,
test_status="fail",
test_tags=self.current_tags,
timestamp=now,
)
# Clear subtest tracking
self._subtest_failures = []

def addError(
self,
Expand Down Expand Up @@ -2345,6 +2378,26 @@ def addSuccess(
) -> None:
self._convert(test, None, details, "success")

def addSubTest(
self,
test: unittest.TestCase,
subtest: unittest.TestCase,
err: ExcInfo | None,
) -> None:
"""Handle a subtest result.

This is called by unittest when a subtest completes. Subtest failures
are collected and reported as attachments to the parent test, so the
test count reflects only the parent test (matching unittest behavior).

:param test: The original test case.
:param subtest: The subtest instance (has its own id() method).
:param err: None if successful, exc_info tuple if failed.
"""
# Collect failures to report when the parent test completes
if err is not None:
self._subtest_failures.append((subtest, err))

def _check_args(self, err: ExcInfo | None, details: DetailsDict | None) -> None:
param_count = 0
if err is not None:
Expand Down Expand Up @@ -2700,6 +2753,14 @@ def addUnexpectedSuccess(
) -> None:
self.decorated.addUnexpectedSuccess(test, details=details)

def addSubTest(
self,
test: unittest.TestCase,
subtest: unittest.TestCase,
err: ExcInfo | None,
) -> None:
self.decorated.addSubTest(test, subtest, err) # type: ignore[arg-type]

def addDuration(self, test: unittest.TestCase, duration: float) -> None:
self.decorated.addDuration(test, duration)

Expand Down