aboutsummaryrefslogtreecommitdiffstats
path: root/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/pybootchartgui/pybootchartgui/tests/parser_test.py')
-rw-r--r--scripts/pybootchartgui/pybootchartgui/tests/parser_test.py82
1 files changed, 47 insertions, 35 deletions
diff --git a/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py b/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py
index 574c2c7a2b..00fb3bf797 100644
--- a/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py
+++ b/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py
@@ -4,90 +4,102 @@ import unittest
sys.path.insert(0, os.getcwd())
-import parsing
+import pybootchartgui.parsing as parsing
+import pybootchartgui.main as main
debug = False
def floatEq(f1, f2):
return math.fabs(f1-f2) < 0.00001
+bootchart_dir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/')
+parser = main._mk_options_parser()
+options, args = parser.parse_args(['--q', bootchart_dir])
+writer = main._mk_writer(options)
+
class TestBCParser(unittest.TestCase):
def setUp(self):
self.name = "My first unittest"
- self.rootdir = '../examples/1'
+ self.rootdir = bootchart_dir
def mk_fname(self,f):
return os.path.join(self.rootdir, f)
def testParseHeader(self):
- state = parsing.parse_file(parsing.ParserState(), self.mk_fname('header'))
+ trace = parsing.Trace(writer, args, options)
+ state = parsing.parse_file(writer, trace, self.mk_fname('header'))
self.assertEqual(6, len(state.headers))
self.assertEqual(2, parsing.get_num_cpus(state.headers))
def test_parseTimedBlocks(self):
- state = parsing.parse_file(parsing.ParserState(), self.mk_fname('proc_diskstats.log'))
+ trace = parsing.Trace(writer, args, options)
+ state = parsing.parse_file(writer, trace, self.mk_fname('proc_diskstats.log'))
self.assertEqual(141, len(state.disk_stats))
def testParseProcPsLog(self):
- state = parsing.parse_file(parsing.ParserState(), self.mk_fname('proc_ps.log'))
+ trace = parsing.Trace(writer, args, options)
+ state = parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log'))
samples = state.ps_stats
- processes = samples.process_list
- sorted_processes = sorted(processes, key=lambda p: p.pid )
-
- for index, line in enumerate(open(self.mk_fname('extract2.proc_ps.log'))):
+ processes = samples.process_map
+ sorted_processes = [processes[k] for k in sorted(processes.keys())]
+
+ ps_data = open(self.mk_fname('extract2.proc_ps.log'))
+ for index, line in enumerate(ps_data):
tokens = line.split();
process = sorted_processes[index]
- if debug:
- print tokens[0:4]
- print process.pid, process.cmd, process.ppid, len(process.samples)
- print '-------------------'
-
- self.assertEqual(tokens[0], str(process.pid))
+ if debug:
+ print(tokens[0:4])
+ print(process.pid / 1000, process.cmd, process.ppid, len(process.samples))
+ print('-------------------')
+
+ self.assertEqual(tokens[0], str(process.pid // 1000))
self.assertEqual(tokens[1], str(process.cmd))
- self.assertEqual(tokens[2], str(process.ppid))
+ self.assertEqual(tokens[2], str(process.ppid // 1000))
self.assertEqual(tokens[3], str(len(process.samples)))
-
+ ps_data.close()
def testparseProcDiskStatLog(self):
- state_with_headers = parsing.parse_file(parsing.ParserState(), self.mk_fname('header'))
+ trace = parsing.Trace(writer, args, options)
+ state_with_headers = parsing.parse_file(writer, trace, self.mk_fname('header'))
state_with_headers.headers['system.cpu'] = 'xxx (2)'
- samples = parsing.parse_file(state_with_headers, self.mk_fname('proc_diskstats.log')).disk_stats
+ samples = parsing.parse_file(writer, state_with_headers, self.mk_fname('proc_diskstats.log')).disk_stats
self.assertEqual(141, len(samples))
-
- for index, line in enumerate(open(self.mk_fname('extract.proc_diskstats.log'))):
+
+ diskstats_data = open(self.mk_fname('extract.proc_diskstats.log'))
+ for index, line in enumerate(diskstats_data):
tokens = line.split('\t')
sample = samples[index]
if debug:
- print line.rstrip(),
- print sample
- print '-------------------'
+ print(line.rstrip())
+ print(sample)
+ print('-------------------')
self.assertEqual(tokens[0], str(sample.time))
self.assert_(floatEq(float(tokens[1]), sample.read))
self.assert_(floatEq(float(tokens[2]), sample.write))
self.assert_(floatEq(float(tokens[3]), sample.util))
+ diskstats_data.close()
def testparseProcStatLog(self):
- samples = parsing.parse_file(parsing.ParserState(), self.mk_fname('proc_stat.log')).cpu_stats
+ trace = parsing.Trace(writer, args, options)
+ samples = parsing.parse_file(writer, trace, self.mk_fname('proc_stat.log')).cpu_stats
self.assertEqual(141, len(samples))
-
- for index, line in enumerate(open(self.mk_fname('extract.proc_stat.log'))):
+
+ stat_data = open(self.mk_fname('extract.proc_stat.log'))
+ for index, line in enumerate(stat_data):
tokens = line.split('\t')
sample = samples[index]
if debug:
- print line.rstrip()
- print sample
- print '-------------------'
+ print(line.rstrip())
+ print(sample)
+ print('-------------------')
self.assert_(floatEq(float(tokens[0]), sample.time))
self.assert_(floatEq(float(tokens[1]), sample.user))
self.assert_(floatEq(float(tokens[2]), sample.sys))
self.assert_(floatEq(float(tokens[3]), sample.io))
-
- def testParseLogDir(self):
- res = parsing.parse([self.rootdir], False)
- self.assertEqual(4, len(res))
-
+ stat_data.close()
+
if __name__ == '__main__':
unittest.main()