forked from Minki/linux
6ebf5866f2
The ultimate goal is to create minimal isolated test binaries; in the meantime we are using UML to provide the infrastructure to run tests, so define an abstract way to configure and run tests that allow us to change the context in which tests are built without affecting the user. This also makes pretty and dynamic error reporting, and a lot of other nice features easier. kunit_config.py: - parse .config and Kconfig files. kunit_kernel.py: provides helper functions to: - configure the kernel using kunitconfig. - build the kernel with the appropriate configuration. - provide function to invoke the kernel and stream the output back. kunit_parser.py: parses raw logs returned out by kunit_kernel and displays them in a user friendly way. test_data/*: samples of test data for testing kunit.py, kunit_config.py, etc. Signed-off-by: Felix Guo <felixguoxiuping@gmail.com> Signed-off-by: Brendan Higgins <brendanhiggins@google.com> Reviewed-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org> Reviewed-by: Logan Gunthorpe <logang@deltatee.com> Reviewed-by: Stephen Boyd <sboyd@kernel.org> Signed-off-by: Shuah Khan <skhan@linuxfoundation.org>
311 lines
8.6 KiB
Python
311 lines
8.6 KiB
Python
# SPDX-License-Identifier: GPL-2.0
|
|
#
|
|
# Parses test results from a kernel dmesg log.
|
|
#
|
|
# Copyright (C) 2019, Google LLC.
|
|
# Author: Felix Guo <felixguoxiuping@gmail.com>
|
|
# Author: Brendan Higgins <brendanhiggins@google.com>
|
|
|
|
import re
|
|
|
|
from collections import namedtuple
|
|
from datetime import datetime
|
|
from enum import Enum, auto
|
|
from functools import reduce
|
|
from typing import List
|
|
|
|
TestResult = namedtuple('TestResult', ['status','suites','log'])
|
|
|
|
class TestSuite(object):
|
|
def __init__(self):
|
|
self.status = None
|
|
self.name = None
|
|
self.cases = []
|
|
|
|
def __str__(self):
|
|
return 'TestSuite(' + self.status + ',' + self.name + ',' + str(self.cases) + ')'
|
|
|
|
def __repr__(self):
|
|
return str(self)
|
|
|
|
class TestCase(object):
|
|
def __init__(self):
|
|
self.status = None
|
|
self.name = ''
|
|
self.log = []
|
|
|
|
def __str__(self):
|
|
return 'TestCase(' + self.status + ',' + self.name + ',' + str(self.log) + ')'
|
|
|
|
def __repr__(self):
|
|
return str(self)
|
|
|
|
class TestStatus(Enum):
|
|
SUCCESS = auto()
|
|
FAILURE = auto()
|
|
TEST_CRASHED = auto()
|
|
NO_TESTS = auto()
|
|
|
|
kunit_start_re = re.compile(r'^TAP version [0-9]+$')
|
|
kunit_end_re = re.compile('List of all partitions:')
|
|
|
|
def isolate_kunit_output(kernel_output):
|
|
started = False
|
|
for line in kernel_output:
|
|
if kunit_start_re.match(line):
|
|
started = True
|
|
yield line
|
|
elif kunit_end_re.match(line):
|
|
break
|
|
elif started:
|
|
yield line
|
|
|
|
def raw_output(kernel_output):
|
|
for line in kernel_output:
|
|
print(line)
|
|
|
|
DIVIDER = '=' * 60
|
|
|
|
RESET = '\033[0;0m'
|
|
|
|
def red(text):
|
|
return '\033[1;31m' + text + RESET
|
|
|
|
def yellow(text):
|
|
return '\033[1;33m' + text + RESET
|
|
|
|
def green(text):
|
|
return '\033[1;32m' + text + RESET
|
|
|
|
def print_with_timestamp(message):
|
|
print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))
|
|
|
|
def format_suite_divider(message):
|
|
return '======== ' + message + ' ========'
|
|
|
|
def print_suite_divider(message):
|
|
print_with_timestamp(DIVIDER)
|
|
print_with_timestamp(format_suite_divider(message))
|
|
|
|
def print_log(log):
|
|
for m in log:
|
|
print_with_timestamp(m)
|
|
|
|
TAP_ENTRIES = re.compile(r'^(TAP|\t?ok|\t?not ok|\t?[0-9]+\.\.[0-9]+|\t?#).*$')
|
|
|
|
def consume_non_diagnositic(lines: List[str]) -> None:
|
|
while lines and not TAP_ENTRIES.match(lines[0]):
|
|
lines.pop(0)
|
|
|
|
def save_non_diagnositic(lines: List[str], test_case: TestCase) -> None:
|
|
while lines and not TAP_ENTRIES.match(lines[0]):
|
|
test_case.log.append(lines[0])
|
|
lines.pop(0)
|
|
|
|
OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])
|
|
|
|
OK_NOT_OK_SUBTEST = re.compile(r'^\t(ok|not ok) [0-9]+ - (.*)$')
|
|
|
|
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) [0-9]+ - (.*)$')
|
|
|
|
def parse_ok_not_ok_test_case(lines: List[str],
|
|
test_case: TestCase,
|
|
expecting_test_case: bool) -> bool:
|
|
save_non_diagnositic(lines, test_case)
|
|
if not lines:
|
|
if expecting_test_case:
|
|
test_case.status = TestStatus.TEST_CRASHED
|
|
return True
|
|
else:
|
|
return False
|
|
line = lines[0]
|
|
match = OK_NOT_OK_SUBTEST.match(line)
|
|
if match:
|
|
test_case.log.append(lines.pop(0))
|
|
test_case.name = match.group(2)
|
|
if test_case.status == TestStatus.TEST_CRASHED:
|
|
return True
|
|
if match.group(1) == 'ok':
|
|
test_case.status = TestStatus.SUCCESS
|
|
else:
|
|
test_case.status = TestStatus.FAILURE
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
SUBTEST_DIAGNOSTIC = re.compile(r'^\t# .*?: (.*)$')
|
|
DIAGNOSTIC_CRASH_MESSAGE = 'kunit test case crashed!'
|
|
|
|
def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
|
|
save_non_diagnositic(lines, test_case)
|
|
if not lines:
|
|
return False
|
|
line = lines[0]
|
|
match = SUBTEST_DIAGNOSTIC.match(line)
|
|
if match:
|
|
test_case.log.append(lines.pop(0))
|
|
if match.group(1) == DIAGNOSTIC_CRASH_MESSAGE:
|
|
test_case.status = TestStatus.TEST_CRASHED
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def parse_test_case(lines: List[str], expecting_test_case: bool) -> TestCase:
|
|
test_case = TestCase()
|
|
save_non_diagnositic(lines, test_case)
|
|
while parse_diagnostic(lines, test_case):
|
|
pass
|
|
if parse_ok_not_ok_test_case(lines, test_case, expecting_test_case):
|
|
return test_case
|
|
else:
|
|
return None
|
|
|
|
SUBTEST_HEADER = re.compile(r'^\t# Subtest: (.*)$')
|
|
|
|
def parse_subtest_header(lines: List[str]) -> str:
|
|
consume_non_diagnositic(lines)
|
|
if not lines:
|
|
return None
|
|
match = SUBTEST_HEADER.match(lines[0])
|
|
if match:
|
|
lines.pop(0)
|
|
return match.group(1)
|
|
else:
|
|
return None
|
|
|
|
SUBTEST_PLAN = re.compile(r'\t[0-9]+\.\.([0-9]+)')
|
|
|
|
def parse_subtest_plan(lines: List[str]) -> int:
|
|
consume_non_diagnositic(lines)
|
|
match = SUBTEST_PLAN.match(lines[0])
|
|
if match:
|
|
lines.pop(0)
|
|
return int(match.group(1))
|
|
else:
|
|
return None
|
|
|
|
def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
|
|
if left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
|
|
return TestStatus.TEST_CRASHED
|
|
elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
|
|
return TestStatus.FAILURE
|
|
elif left != TestStatus.SUCCESS:
|
|
return left
|
|
elif right != TestStatus.SUCCESS:
|
|
return right
|
|
else:
|
|
return TestStatus.SUCCESS
|
|
|
|
def parse_ok_not_ok_test_suite(lines: List[str], test_suite: TestSuite) -> bool:
|
|
consume_non_diagnositic(lines)
|
|
if not lines:
|
|
test_suite.status = TestStatus.TEST_CRASHED
|
|
return False
|
|
line = lines[0]
|
|
match = OK_NOT_OK_MODULE.match(line)
|
|
if match:
|
|
lines.pop(0)
|
|
if match.group(1) == 'ok':
|
|
test_suite.status = TestStatus.SUCCESS
|
|
else:
|
|
test_suite.status = TestStatus.FAILURE
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def bubble_up_errors(to_status, status_container_list) -> TestStatus:
|
|
status_list = map(to_status, status_container_list)
|
|
return reduce(max_status, status_list, TestStatus.SUCCESS)
|
|
|
|
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
|
|
max_test_case_status = bubble_up_errors(lambda x: x.status, test_suite.cases)
|
|
return max_status(max_test_case_status, test_suite.status)
|
|
|
|
def parse_test_suite(lines: List[str]) -> TestSuite:
|
|
if not lines:
|
|
return None
|
|
consume_non_diagnositic(lines)
|
|
test_suite = TestSuite()
|
|
test_suite.status = TestStatus.SUCCESS
|
|
name = parse_subtest_header(lines)
|
|
if not name:
|
|
return None
|
|
test_suite.name = name
|
|
expected_test_case_num = parse_subtest_plan(lines)
|
|
if not expected_test_case_num:
|
|
return None
|
|
test_case = parse_test_case(lines, expected_test_case_num > 0)
|
|
expected_test_case_num -= 1
|
|
while test_case:
|
|
test_suite.cases.append(test_case)
|
|
test_case = parse_test_case(lines, expected_test_case_num > 0)
|
|
expected_test_case_num -= 1
|
|
if parse_ok_not_ok_test_suite(lines, test_suite):
|
|
test_suite.status = bubble_up_test_case_errors(test_suite)
|
|
return test_suite
|
|
elif not lines:
|
|
print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
|
|
return test_suite
|
|
else:
|
|
print('failed to parse end of suite' + lines[0])
|
|
return None
|
|
|
|
TAP_HEADER = re.compile(r'^TAP version 14$')
|
|
|
|
def parse_tap_header(lines: List[str]) -> bool:
|
|
consume_non_diagnositic(lines)
|
|
if TAP_HEADER.match(lines[0]):
|
|
lines.pop(0)
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
|
|
return bubble_up_errors(lambda x: x.status, test_suite_list)
|
|
|
|
def parse_test_result(lines: List[str]) -> TestResult:
|
|
if not lines:
|
|
return TestResult(TestStatus.NO_TESTS, [], lines)
|
|
consume_non_diagnositic(lines)
|
|
if not parse_tap_header(lines):
|
|
return None
|
|
test_suites = []
|
|
test_suite = parse_test_suite(lines)
|
|
while test_suite:
|
|
test_suites.append(test_suite)
|
|
test_suite = parse_test_suite(lines)
|
|
return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
|
|
|
|
def parse_run_tests(kernel_output) -> TestResult:
|
|
total_tests = 0
|
|
failed_tests = 0
|
|
crashed_tests = 0
|
|
test_result = parse_test_result(list(isolate_kunit_output(kernel_output)))
|
|
for test_suite in test_result.suites:
|
|
if test_suite.status == TestStatus.SUCCESS:
|
|
print_suite_divider(green('[PASSED] ') + test_suite.name)
|
|
elif test_suite.status == TestStatus.TEST_CRASHED:
|
|
print_suite_divider(red('[CRASHED] ' + test_suite.name))
|
|
else:
|
|
print_suite_divider(red('[FAILED] ') + test_suite.name)
|
|
for test_case in test_suite.cases:
|
|
total_tests += 1
|
|
if test_case.status == TestStatus.SUCCESS:
|
|
print_with_timestamp(green('[PASSED] ') + test_case.name)
|
|
elif test_case.status == TestStatus.TEST_CRASHED:
|
|
crashed_tests += 1
|
|
print_with_timestamp(red('[CRASHED] ' + test_case.name))
|
|
print_log(map(yellow, test_case.log))
|
|
print_with_timestamp('')
|
|
else:
|
|
failed_tests += 1
|
|
print_with_timestamp(red('[FAILED] ') + test_case.name)
|
|
print_log(map(yellow, test_case.log))
|
|
print_with_timestamp('')
|
|
print_with_timestamp(DIVIDER)
|
|
fmt = green if test_result.status == TestStatus.SUCCESS else red
|
|
print_with_timestamp(
|
|
fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
|
|
(total_tests, failed_tests, crashed_tests)))
|
|
return test_result
|