From 0dedc1c573ddc4e87475eb03c64555cd54a72e92 Mon Sep 17 00:00:00 2001 From: Trevor Gamblin Date: Mon, 7 Jun 2021 09:40:20 -0400 Subject: [PATCH] Fix imports for tests Signed-off-by: Trevor Gamblin --- tests/test_asyncio.py | 2 +- tests/test_asyncio_context_vars.py | 2 +- tests/test_functionality.py | 2 +- tests/test_gevent.py | 2 +- tests/test_hooks.py | 2 +- tests/test_tags.py | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -2,7 +2,7 @@ import unittest import yappi import asyncio import threading -from utils import YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io +from .utils import YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io @asyncio.coroutine --- a/tests/test_asyncio_context_vars.py +++ b/tests/test_asyncio_context_vars.py @@ -5,7 +5,7 @@ import contextvars import functools import time import os -import utils +import tests.utils as utils import yappi async_context_id = contextvars.ContextVar('async_context_id') --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -1,1916 +1,1916 @@ -import os -import sys -import time -import threading -import unittest -import yappi -import _yappi -import utils -import multiprocessing # added to fix http://bugs.python.org/issue15881 for > Py2.6 -import subprocess - -_counter = 0 - - -class BasicUsage(utils.YappiUnitTestCase): - - def test_callback_function_int_return_overflow(self): - # this test is just here to check if any errors are generated, as the err - # is printed in C side, I did not include it here. THere are ways to test - # this deterministically, I did not bother - import ctypes - - def _unsigned_overflow_margin(): - return 2**(ctypes.sizeof(ctypes.c_void_p) * 8) - 1 - - def foo(): - pass - - #with utils.captured_output() as (out, err): - yappi.set_context_id_callback(_unsigned_overflow_margin) - yappi.set_tag_callback(_unsigned_overflow_margin) - yappi.start() - foo() - - def test_issue60(self): - - def foo(): - buf = bytearray() - buf += b't' * 200 - view = memoryview(buf)[10:] - view = view.tobytes() - del buf[:10] # this throws exception - return view - - yappi.start(builtins=True) - foo() - self.assertTrue( - len( - yappi.get_func_stats( - filter_callback=lambda x: yappi. - func_matches(x, [memoryview.tobytes]) - ) - ) > 0 - ) - yappi.stop() - - def test_issue54(self): - - def _tag_cbk(): - global _counter - _counter += 1 - return _counter - - def a(): - pass - - def b(): - pass - - yappi.set_tag_callback(_tag_cbk) - yappi.start() - a() - a() - a() - yappi.stop() - stats = yappi.get_func_stats() - self.assertEqual(stats.pop().ncall, 3) # aggregated if no tag is given - stats = yappi.get_func_stats(tag=1) - - for i in range(1, 3): - stats = yappi.get_func_stats(tag=i) - stats = yappi.get_func_stats( - tag=i, filter_callback=lambda x: yappi.func_matches(x, [a]) - ) - - stat = stats.pop() - self.assertEqual(stat.ncall, 1) - - yappi.set_tag_callback(None) - yappi.clear_stats() - yappi.start() - b() - b() - stats = yappi.get_func_stats() - self.assertEqual(len(stats), 1) - stat = stats.pop() - self.assertEqual(stat.ncall, 2) - - def test_filter(self): - - def a(): - pass - - def b(): - a() - - def c(): - b() - - _TCOUNT = 5 - - ts = [] - yappi.start() - for i in range(_TCOUNT): - t = threading.Thread(target=c) - t.start() - ts.append(t) - - for t in ts: - t.join() - - yappi.stop() - - ctx_ids = [] - for tstat in yappi.get_thread_stats(): - if tstat.name == '_MainThread': - main_ctx_id = tstat.id - else: - ctx_ids.append(tstat.id) - - fstats = yappi.get_func_stats(filter={"ctx_id": 9}) - self.assertTrue(fstats.empty()) - fstats = yappi.get_func_stats( - filter={ - "ctx_id": main_ctx_id, - "name": "c" - } - ) # main thread - self.assertTrue(fstats.empty()) - - for i in ctx_ids: - fstats = yappi.get_func_stats( - filter={ - "ctx_id": i, - "name": "a", - "ncall": 1 - } - ) - self.assertEqual(fstats.pop().ncall, 1) - fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "b"}) - self.assertEqual(fstats.pop().ncall, 1) - fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "c"}) - self.assertEqual(fstats.pop().ncall, 1) - - yappi.clear_stats() - yappi.start(builtins=True) - time.sleep(0.1) - yappi.stop() - fstats = yappi.get_func_stats(filter={"module": "time"}) - self.assertEqual(len(fstats), 1) - - # invalid filters` - self.assertRaises( - Exception, yappi.get_func_stats, filter={'tag': "sss"} - ) - self.assertRaises( - Exception, yappi.get_func_stats, filter={'ctx_id': "None"} - ) - - def test_filter_callback(self): - - def a(): - time.sleep(0.1) - - def b(): - a() - - def c(): - pass - - def d(): - pass - - yappi.set_clock_type("wall") - yappi.start(builtins=True) - a() - b() - c() - d() - stats = yappi.get_func_stats( - filter_callback=lambda x: yappi.func_matches(x, [a, b]) - ) - #stats.print_all() - r1 = ''' - tests/test_functionality.py:98 a 2 0.000000 0.200350 0.100175 - tests/test_functionality.py:101 b 1 0.000000 0.120000 0.100197 - ''' - self.assert_traces_almost_equal(r1, stats) - self.assertEqual(len(stats), 2) - stats = yappi.get_func_stats( - filter_callback=lambda x: yappi. - module_matches(x, [sys.modules[__name__]]) - ) - r1 = ''' - tests/test_functionality.py:98 a 2 0.000000 0.230130 0.115065 - tests/test_functionality.py:101 b 1 0.000000 0.120000 0.109011 - tests/test_functionality.py:104 c 1 0.000000 0.000002 0.000002 - tests/test_functionality.py:107 d 1 0.000000 0.000001 0.000001 - ''' - self.assert_traces_almost_equal(r1, stats) - self.assertEqual(len(stats), 4) - - stats = yappi.get_func_stats( - filter_callback=lambda x: yappi.func_matches(x, [time.sleep]) - ) - self.assertEqual(len(stats), 1) - r1 = ''' - time.sleep 2 0.206804 0.220000 0.103402 - ''' - self.assert_traces_almost_equal(r1, stats) - - def test_print_formatting(self): - - def a(): - pass - - def b(): - a() - - func_cols = { - 1: ("name", 48), - 0: ("ncall", 5), - 2: ("tsub", 8), - } - thread_cols = { - 1: ("name", 48), - 0: ("ttot", 8), - } - - yappi.start() - a() - b() - yappi.stop() - fs = yappi.get_func_stats() - cs = fs[1].children - ts = yappi.get_thread_stats() - #fs.print_all(out=sys.stderr, columns={1:("name", 70), }) - #cs.print_all(out=sys.stderr, columns=func_cols) - #ts.print_all(out=sys.stderr, columns=thread_cols) - #cs.print_all(out=sys.stderr, columns={}) - - self.assertRaises( - yappi.YappiError, fs.print_all, columns={1: ("namee", 9)} - ) - self.assertRaises( - yappi.YappiError, cs.print_all, columns={1: ("dd", 0)} - ) - self.assertRaises( - yappi.YappiError, ts.print_all, columns={1: ("tidd", 0)} - ) - - def test_get_clock(self): - yappi.set_clock_type('cpu') - self.assertEqual('cpu', yappi.get_clock_type()) - clock_info = yappi.get_clock_info() - self.assertTrue('api' in clock_info) - self.assertTrue('resolution' in clock_info) - - yappi.set_clock_type('wall') - self.assertEqual('wall', yappi.get_clock_type()) - - t0 = yappi.get_clock_time() - time.sleep(0.1) - duration = yappi.get_clock_time() - t0 - self.assertTrue(0.05 < duration < 0.3) - - def test_profile_decorator(self): - - def aggregate(func, stats): - fname = "tests/%s.profile" % (func.__name__) - try: - stats.add(fname) - except IOError: - pass - stats.save(fname) - raise Exception("messing around") - - @yappi.profile(return_callback=aggregate) - def a(x, y): - if x + y == 25: - raise Exception("") - return x + y - - def b(): - pass - - try: - os.remove( - "tests/a.profile" - ) # remove the one from prev test, if available - except: - pass - - # global profile is on to mess things up - yappi.start() - b() - - # assert functionality and call function at same time - try: - self.assertEqual(a(1, 2), 3) - except: - pass - try: - self.assertEqual(a(2, 5), 7) - except: - pass - try: - a(4, 21) - except: - pass - stats = yappi.get_func_stats().add("tests/a.profile") - fsa = utils.find_stat_by_name(stats, 'a') - self.assertEqual(fsa.ncall, 3) - self.assertEqual(len(stats), 1) # b() should be cleared out. - - @yappi.profile(return_callback=aggregate) - def count_down_rec(n): - if n == 0: - return - count_down_rec(n - 1) - - try: - os.remove( - "tests/count_down_rec.profile" - ) # remove the one from prev test, if available - except: - pass - - try: - count_down_rec(4) - except: - pass - try: - count_down_rec(3) - except: - pass - - stats = yappi.YFuncStats("tests/count_down_rec.profile") - fsrec = utils.find_stat_by_name(stats, 'count_down_rec') - self.assertEqual(fsrec.ncall, 9) - self.assertEqual(fsrec.nactualcall, 2) - - def test_strip_dirs(self): - - def a(): - pass - - stats = utils.run_and_get_func_stats(a, ) - stats.strip_dirs() - fsa = utils.find_stat_by_name(stats, "a") - self.assertEqual(fsa.module, os.path.basename(fsa.module)) - - @unittest.skipIf(os.name == "nt", "do not run on Windows") - def test_run_as_script(self): - import re - p = subprocess.Popen( - ['yappi', os.path.join('./tests', 'run_as_script.py')], - stdout=subprocess.PIPE - ) - out, err = p.communicate() - self.assertEqual(p.returncode, 0) - func_stats, thread_stats = re.split( - b'name\\s+id\\s+tid\\s+ttot\\s+scnt\\s*\n', out - ) - self.assertTrue(b'FancyThread' in thread_stats) - - def test_yappi_overhead(self): - LOOP_COUNT = 100000 - - def a(): - pass - - def b(): - for i in range(LOOP_COUNT): - a() - - t0 = time.time() - yappi.start() - b() - yappi.stop() - time_with_yappi = time.time() - t0 - t0 = time.time() - b() - time_without_yappi = time.time() - t0 - if time_without_yappi == 0: - time_without_yappi = 0.000001 - - # in latest v0.82, I calculated this as close to "7.0" in my machine. - # however, %83 of this overhead is coming from tickcount(). The other %17 - # seems to have been evenly distributed to the internal bookkeeping - # structures/algorithms which seems acceptable. Note that our test only - # tests one function being profiled at-a-time in a short interval. - # profiling high number of functions in a small time - # is a different beast, (which is pretty unlikely in most applications) - # So as a conclusion: I cannot see any optimization window for Yappi that - # is worth implementing as we will only optimize %17 of the time. - sys.stderr.write("\r\nYappi puts %0.1f times overhead to the profiled application in average.\r\n" % \ - (time_with_yappi / time_without_yappi)) - - def test_clear_stats_while_running(self): - - def a(): - pass - - yappi.start() - a() - yappi.clear_stats() - a() - stats = yappi.get_func_stats() - fsa = utils.find_stat_by_name(stats, 'a') - self.assertEqual(fsa.ncall, 1) - - def test_generator(self): - - def _gen(n): - while (n > 0): - yield n - n -= 1 - - yappi.start() - for x in _gen(5): - pass - self.assertTrue( - yappi.convert2pstats(yappi.get_func_stats()) is not None - ) - - def test_slice_child_stats_and_strip_dirs(self): - - def b(): - for i in range(10000000): - pass - - def a(): - b() - - yappi.start(builtins=True) - a() - stats = yappi.get_func_stats() - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - self.assertTrue(fsa.children[0:1] is not None) - prev_afullname = fsa.full_name - prev_bchildfullname = fsa.children[fsb].full_name - stats.strip_dirs() - self.assertTrue(len(prev_afullname) > len(fsa.full_name)) - self.assertTrue( - len(prev_bchildfullname) > len(fsa.children[fsb].full_name) - ) - - def test_children_stat_functions(self): - _timings = {"a_1": 5, "b_1": 3, "c_1": 1} - _yappi._set_test_timings(_timings) - - def b(): - pass - - def c(): - pass - - def a(): - b() - c() - - yappi.start() - a() - b() # non-child call - c() # non-child call - stats = yappi.get_func_stats() - fsa = utils.find_stat_by_name(stats, 'a') - childs_of_a = fsa.children.get().sort("tavg", "desc") - prev_item = None - for item in childs_of_a: - if prev_item: - self.assertTrue(prev_item.tavg > item.tavg) - prev_item = item - childs_of_a.sort("name", "desc") - prev_item = None - for item in childs_of_a: - if prev_item: - self.assertTrue(prev_item.name > item.name) - prev_item = item - childs_of_a.clear() - self.assertTrue(childs_of_a.empty()) - - def test_no_stats_different_clock_type_load(self): - - def a(): - pass - - yappi.start() - a() - yappi.stop() - yappi.get_func_stats().save("tests/ystats1.ys") - yappi.clear_stats() - yappi.set_clock_type("WALL") - yappi.start() - yappi.stop() - stats = yappi.get_func_stats().add("tests/ystats1.ys") - fsa = utils.find_stat_by_name(stats, 'a') - self.assertTrue(fsa is not None) - - def test_subsequent_profile(self): - _timings = {"a_1": 1, "b_1": 1} - _yappi._set_test_timings(_timings) - - def a(): - pass - - def b(): - pass - - yappi.start() - a() - yappi.stop() - yappi.start() - b() - yappi.stop() - stats = yappi.get_func_stats() - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - self.assertTrue(fsa is not None) - self.assertTrue(fsb is not None) - self.assertEqual(fsa.ttot, 1) - self.assertEqual(fsb.ttot, 1) - - def test_lambda(self): - f = lambda: time.sleep(0.3) - yappi.set_clock_type("wall") - yappi.start() - f() - stats = yappi.get_func_stats() - fsa = utils.find_stat_by_name(stats, '') - self.assertTrue(fsa.ttot > 0.1) - - def test_module_stress(self): - self.assertEqual(yappi.is_running(), False) - - yappi.start() - yappi.clear_stats() - self.assertRaises(_yappi.error, yappi.set_clock_type, "wall") - - yappi.stop() - yappi.clear_stats() - yappi.set_clock_type("cpu") - self.assertRaises(yappi.YappiError, yappi.set_clock_type, "dummy") - self.assertEqual(yappi.is_running(), False) - yappi.clear_stats() - yappi.clear_stats() - - def test_stat_sorting(self): - _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1} - _yappi._set_test_timings(_timings) - - self._ncall = 1 - - def a(): - b() - - def b(): - if self._ncall == 2: - return - self._ncall += 1 - a() - - stats = utils.run_and_get_func_stats(a) - stats = stats.sort("totaltime", "desc") - prev_stat = None - for stat in stats: - if prev_stat: - self.assertTrue(prev_stat.ttot >= stat.ttot) - prev_stat = stat - stats = stats.sort("totaltime", "asc") - prev_stat = None - for stat in stats: - if prev_stat: - self.assertTrue(prev_stat.ttot <= stat.ttot) - prev_stat = stat - stats = stats.sort("avgtime", "asc") - prev_stat = None - for stat in stats: - if prev_stat: - self.assertTrue(prev_stat.tavg <= stat.tavg) - prev_stat = stat - stats = stats.sort("name", "asc") - prev_stat = None - for stat in stats: - if prev_stat: - self.assertTrue(prev_stat.name <= stat.name) - prev_stat = stat - stats = stats.sort("subtime", "asc") - prev_stat = None - for stat in stats: - if prev_stat: - self.assertTrue(prev_stat.tsub <= stat.tsub) - prev_stat = stat - - self.assertRaises( - yappi.YappiError, stats.sort, "invalid_func_sorttype_arg" - ) - self.assertRaises( - yappi.YappiError, stats.sort, "totaltime", - "invalid_func_sortorder_arg" - ) - - def test_start_flags(self): - self.assertEqual(_yappi._get_start_flags(), None) - yappi.start() - - def a(): - pass - - a() - self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0) - self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1) - self.assertEqual(len(yappi.get_thread_stats()), 1) - - def test_builtin_profiling(self): - - def a(): - time.sleep(0.4) # is a builtin function - - yappi.set_clock_type('wall') - - yappi.start(builtins=True) - a() - stats = yappi.get_func_stats() - fsa = utils.find_stat_by_name(stats, 'sleep') - self.assertTrue(fsa is not None) - self.assertTrue(fsa.ttot > 0.3) - yappi.stop() - yappi.clear_stats() - - def a(): - pass - - yappi.start() - t = threading.Thread(target=a) - t.start() - t.join() - stats = yappi.get_func_stats() - - def test_singlethread_profiling(self): - yappi.set_clock_type('wall') - - def a(): - time.sleep(0.2) - - class Worker1(threading.Thread): - - def a(self): - time.sleep(0.3) - - def run(self): - self.a() - - yappi.start(profile_threads=False) - - c = Worker1() - c.start() - c.join() - a() - stats = yappi.get_func_stats() - fsa1 = utils.find_stat_by_name(stats, 'Worker1.a') - fsa2 = utils.find_stat_by_name(stats, 'a') - self.assertTrue(fsa1 is None) - self.assertTrue(fsa2 is not None) - self.assertTrue(fsa2.ttot > 0.1) - - def test_run(self): - - def profiled(): - pass - - yappi.clear_stats() - try: - with yappi.run(): - profiled() - stats = yappi.get_func_stats() - finally: - yappi.clear_stats() - - self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled')) - - def test_run_recursive(self): - - def profiled(): - pass - - def not_profiled(): - pass - - yappi.clear_stats() - try: - with yappi.run(): - with yappi.run(): - profiled() - # Profiling stopped here - not_profiled() - stats = yappi.get_func_stats() - finally: - yappi.clear_stats() - - self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled')) - self.assertIsNone(utils.find_stat_by_name(stats, 'not_profiled')) - - -class StatSaveScenarios(utils.YappiUnitTestCase): - - def test_pstats_conversion(self): - - def pstat_id(fs): - return (fs.module, fs.lineno, fs.name) - - def a(): - d() - - def b(): - d() - - def c(): - pass - - def d(): - pass - - _timings = {"a_1": 12, "b_1": 7, "c_1": 5, "d_1": 2} - _yappi._set_test_timings(_timings) - stats = utils.run_and_get_func_stats(a, ) - stats.strip_dirs() - stats.save("tests/a1.pstats", type="pstat") - fsa_pid = pstat_id(utils.find_stat_by_name(stats, "a")) - fsd_pid = pstat_id(utils.find_stat_by_name(stats, "d")) - yappi.clear_stats() - _yappi._set_test_timings(_timings) - stats = utils.run_and_get_func_stats(a, ) - stats.strip_dirs() - stats.save("tests/a2.pstats", type="pstat") - yappi.clear_stats() - _yappi._set_test_timings(_timings) - stats = utils.run_and_get_func_stats(b, ) - stats.strip_dirs() - stats.save("tests/b1.pstats", type="pstat") - fsb_pid = pstat_id(utils.find_stat_by_name(stats, "b")) - yappi.clear_stats() - _yappi._set_test_timings(_timings) - stats = utils.run_and_get_func_stats(c, ) - stats.strip_dirs() - stats.save("tests/c1.pstats", type="pstat") - fsc_pid = pstat_id(utils.find_stat_by_name(stats, "c")) - - # merge saved stats and check pstats values are correct - import pstats - p = pstats.Stats( - 'tests/a1.pstats', 'tests/a2.pstats', 'tests/b1.pstats', - 'tests/c1.pstats' - ) - p.strip_dirs() - # ct = ttot, tt = tsub - (cc, nc, tt, ct, callers) = p.stats[fsa_pid] - self.assertEqual(cc, nc, 2) - self.assertEqual(tt, 20) - self.assertEqual(ct, 24) - (cc, nc, tt, ct, callers) = p.stats[fsd_pid] - self.assertEqual(cc, nc, 3) - self.assertEqual(tt, 6) - self.assertEqual(ct, 6) - self.assertEqual(len(callers), 2) - (cc, nc, tt, ct) = callers[fsa_pid] - self.assertEqual(cc, nc, 2) - self.assertEqual(tt, 4) - self.assertEqual(ct, 4) - (cc, nc, tt, ct) = callers[fsb_pid] - self.assertEqual(cc, nc, 1) - self.assertEqual(tt, 2) - self.assertEqual(ct, 2) - - def test_merge_stats(self): - _timings = { - "a_1": 15, - "b_1": 14, - "c_1": 12, - "d_1": 10, - "e_1": 9, - "f_1": 7, - "g_1": 6, - "h_1": 5, - "i_1": 1 - } - _yappi._set_test_timings(_timings) - - def a(): - b() - - def b(): - c() - - def c(): - d() - - def d(): - e() - - def e(): - f() - - def f(): - g() - - def g(): - h() - - def h(): - i() - - def i(): - pass - - yappi.start() - a() - a() - yappi.stop() - stats = yappi.get_func_stats() - self.assertRaises( - NotImplementedError, stats.save, "", "INVALID_SAVE_TYPE" - ) - stats.save("tests/ystats2.ys") - yappi.clear_stats() - _yappi._set_test_timings(_timings) - yappi.start() - a() - stats = yappi.get_func_stats().add("tests/ystats2.ys") - fsa = utils.find_stat_by_name(stats, "a") - fsb = utils.find_stat_by_name(stats, "b") - fsc = utils.find_stat_by_name(stats, "c") - fsd = utils.find_stat_by_name(stats, "d") - fse = utils.find_stat_by_name(stats, "e") - fsf = utils.find_stat_by_name(stats, "f") - fsg = utils.find_stat_by_name(stats, "g") - fsh = utils.find_stat_by_name(stats, "h") - fsi = utils.find_stat_by_name(stats, "i") - self.assertEqual(fsa.ttot, 45) - self.assertEqual(fsa.ncall, 3) - self.assertEqual(fsa.nactualcall, 3) - self.assertEqual(fsa.tsub, 3) - self.assertEqual(fsa.children[fsb].ttot, fsb.ttot) - self.assertEqual(fsa.children[fsb].tsub, fsb.tsub) - self.assertEqual(fsb.children[fsc].ttot, fsc.ttot) - self.assertEqual(fsb.children[fsc].tsub, fsc.tsub) - self.assertEqual(fsc.tsub, 6) - self.assertEqual(fsc.children[fsd].ttot, fsd.ttot) - self.assertEqual(fsc.children[fsd].tsub, fsd.tsub) - self.assertEqual(fsd.children[fse].ttot, fse.ttot) - self.assertEqual(fsd.children[fse].tsub, fse.tsub) - self.assertEqual(fse.children[fsf].ttot, fsf.ttot) - self.assertEqual(fse.children[fsf].tsub, fsf.tsub) - self.assertEqual(fsf.children[fsg].ttot, fsg.ttot) - self.assertEqual(fsf.children[fsg].tsub, fsg.tsub) - self.assertEqual(fsg.ttot, 18) - self.assertEqual(fsg.tsub, 3) - self.assertEqual(fsg.children[fsh].ttot, fsh.ttot) - self.assertEqual(fsg.children[fsh].tsub, fsh.tsub) - self.assertEqual(fsh.ttot, 15) - self.assertEqual(fsh.tsub, 12) - self.assertEqual(fsh.tavg, 5) - self.assertEqual(fsh.children[fsi].ttot, fsi.ttot) - self.assertEqual(fsh.children[fsi].tsub, fsi.tsub) - #stats.debug_print() - - def test_merge_multithreaded_stats(self): - import _yappi - timings = {"a_1": 2, "b_1": 1} - _yappi._set_test_timings(timings) - - def a(): - pass - - def b(): - pass - - yappi.start() - t = threading.Thread(target=a) - t.start() - t.join() - t = threading.Thread(target=b) - t.start() - t.join() - yappi.get_func_stats().save("tests/ystats1.ys") - yappi.clear_stats() - _yappi._set_test_timings(timings) - self.assertEqual(len(yappi.get_func_stats()), 0) - self.assertEqual(len(yappi.get_thread_stats()), 1) - t = threading.Thread(target=a) - t.start() - t.join() - - self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0) - self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1) - yappi.get_func_stats().save("tests/ystats2.ys") - - stats = yappi.YFuncStats([ - "tests/ystats1.ys", - "tests/ystats2.ys", - ]) - fsa = utils.find_stat_by_name(stats, "a") - fsb = utils.find_stat_by_name(stats, "b") - self.assertEqual(fsa.ncall, 2) - self.assertEqual(fsb.ncall, 1) - self.assertEqual(fsa.tsub, fsa.ttot, 4) - self.assertEqual(fsb.tsub, fsb.ttot, 1) - - def test_merge_load_different_clock_types(self): - yappi.start(builtins=True) - - def a(): - b() - - def b(): - c() - - def c(): - pass - - t = threading.Thread(target=a) - t.start() - t.join() - yappi.get_func_stats().sort("name", "asc").save("tests/ystats1.ys") - yappi.stop() - yappi.clear_stats() - yappi.start(builtins=False) - t = threading.Thread(target=a) - t.start() - t.join() - yappi.get_func_stats().save("tests/ystats2.ys") - yappi.stop() - self.assertRaises(_yappi.error, yappi.set_clock_type, "wall") - yappi.clear_stats() - yappi.set_clock_type("wall") - yappi.start() - t = threading.Thread(target=a) - t.start() - t.join() - yappi.get_func_stats().save("tests/ystats3.ys") - self.assertRaises( - yappi.YappiError, - yappi.YFuncStats().add("tests/ystats1.ys").add, "tests/ystats3.ys" - ) - stats = yappi.YFuncStats(["tests/ystats1.ys", - "tests/ystats2.ys"]).sort("name") - fsa = utils.find_stat_by_name(stats, "a") - fsb = utils.find_stat_by_name(stats, "b") - fsc = utils.find_stat_by_name(stats, "c") - self.assertEqual(fsa.ncall, 2) - self.assertEqual(fsa.ncall, fsb.ncall, fsc.ncall) - - def test_merge_aabab_aabbc(self): - _timings = { - "a_1": 15, - "a_2": 14, - "b_1": 12, - "a_3": 10, - "b_2": 9, - "c_1": 4 - } - _yappi._set_test_timings(_timings) - - def a(): - if self._ncall == 1: - self._ncall += 1 - a() - elif self._ncall == 5: - self._ncall += 1 - a() - else: - b() - - def b(): - if self._ncall == 2: - self._ncall += 1 - a() - elif self._ncall == 6: - self._ncall += 1 - b() - elif self._ncall == 7: - c() - else: - return - - def c(): - pass - - self._ncall = 1 - stats = utils.run_and_get_func_stats(a, ) - stats.save("tests/ystats1.ys") - yappi.clear_stats() - _yappi._set_test_timings(_timings) - #stats.print_all() - - self._ncall = 5 - stats = utils.run_and_get_func_stats(a, ) - stats.save("tests/ystats2.ys") - - #stats.print_all() - - def a(): # same name but another function(code object) - pass - - yappi.start() - a() - stats = yappi.get_func_stats().add( - ["tests/ystats1.ys", "tests/ystats2.ys"] - ) - #stats.print_all() - self.assertEqual(len(stats), 4) - - fsa = None - for stat in stats: - if stat.name == "a" and stat.ttot == 45: - fsa = stat - break - self.assertTrue(fsa is not None) - - self.assertEqual(fsa.ncall, 7) - self.assertEqual(fsa.nactualcall, 3) - self.assertEqual(fsa.ttot, 45) - self.assertEqual(fsa.tsub, 10) - fsb = utils.find_stat_by_name(stats, "b") - fsc = utils.find_stat_by_name(stats, "c") - self.assertEqual(fsb.ncall, 6) - self.assertEqual(fsb.nactualcall, 3) - self.assertEqual(fsb.ttot, 36) - self.assertEqual(fsb.tsub, 27) - self.assertEqual(fsb.tavg, 6) - self.assertEqual(fsc.ttot, 8) - self.assertEqual(fsc.tsub, 8) - self.assertEqual(fsc.tavg, 4) - self.assertEqual(fsc.nactualcall, fsc.ncall, 2) - - -class MultithreadedScenarios(utils.YappiUnitTestCase): - - def test_issue_32(self): - ''' - Start yappi from different thread and we get Internal Error(15) as - the current_ctx_id() called while enumerating the threads in start() - and as it does not swap to the enumerated ThreadState* the THreadState_GetDict() - returns wrong object and thus sets an invalid id for the _ctx structure. - - When this issue happens multiple Threads have same tid as the internal ts_ptr - will be same for different contexts. So, let's see if that happens - ''' - - def foo(): - time.sleep(0.2) - - def bar(): - time.sleep(0.1) - - def thread_func(): - yappi.set_clock_type("wall") - yappi.start() - - bar() - - t = threading.Thread(target=thread_func) - t.start() - t.join() - - foo() - - yappi.stop() - - thread_ids = set() - for tstat in yappi.get_thread_stats(): - self.assertTrue(tstat.tid not in thread_ids) - thread_ids.add(tstat.tid) - - def test_subsequent_profile(self): - WORKER_COUNT = 5 - - def a(): - pass - - def b(): - pass - - def c(): - pass - - _timings = { - "a_1": 3, - "b_1": 2, - "c_1": 1, - } - - yappi.start() - - def g(): - pass - - g() - yappi.stop() - yappi.clear_stats() - _yappi._set_test_timings(_timings) - yappi.start() - - _dummy = [] - for i in range(WORKER_COUNT): - t = threading.Thread(target=a) - t.start() - t.join() - for i in range(WORKER_COUNT): - t = threading.Thread(target=b) - t.start() - _dummy.append(t) - t.join() - for i in range(WORKER_COUNT): - t = threading.Thread(target=a) - t.start() - t.join() - for i in range(WORKER_COUNT): - t = threading.Thread(target=c) - t.start() - t.join() - yappi.stop() - yappi.start() - - def f(): - pass - - f() - stats = yappi.get_func_stats() - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - fsc = utils.find_stat_by_name(stats, 'c') - self.assertEqual(fsa.ncall, 10) - self.assertEqual(fsb.ncall, 5) - self.assertEqual(fsc.ncall, 5) - self.assertEqual(fsa.ttot, fsa.tsub, 30) - self.assertEqual(fsb.ttot, fsb.tsub, 10) - self.assertEqual(fsc.ttot, fsc.tsub, 5) - - # MACOSx optimizes by only creating one worker thread - self.assertTrue(len(yappi.get_thread_stats()) >= 2) - - def test_basic(self): - yappi.set_clock_type('wall') - - def dummy(): - pass - - def a(): - time.sleep(0.2) - - class Worker1(threading.Thread): - - def a(self): - time.sleep(0.3) - - def run(self): - self.a() - - yappi.start(builtins=False, profile_threads=True) - - c = Worker1() - c.start() - c.join() - a() - stats = yappi.get_func_stats() - fsa1 = utils.find_stat_by_name(stats, 'Worker1.a') - fsa2 = utils.find_stat_by_name(stats, 'a') - self.assertTrue(fsa1 is not None) - self.assertTrue(fsa2 is not None) - self.assertTrue(fsa1.ttot > 0.2) - self.assertTrue(fsa2.ttot > 0.1) - tstats = yappi.get_thread_stats() - self.assertEqual(len(tstats), 2) - tsa = utils.find_stat_by_name(tstats, 'Worker1') - tsm = utils.find_stat_by_name(tstats, '_MainThread') - dummy() # call dummy to force ctx name to be retrieved again. - self.assertTrue(tsa is not None) - # TODO: I put dummy() to fix below, remove the comments after a while. - self.assertTrue( # FIX: I see this fails sometimes? - tsm is not None, - 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(tstats)))) - - def test_ctx_stats(self): - from threading import Thread - DUMMY_WORKER_COUNT = 5 - yappi.start() - - class DummyThread(Thread): - pass - - def dummy(): - pass - - def dummy_worker(): - pass - - for i in range(DUMMY_WORKER_COUNT): - t = DummyThread(target=dummy_worker) - t.start() - t.join() - yappi.stop() - stats = yappi.get_thread_stats() - tsa = utils.find_stat_by_name(stats, "DummyThread") - self.assertTrue(tsa is not None) - yappi.clear_stats() - time.sleep(1.0) - _timings = { - "a_1": 6, - "b_1": 5, - "c_1": 3, - "d_1": 1, - "a_2": 4, - "b_2": 3, - "c_2": 2, - "d_2": 1 - } - _yappi._set_test_timings(_timings) - - class Thread1(Thread): - pass - - class Thread2(Thread): - pass - - def a(): - b() - - def b(): - c() - - def c(): - d() - - def d(): - time.sleep(0.6) - - yappi.set_clock_type("wall") - yappi.start() - t1 = Thread1(target=a) - t1.start() - t2 = Thread2(target=a) - t2.start() - t1.join() - t2.join() - stats = yappi.get_thread_stats() - - # the fist clear_stats clears the context table? - tsa = utils.find_stat_by_name(stats, "DummyThread") - self.assertTrue(tsa is None) - - tst1 = utils.find_stat_by_name(stats, "Thread1") - tst2 = utils.find_stat_by_name(stats, "Thread2") - tsmain = utils.find_stat_by_name(stats, "_MainThread") - dummy() # call dummy to force ctx name to be retrieved again. - self.assertTrue(len(stats) == 3) - self.assertTrue(tst1 is not None) - self.assertTrue(tst2 is not None) - # TODO: I put dummy() to fix below, remove the comments after a while. - self.assertTrue( # FIX: I see this fails sometimes - tsmain is not None, - 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(stats)))) - self.assertTrue(1.0 > tst2.ttot >= 0.5) - self.assertTrue(1.0 > tst1.ttot >= 0.5) - - # test sorting of the ctx stats - stats = stats.sort("totaltime", "desc") - prev_stat = None - for stat in stats: - if prev_stat: - self.assertTrue(prev_stat.ttot >= stat.ttot) - prev_stat = stat - stats = stats.sort("totaltime", "asc") - prev_stat = None - for stat in stats: - if prev_stat: - self.assertTrue(prev_stat.ttot <= stat.ttot) - prev_stat = stat - stats = stats.sort("schedcount", "desc") - prev_stat = None - for stat in stats: - if prev_stat: - self.assertTrue(prev_stat.sched_count >= stat.sched_count) - prev_stat = stat - stats = stats.sort("name", "desc") - prev_stat = None - for stat in stats: - if prev_stat: - self.assertTrue(prev_stat.name.lower() >= stat.name.lower()) - prev_stat = stat - self.assertRaises( - yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg" - ) - self.assertRaises( - yappi.YappiError, stats.sort, "invalid_thread_sortorder_arg" - ) - - def test_ctx_stats_cpu(self): - - def get_thread_name(): - try: - return threading.current_thread().name - except AttributeError: - return "Anonymous" - - def burn_cpu(sec): - t0 = yappi.get_clock_time() - elapsed = 0 - while (elapsed < sec): - for _ in range(1000): - pass - elapsed = yappi.get_clock_time() - t0 - - def test(): - - ts = [] - for i in (0.01, 0.05, 0.1): - t = threading.Thread(target=burn_cpu, args=(i, )) - t.name = "burn_cpu-%s" % str(i) - t.start() - ts.append(t) - for t in ts: - t.join() - - yappi.set_clock_type("cpu") - yappi.set_context_name_callback(get_thread_name) - - yappi.start() - - test() - - yappi.stop() - - tstats = yappi.get_thread_stats() - r1 = ''' - burn_cpu-0.1 3 123145356058624 0.100105 8 - burn_cpu-0.05 2 123145361313792 0.050149 8 - burn_cpu-0.01 1 123145356058624 0.010127 2 - MainThread 0 4321620864 0.001632 6 - ''' - self.assert_ctx_stats_almost_equal(r1, tstats) - - def test_producer_consumer_with_queues(self): - # we currently just stress yappi, no functionality test is done here. - yappi.start() - if utils.is_py3x(): - from queue import Queue - else: - from Queue import Queue - from threading import Thread - WORKER_THREAD_COUNT = 50 - WORK_ITEM_COUNT = 2000 - - def worker(): - while True: - item = q.get() - # do the work with item - q.task_done() - - q = Queue() - for i in range(WORKER_THREAD_COUNT): - t = Thread(target=worker) - t.daemon = True - t.start() - - for item in range(WORK_ITEM_COUNT): - q.put(item) - q.join() # block until all tasks are done - #yappi.get_func_stats().sort("callcount").print_all() - yappi.stop() - - def test_temporary_lock_waiting(self): - yappi.start() - _lock = threading.Lock() - - def worker(): - _lock.acquire() - try: - time.sleep(1.0) - finally: - _lock.release() - - t1 = threading.Thread(target=worker) - t2 = threading.Thread(target=worker) - t1.start() - t2.start() - t1.join() - t2.join() - #yappi.get_func_stats().sort("callcount").print_all() - yappi.stop() - - @unittest.skipIf(os.name != "posix", "requires Posix compliant OS") - def test_signals_with_blocking_calls(self): - import signal, os, time - - # just to verify if signal is handled correctly and stats/yappi are not corrupted. - def handler(signum, frame): - raise Exception("Signal handler executed!") - - yappi.start() - signal.signal(signal.SIGALRM, handler) - signal.alarm(1) - self.assertRaises(Exception, time.sleep, 2) - stats = yappi.get_func_stats() - fsh = utils.find_stat_by_name(stats, "handler") - self.assertTrue(fsh is not None) - - @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2") - def test_concurrent_futures(self): - yappi.start() - from concurrent.futures import ThreadPoolExecutor - with ThreadPoolExecutor(max_workers=5) as executor: - f = executor.submit(pow, 5, 2) - self.assertEqual(f.result(), 25) - time.sleep(1.0) - yappi.stop() - - @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2") - def test_barrier(self): - yappi.start() - b = threading.Barrier(2, timeout=1) - - def worker(): - try: - b.wait() - except threading.BrokenBarrierError: - pass - except Exception: - raise Exception("BrokenBarrierError not raised") - - t1 = threading.Thread(target=worker) - t1.start() - #b.wait() - t1.join() - yappi.stop() - - -class NonRecursiveFunctions(utils.YappiUnitTestCase): - - def test_abcd(self): - _timings = {"a_1": 6, "b_1": 5, "c_1": 3, "d_1": 1} - _yappi._set_test_timings(_timings) - - def a(): - b() - - def b(): - c() - - def c(): - d() - - def d(): - pass - - stats = utils.run_and_get_func_stats(a) - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - fsc = utils.find_stat_by_name(stats, 'c') - fsd = utils.find_stat_by_name(stats, 'd') - cfsab = fsa.children[fsb] - cfsbc = fsb.children[fsc] - cfscd = fsc.children[fsd] - - self.assertEqual(fsa.ttot, 6) - self.assertEqual(fsa.tsub, 1) - self.assertEqual(fsb.ttot, 5) - self.assertEqual(fsb.tsub, 2) - self.assertEqual(fsc.ttot, 3) - self.assertEqual(fsc.tsub, 2) - self.assertEqual(fsd.ttot, 1) - self.assertEqual(fsd.tsub, 1) - self.assertEqual(cfsab.ttot, 5) - self.assertEqual(cfsab.tsub, 2) - self.assertEqual(cfsbc.ttot, 3) - self.assertEqual(cfsbc.tsub, 2) - self.assertEqual(cfscd.ttot, 1) - self.assertEqual(cfscd.tsub, 1) - - def test_stop_in_middle(self): - _timings = {"a_1": 6, "b_1": 4} - _yappi._set_test_timings(_timings) - - def a(): - b() - yappi.stop() - - def b(): - time.sleep(0.2) - - yappi.start() - a() - stats = yappi.get_func_stats() - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - - self.assertEqual(fsa.ncall, 1) - self.assertEqual(fsa.nactualcall, 0) - self.assertEqual(fsa.ttot, 0) # no call_leave called - self.assertEqual(fsa.tsub, 0) # no call_leave called - self.assertEqual(fsb.ttot, 4) - - -class RecursiveFunctions(utils.YappiUnitTestCase): - - def test_fibonacci(self): - - def fib(n): - if n > 1: - return fib(n - 1) + fib(n - 2) - else: - return n - - stats = utils.run_and_get_func_stats(fib, 22) - fs = utils.find_stat_by_name(stats, 'fib') - self.assertEqual(fs.ncall, 57313) - self.assertEqual(fs.ttot, fs.tsub) - - def test_abcadc(self): - _timings = { - "a_1": 20, - "b_1": 19, - "c_1": 17, - "a_2": 13, - "d_1": 12, - "c_2": 10, - "a_3": 5 - } - _yappi._set_test_timings(_timings) - - def a(n): - if n == 3: - return - if n == 1 + 1: - d(n) - else: - b(n) - - def b(n): - c(n) - - def c(n): - a(n + 1) - - def d(n): - c(n) - - stats = utils.run_and_get_func_stats(a, 1) - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - fsc = utils.find_stat_by_name(stats, 'c') - fsd = utils.find_stat_by_name(stats, 'd') - self.assertEqual(fsa.ncall, 3) - self.assertEqual(fsa.nactualcall, 1) - self.assertEqual(fsa.ttot, 20) - self.assertEqual(fsa.tsub, 7) - self.assertEqual(fsb.ttot, 19) - self.assertEqual(fsb.tsub, 2) - self.assertEqual(fsc.ttot, 17) - self.assertEqual(fsc.tsub, 9) - self.assertEqual(fsd.ttot, 12) - self.assertEqual(fsd.tsub, 2) - cfsca = fsc.children[fsa] - self.assertEqual(cfsca.nactualcall, 0) - self.assertEqual(cfsca.ncall, 2) - self.assertEqual(cfsca.ttot, 13) - self.assertEqual(cfsca.tsub, 6) - - def test_aaaa(self): - _timings = {"d_1": 9, "d_2": 7, "d_3": 3, "d_4": 2} - _yappi._set_test_timings(_timings) - - def d(n): - if n == 3: - return - d(n + 1) - - stats = utils.run_and_get_func_stats(d, 0) - fsd = utils.find_stat_by_name(stats, 'd') - self.assertEqual(fsd.ncall, 4) - self.assertEqual(fsd.nactualcall, 1) - self.assertEqual(fsd.ttot, 9) - self.assertEqual(fsd.tsub, 9) - cfsdd = fsd.children[fsd] - self.assertEqual(cfsdd.ttot, 7) - self.assertEqual(cfsdd.tsub, 7) - self.assertEqual(cfsdd.ncall, 3) - self.assertEqual(cfsdd.nactualcall, 0) - - def test_abcabc(self): - _timings = { - "a_1": 20, - "b_1": 19, - "c_1": 17, - "a_2": 13, - "b_2": 11, - "c_2": 9, - "a_3": 6 - } - _yappi._set_test_timings(_timings) - - def a(n): - if n == 3: - return - else: - b(n) - - def b(n): - c(n) - - def c(n): - a(n + 1) - - stats = utils.run_and_get_func_stats(a, 1) - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - fsc = utils.find_stat_by_name(stats, 'c') - self.assertEqual(fsa.ncall, 3) - self.assertEqual(fsa.nactualcall, 1) - self.assertEqual(fsa.ttot, 20) - self.assertEqual(fsa.tsub, 9) - self.assertEqual(fsb.ttot, 19) - self.assertEqual(fsb.tsub, 4) - self.assertEqual(fsc.ttot, 17) - self.assertEqual(fsc.tsub, 7) - cfsab = fsa.children[fsb] - cfsbc = fsb.children[fsc] - cfsca = fsc.children[fsa] - self.assertEqual(cfsab.ttot, 19) - self.assertEqual(cfsab.tsub, 4) - self.assertEqual(cfsbc.ttot, 17) - self.assertEqual(cfsbc.tsub, 7) - self.assertEqual(cfsca.ttot, 13) - self.assertEqual(cfsca.tsub, 8) - - def test_abcbca(self): - _timings = {"a_1": 10, "b_1": 9, "c_1": 7, "b_2": 4, "c_2": 2, "a_2": 1} - _yappi._set_test_timings(_timings) - self._ncall = 1 - - def a(): - if self._ncall == 1: - b() - else: - return - - def b(): - c() - - def c(): - if self._ncall == 1: - self._ncall += 1 - b() - else: - a() - - stats = utils.run_and_get_func_stats(a) - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - fsc = utils.find_stat_by_name(stats, 'c') - cfsab = fsa.children[fsb] - cfsbc = fsb.children[fsc] - cfsca = fsc.children[fsa] - self.assertEqual(fsa.ttot, 10) - self.assertEqual(fsa.tsub, 2) - self.assertEqual(fsb.ttot, 9) - self.assertEqual(fsb.tsub, 4) - self.assertEqual(fsc.ttot, 7) - self.assertEqual(fsc.tsub, 4) - self.assertEqual(cfsab.ttot, 9) - self.assertEqual(cfsab.tsub, 2) - self.assertEqual(cfsbc.ttot, 7) - self.assertEqual(cfsbc.tsub, 4) - self.assertEqual(cfsca.ttot, 1) - self.assertEqual(cfsca.tsub, 1) - self.assertEqual(cfsca.ncall, 1) - self.assertEqual(cfsca.nactualcall, 0) - - def test_aabccb(self): - _timings = { - "a_1": 13, - "a_2": 11, - "b_1": 9, - "c_1": 5, - "c_2": 3, - "b_2": 1 - } - _yappi._set_test_timings(_timings) - self._ncall = 1 - - def a(): - if self._ncall == 1: - self._ncall += 1 - a() - else: - b() - - def b(): - if self._ncall == 3: - return - else: - c() - - def c(): - if self._ncall == 2: - self._ncall += 1 - c() - else: - b() - - stats = utils.run_and_get_func_stats(a) - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - fsc = utils.find_stat_by_name(stats, 'c') - cfsaa = fsa.children[fsa.index] - cfsab = fsa.children[fsb] - cfsbc = fsb.children[fsc.full_name] - cfscc = fsc.children[fsc] - cfscb = fsc.children[fsb] - self.assertEqual(fsb.ttot, 9) - self.assertEqual(fsb.tsub, 5) - self.assertEqual(cfsbc.ttot, 5) - self.assertEqual(cfsbc.tsub, 2) - self.assertEqual(fsa.ttot, 13) - self.assertEqual(fsa.tsub, 4) - self.assertEqual(cfsab.ttot, 9) - self.assertEqual(cfsab.tsub, 4) - self.assertEqual(cfsaa.ttot, 11) - self.assertEqual(cfsaa.tsub, 2) - self.assertEqual(fsc.ttot, 5) - self.assertEqual(fsc.tsub, 4) - - def test_abaa(self): - _timings = {"a_1": 13, "b_1": 10, "a_2": 9, "a_3": 5} - _yappi._set_test_timings(_timings) - - self._ncall = 1 - - def a(): - if self._ncall == 1: - b() - elif self._ncall == 2: - self._ncall += 1 - a() - else: - return - - def b(): - self._ncall += 1 - a() - - stats = utils.run_and_get_func_stats(a) - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - cfsaa = fsa.children[fsa] - cfsba = fsb.children[fsa] - self.assertEqual(fsb.ttot, 10) - self.assertEqual(fsb.tsub, 1) - self.assertEqual(fsa.ttot, 13) - self.assertEqual(fsa.tsub, 12) - self.assertEqual(cfsaa.ttot, 5) - self.assertEqual(cfsaa.tsub, 5) - self.assertEqual(cfsba.ttot, 9) - self.assertEqual(cfsba.tsub, 4) - - def test_aabb(self): - _timings = {"a_1": 13, "a_2": 10, "b_1": 9, "b_2": 5} - _yappi._set_test_timings(_timings) - - self._ncall = 1 - - def a(): - if self._ncall == 1: - self._ncall += 1 - a() - elif self._ncall == 2: - b() - else: - return - - def b(): - if self._ncall == 2: - self._ncall += 1 - b() - else: - return - - stats = utils.run_and_get_func_stats(a) - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - cfsaa = fsa.children[fsa] - cfsab = fsa.children[fsb] - cfsbb = fsb.children[fsb] - self.assertEqual(fsa.ttot, 13) - self.assertEqual(fsa.tsub, 4) - self.assertEqual(fsb.ttot, 9) - self.assertEqual(fsb.tsub, 9) - self.assertEqual(cfsaa.ttot, 10) - self.assertEqual(cfsaa.tsub, 1) - self.assertEqual(cfsab.ttot, 9) - self.assertEqual(cfsab.tsub, 4) - self.assertEqual(cfsbb.ttot, 5) - self.assertEqual(cfsbb.tsub, 5) - - def test_abbb(self): - _timings = {"a_1": 13, "b_1": 10, "b_2": 6, "b_3": 1} - _yappi._set_test_timings(_timings) - - self._ncall = 1 - - def a(): - if self._ncall == 1: - b() - - def b(): - if self._ncall == 3: - return - self._ncall += 1 - b() - - stats = utils.run_and_get_func_stats(a) - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - cfsab = fsa.children[fsb] - cfsbb = fsb.children[fsb] - self.assertEqual(fsa.ttot, 13) - self.assertEqual(fsa.tsub, 3) - self.assertEqual(fsb.ttot, 10) - self.assertEqual(fsb.tsub, 10) - self.assertEqual(fsb.ncall, 3) - self.assertEqual(fsb.nactualcall, 1) - self.assertEqual(cfsab.ttot, 10) - self.assertEqual(cfsab.tsub, 4) - self.assertEqual(cfsbb.ttot, 6) - self.assertEqual(cfsbb.tsub, 6) - self.assertEqual(cfsbb.nactualcall, 0) - self.assertEqual(cfsbb.ncall, 2) - - def test_aaab(self): - _timings = {"a_1": 13, "a_2": 10, "a_3": 6, "b_1": 1} - _yappi._set_test_timings(_timings) - - self._ncall = 1 - - def a(): - if self._ncall == 3: - b() - return - self._ncall += 1 - a() - - def b(): - return - - stats = utils.run_and_get_func_stats(a) - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - cfsaa = fsa.children[fsa] - cfsab = fsa.children[fsb] - self.assertEqual(fsa.ttot, 13) - self.assertEqual(fsa.tsub, 12) - self.assertEqual(fsb.ttot, 1) - self.assertEqual(fsb.tsub, 1) - self.assertEqual(cfsaa.ttot, 10) - self.assertEqual(cfsaa.tsub, 9) - self.assertEqual(cfsab.ttot, 1) - self.assertEqual(cfsab.tsub, 1) - - def test_abab(self): - _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1} - _yappi._set_test_timings(_timings) - - self._ncall = 1 - - def a(): - b() - - def b(): - if self._ncall == 2: - return - self._ncall += 1 - a() - - stats = utils.run_and_get_func_stats(a) - fsa = utils.find_stat_by_name(stats, 'a') - fsb = utils.find_stat_by_name(stats, 'b') - cfsab = fsa.children[fsb] - cfsba = fsb.children[fsa] - self.assertEqual(fsa.ttot, 13) - self.assertEqual(fsa.tsub, 8) - self.assertEqual(fsb.ttot, 10) - self.assertEqual(fsb.tsub, 5) - self.assertEqual(cfsab.ttot, 10) - self.assertEqual(cfsab.tsub, 5) - self.assertEqual(cfsab.ncall, 2) - self.assertEqual(cfsab.nactualcall, 1) - self.assertEqual(cfsba.ttot, 6) - self.assertEqual(cfsba.tsub, 5) - - -if __name__ == '__main__': - # import sys;sys.argv = ['', 'BasicUsage.test_run_as_script'] - # import sys;sys.argv = ['', 'MultithreadedScenarios.test_subsequent_profile'] - unittest.main() +import os +import sys +import time +import threading +import unittest +import yappi +import _yappi +import tests.utils as utils +import multiprocessing # added to fix http://bugs.python.org/issue15881 for > Py2.6 +import subprocess + +_counter = 0 + + +class BasicUsage(utils.YappiUnitTestCase): + + def test_callback_function_int_return_overflow(self): + # this test is just here to check if any errors are generated, as the err + # is printed in C side, I did not include it here. THere are ways to test + # this deterministically, I did not bother + import ctypes + + def _unsigned_overflow_margin(): + return 2**(ctypes.sizeof(ctypes.c_void_p) * 8) - 1 + + def foo(): + pass + + #with utils.captured_output() as (out, err): + yappi.set_context_id_callback(_unsigned_overflow_margin) + yappi.set_tag_callback(_unsigned_overflow_margin) + yappi.start() + foo() + + def test_issue60(self): + + def foo(): + buf = bytearray() + buf += b't' * 200 + view = memoryview(buf)[10:] + view = view.tobytes() + del buf[:10] # this throws exception + return view + + yappi.start(builtins=True) + foo() + self.assertTrue( + len( + yappi.get_func_stats( + filter_callback=lambda x: yappi. + func_matches(x, [memoryview.tobytes]) + ) + ) > 0 + ) + yappi.stop() + + def test_issue54(self): + + def _tag_cbk(): + global _counter + _counter += 1 + return _counter + + def a(): + pass + + def b(): + pass + + yappi.set_tag_callback(_tag_cbk) + yappi.start() + a() + a() + a() + yappi.stop() + stats = yappi.get_func_stats() + self.assertEqual(stats.pop().ncall, 3) # aggregated if no tag is given + stats = yappi.get_func_stats(tag=1) + + for i in range(1, 3): + stats = yappi.get_func_stats(tag=i) + stats = yappi.get_func_stats( + tag=i, filter_callback=lambda x: yappi.func_matches(x, [a]) + ) + + stat = stats.pop() + self.assertEqual(stat.ncall, 1) + + yappi.set_tag_callback(None) + yappi.clear_stats() + yappi.start() + b() + b() + stats = yappi.get_func_stats() + self.assertEqual(len(stats), 1) + stat = stats.pop() + self.assertEqual(stat.ncall, 2) + + def test_filter(self): + + def a(): + pass + + def b(): + a() + + def c(): + b() + + _TCOUNT = 5 + + ts = [] + yappi.start() + for i in range(_TCOUNT): + t = threading.Thread(target=c) + t.start() + ts.append(t) + + for t in ts: + t.join() + + yappi.stop() + + ctx_ids = [] + for tstat in yappi.get_thread_stats(): + if tstat.name == '_MainThread': + main_ctx_id = tstat.id + else: + ctx_ids.append(tstat.id) + + fstats = yappi.get_func_stats(filter={"ctx_id": 9}) + self.assertTrue(fstats.empty()) + fstats = yappi.get_func_stats( + filter={ + "ctx_id": main_ctx_id, + "name": "c" + } + ) # main thread + self.assertTrue(fstats.empty()) + + for i in ctx_ids: + fstats = yappi.get_func_stats( + filter={ + "ctx_id": i, + "name": "a", + "ncall": 1 + } + ) + self.assertEqual(fstats.pop().ncall, 1) + fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "b"}) + self.assertEqual(fstats.pop().ncall, 1) + fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "c"}) + self.assertEqual(fstats.pop().ncall, 1) + + yappi.clear_stats() + yappi.start(builtins=True) + time.sleep(0.1) + yappi.stop() + fstats = yappi.get_func_stats(filter={"module": "time"}) + self.assertEqual(len(fstats), 1) + + # invalid filters` + self.assertRaises( + Exception, yappi.get_func_stats, filter={'tag': "sss"} + ) + self.assertRaises( + Exception, yappi.get_func_stats, filter={'ctx_id': "None"} + ) + + def test_filter_callback(self): + + def a(): + time.sleep(0.1) + + def b(): + a() + + def c(): + pass + + def d(): + pass + + yappi.set_clock_type("wall") + yappi.start(builtins=True) + a() + b() + c() + d() + stats = yappi.get_func_stats( + filter_callback=lambda x: yappi.func_matches(x, [a, b]) + ) + #stats.print_all() + r1 = ''' + tests/test_functionality.py:98 a 2 0.000000 0.200350 0.100175 + tests/test_functionality.py:101 b 1 0.000000 0.120000 0.100197 + ''' + self.assert_traces_almost_equal(r1, stats) + self.assertEqual(len(stats), 2) + stats = yappi.get_func_stats( + filter_callback=lambda x: yappi. + module_matches(x, [sys.modules[__name__]]) + ) + r1 = ''' + tests/test_functionality.py:98 a 2 0.000000 0.230130 0.115065 + tests/test_functionality.py:101 b 1 0.000000 0.120000 0.109011 + tests/test_functionality.py:104 c 1 0.000000 0.000002 0.000002 + tests/test_functionality.py:107 d 1 0.000000 0.000001 0.000001 + ''' + self.assert_traces_almost_equal(r1, stats) + self.assertEqual(len(stats), 4) + + stats = yappi.get_func_stats( + filter_callback=lambda x: yappi.func_matches(x, [time.sleep]) + ) + self.assertEqual(len(stats), 1) + r1 = ''' + time.sleep 2 0.206804 0.220000 0.103402 + ''' + self.assert_traces_almost_equal(r1, stats) + + def test_print_formatting(self): + + def a(): + pass + + def b(): + a() + + func_cols = { + 1: ("name", 48), + 0: ("ncall", 5), + 2: ("tsub", 8), + } + thread_cols = { + 1: ("name", 48), + 0: ("ttot", 8), + } + + yappi.start() + a() + b() + yappi.stop() + fs = yappi.get_func_stats() + cs = fs[1].children + ts = yappi.get_thread_stats() + #fs.print_all(out=sys.stderr, columns={1:("name", 70), }) + #cs.print_all(out=sys.stderr, columns=func_cols) + #ts.print_all(out=sys.stderr, columns=thread_cols) + #cs.print_all(out=sys.stderr, columns={}) + + self.assertRaises( + yappi.YappiError, fs.print_all, columns={1: ("namee", 9)} + ) + self.assertRaises( + yappi.YappiError, cs.print_all, columns={1: ("dd", 0)} + ) + self.assertRaises( + yappi.YappiError, ts.print_all, columns={1: ("tidd", 0)} + ) + + def test_get_clock(self): + yappi.set_clock_type('cpu') + self.assertEqual('cpu', yappi.get_clock_type()) + clock_info = yappi.get_clock_info() + self.assertTrue('api' in clock_info) + self.assertTrue('resolution' in clock_info) + + yappi.set_clock_type('wall') + self.assertEqual('wall', yappi.get_clock_type()) + + t0 = yappi.get_clock_time() + time.sleep(0.1) + duration = yappi.get_clock_time() - t0 + self.assertTrue(0.05 < duration < 0.3) + + def test_profile_decorator(self): + + def aggregate(func, stats): + fname = "tests/%s.profile" % (func.__name__) + try: + stats.add(fname) + except IOError: + pass + stats.save(fname) + raise Exception("messing around") + + @yappi.profile(return_callback=aggregate) + def a(x, y): + if x + y == 25: + raise Exception("") + return x + y + + def b(): + pass + + try: + os.remove( + "tests/a.profile" + ) # remove the one from prev test, if available + except: + pass + + # global profile is on to mess things up + yappi.start() + b() + + # assert functionality and call function at same time + try: + self.assertEqual(a(1, 2), 3) + except: + pass + try: + self.assertEqual(a(2, 5), 7) + except: + pass + try: + a(4, 21) + except: + pass + stats = yappi.get_func_stats().add("tests/a.profile") + fsa = utils.find_stat_by_name(stats, 'a') + self.assertEqual(fsa.ncall, 3) + self.assertEqual(len(stats), 1) # b() should be cleared out. + + @yappi.profile(return_callback=aggregate) + def count_down_rec(n): + if n == 0: + return + count_down_rec(n - 1) + + try: + os.remove( + "tests/count_down_rec.profile" + ) # remove the one from prev test, if available + except: + pass + + try: + count_down_rec(4) + except: + pass + try: + count_down_rec(3) + except: + pass + + stats = yappi.YFuncStats("tests/count_down_rec.profile") + fsrec = utils.find_stat_by_name(stats, 'count_down_rec') + self.assertEqual(fsrec.ncall, 9) + self.assertEqual(fsrec.nactualcall, 2) + + def test_strip_dirs(self): + + def a(): + pass + + stats = utils.run_and_get_func_stats(a, ) + stats.strip_dirs() + fsa = utils.find_stat_by_name(stats, "a") + self.assertEqual(fsa.module, os.path.basename(fsa.module)) + + @unittest.skipIf(os.name == "nt", "do not run on Windows") + def test_run_as_script(self): + import re + p = subprocess.Popen( + ['yappi', os.path.join('./tests', 'run_as_script.py')], + stdout=subprocess.PIPE + ) + out, err = p.communicate() + self.assertEqual(p.returncode, 0) + func_stats, thread_stats = re.split( + b'name\\s+id\\s+tid\\s+ttot\\s+scnt\\s*\n', out + ) + self.assertTrue(b'FancyThread' in thread_stats) + + def test_yappi_overhead(self): + LOOP_COUNT = 100000 + + def a(): + pass + + def b(): + for i in range(LOOP_COUNT): + a() + + t0 = time.time() + yappi.start() + b() + yappi.stop() + time_with_yappi = time.time() - t0 + t0 = time.time() + b() + time_without_yappi = time.time() - t0 + if time_without_yappi == 0: + time_without_yappi = 0.000001 + + # in latest v0.82, I calculated this as close to "7.0" in my machine. + # however, %83 of this overhead is coming from tickcount(). The other %17 + # seems to have been evenly distributed to the internal bookkeeping + # structures/algorithms which seems acceptable. Note that our test only + # tests one function being profiled at-a-time in a short interval. + # profiling high number of functions in a small time + # is a different beast, (which is pretty unlikely in most applications) + # So as a conclusion: I cannot see any optimization window for Yappi that + # is worth implementing as we will only optimize %17 of the time. + sys.stderr.write("\r\nYappi puts %0.1f times overhead to the profiled application in average.\r\n" % \ + (time_with_yappi / time_without_yappi)) + + def test_clear_stats_while_running(self): + + def a(): + pass + + yappi.start() + a() + yappi.clear_stats() + a() + stats = yappi.get_func_stats() + fsa = utils.find_stat_by_name(stats, 'a') + self.assertEqual(fsa.ncall, 1) + + def test_generator(self): + + def _gen(n): + while (n > 0): + yield n + n -= 1 + + yappi.start() + for x in _gen(5): + pass + self.assertTrue( + yappi.convert2pstats(yappi.get_func_stats()) is not None + ) + + def test_slice_child_stats_and_strip_dirs(self): + + def b(): + for i in range(10000000): + pass + + def a(): + b() + + yappi.start(builtins=True) + a() + stats = yappi.get_func_stats() + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + self.assertTrue(fsa.children[0:1] is not None) + prev_afullname = fsa.full_name + prev_bchildfullname = fsa.children[fsb].full_name + stats.strip_dirs() + self.assertTrue(len(prev_afullname) > len(fsa.full_name)) + self.assertTrue( + len(prev_bchildfullname) > len(fsa.children[fsb].full_name) + ) + + def test_children_stat_functions(self): + _timings = {"a_1": 5, "b_1": 3, "c_1": 1} + _yappi._set_test_timings(_timings) + + def b(): + pass + + def c(): + pass + + def a(): + b() + c() + + yappi.start() + a() + b() # non-child call + c() # non-child call + stats = yappi.get_func_stats() + fsa = utils.find_stat_by_name(stats, 'a') + childs_of_a = fsa.children.get().sort("tavg", "desc") + prev_item = None + for item in childs_of_a: + if prev_item: + self.assertTrue(prev_item.tavg > item.tavg) + prev_item = item + childs_of_a.sort("name", "desc") + prev_item = None + for item in childs_of_a: + if prev_item: + self.assertTrue(prev_item.name > item.name) + prev_item = item + childs_of_a.clear() + self.assertTrue(childs_of_a.empty()) + + def test_no_stats_different_clock_type_load(self): + + def a(): + pass + + yappi.start() + a() + yappi.stop() + yappi.get_func_stats().save("tests/ystats1.ys") + yappi.clear_stats() + yappi.set_clock_type("WALL") + yappi.start() + yappi.stop() + stats = yappi.get_func_stats().add("tests/ystats1.ys") + fsa = utils.find_stat_by_name(stats, 'a') + self.assertTrue(fsa is not None) + + def test_subsequent_profile(self): + _timings = {"a_1": 1, "b_1": 1} + _yappi._set_test_timings(_timings) + + def a(): + pass + + def b(): + pass + + yappi.start() + a() + yappi.stop() + yappi.start() + b() + yappi.stop() + stats = yappi.get_func_stats() + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + self.assertTrue(fsa is not None) + self.assertTrue(fsb is not None) + self.assertEqual(fsa.ttot, 1) + self.assertEqual(fsb.ttot, 1) + + def test_lambda(self): + f = lambda: time.sleep(0.3) + yappi.set_clock_type("wall") + yappi.start() + f() + stats = yappi.get_func_stats() + fsa = utils.find_stat_by_name(stats, '') + self.assertTrue(fsa.ttot > 0.1) + + def test_module_stress(self): + self.assertEqual(yappi.is_running(), False) + + yappi.start() + yappi.clear_stats() + self.assertRaises(_yappi.error, yappi.set_clock_type, "wall") + + yappi.stop() + yappi.clear_stats() + yappi.set_clock_type("cpu") + self.assertRaises(yappi.YappiError, yappi.set_clock_type, "dummy") + self.assertEqual(yappi.is_running(), False) + yappi.clear_stats() + yappi.clear_stats() + + def test_stat_sorting(self): + _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1} + _yappi._set_test_timings(_timings) + + self._ncall = 1 + + def a(): + b() + + def b(): + if self._ncall == 2: + return + self._ncall += 1 + a() + + stats = utils.run_and_get_func_stats(a) + stats = stats.sort("totaltime", "desc") + prev_stat = None + for stat in stats: + if prev_stat: + self.assertTrue(prev_stat.ttot >= stat.ttot) + prev_stat = stat + stats = stats.sort("totaltime", "asc") + prev_stat = None + for stat in stats: + if prev_stat: + self.assertTrue(prev_stat.ttot <= stat.ttot) + prev_stat = stat + stats = stats.sort("avgtime", "asc") + prev_stat = None + for stat in stats: + if prev_stat: + self.assertTrue(prev_stat.tavg <= stat.tavg) + prev_stat = stat + stats = stats.sort("name", "asc") + prev_stat = None + for stat in stats: + if prev_stat: + self.assertTrue(prev_stat.name <= stat.name) + prev_stat = stat + stats = stats.sort("subtime", "asc") + prev_stat = None + for stat in stats: + if prev_stat: + self.assertTrue(prev_stat.tsub <= stat.tsub) + prev_stat = stat + + self.assertRaises( + yappi.YappiError, stats.sort, "invalid_func_sorttype_arg" + ) + self.assertRaises( + yappi.YappiError, stats.sort, "totaltime", + "invalid_func_sortorder_arg" + ) + + def test_start_flags(self): + self.assertEqual(_yappi._get_start_flags(), None) + yappi.start() + + def a(): + pass + + a() + self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0) + self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1) + self.assertEqual(len(yappi.get_thread_stats()), 1) + + def test_builtin_profiling(self): + + def a(): + time.sleep(0.4) # is a builtin function + + yappi.set_clock_type('wall') + + yappi.start(builtins=True) + a() + stats = yappi.get_func_stats() + fsa = utils.find_stat_by_name(stats, 'sleep') + self.assertTrue(fsa is not None) + self.assertTrue(fsa.ttot > 0.3) + yappi.stop() + yappi.clear_stats() + + def a(): + pass + + yappi.start() + t = threading.Thread(target=a) + t.start() + t.join() + stats = yappi.get_func_stats() + + def test_singlethread_profiling(self): + yappi.set_clock_type('wall') + + def a(): + time.sleep(0.2) + + class Worker1(threading.Thread): + + def a(self): + time.sleep(0.3) + + def run(self): + self.a() + + yappi.start(profile_threads=False) + + c = Worker1() + c.start() + c.join() + a() + stats = yappi.get_func_stats() + fsa1 = utils.find_stat_by_name(stats, 'Worker1.a') + fsa2 = utils.find_stat_by_name(stats, 'a') + self.assertTrue(fsa1 is None) + self.assertTrue(fsa2 is not None) + self.assertTrue(fsa2.ttot > 0.1) + + def test_run(self): + + def profiled(): + pass + + yappi.clear_stats() + try: + with yappi.run(): + profiled() + stats = yappi.get_func_stats() + finally: + yappi.clear_stats() + + self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled')) + + def test_run_recursive(self): + + def profiled(): + pass + + def not_profiled(): + pass + + yappi.clear_stats() + try: + with yappi.run(): + with yappi.run(): + profiled() + # Profiling stopped here + not_profiled() + stats = yappi.get_func_stats() + finally: + yappi.clear_stats() + + self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled')) + self.assertIsNone(utils.find_stat_by_name(stats, 'not_profiled')) + + +class StatSaveScenarios(utils.YappiUnitTestCase): + + def test_pstats_conversion(self): + + def pstat_id(fs): + return (fs.module, fs.lineno, fs.name) + + def a(): + d() + + def b(): + d() + + def c(): + pass + + def d(): + pass + + _timings = {"a_1": 12, "b_1": 7, "c_1": 5, "d_1": 2} + _yappi._set_test_timings(_timings) + stats = utils.run_and_get_func_stats(a, ) + stats.strip_dirs() + stats.save("tests/a1.pstats", type="pstat") + fsa_pid = pstat_id(utils.find_stat_by_name(stats, "a")) + fsd_pid = pstat_id(utils.find_stat_by_name(stats, "d")) + yappi.clear_stats() + _yappi._set_test_timings(_timings) + stats = utils.run_and_get_func_stats(a, ) + stats.strip_dirs() + stats.save("tests/a2.pstats", type="pstat") + yappi.clear_stats() + _yappi._set_test_timings(_timings) + stats = utils.run_and_get_func_stats(b, ) + stats.strip_dirs() + stats.save("tests/b1.pstats", type="pstat") + fsb_pid = pstat_id(utils.find_stat_by_name(stats, "b")) + yappi.clear_stats() + _yappi._set_test_timings(_timings) + stats = utils.run_and_get_func_stats(c, ) + stats.strip_dirs() + stats.save("tests/c1.pstats", type="pstat") + fsc_pid = pstat_id(utils.find_stat_by_name(stats, "c")) + + # merge saved stats and check pstats values are correct + import pstats + p = pstats.Stats( + 'tests/a1.pstats', 'tests/a2.pstats', 'tests/b1.pstats', + 'tests/c1.pstats' + ) + p.strip_dirs() + # ct = ttot, tt = tsub + (cc, nc, tt, ct, callers) = p.stats[fsa_pid] + self.assertEqual(cc, nc, 2) + self.assertEqual(tt, 20) + self.assertEqual(ct, 24) + (cc, nc, tt, ct, callers) = p.stats[fsd_pid] + self.assertEqual(cc, nc, 3) + self.assertEqual(tt, 6) + self.assertEqual(ct, 6) + self.assertEqual(len(callers), 2) + (cc, nc, tt, ct) = callers[fsa_pid] + self.assertEqual(cc, nc, 2) + self.assertEqual(tt, 4) + self.assertEqual(ct, 4) + (cc, nc, tt, ct) = callers[fsb_pid] + self.assertEqual(cc, nc, 1) + self.assertEqual(tt, 2) + self.assertEqual(ct, 2) + + def test_merge_stats(self): + _timings = { + "a_1": 15, + "b_1": 14, + "c_1": 12, + "d_1": 10, + "e_1": 9, + "f_1": 7, + "g_1": 6, + "h_1": 5, + "i_1": 1 + } + _yappi._set_test_timings(_timings) + + def a(): + b() + + def b(): + c() + + def c(): + d() + + def d(): + e() + + def e(): + f() + + def f(): + g() + + def g(): + h() + + def h(): + i() + + def i(): + pass + + yappi.start() + a() + a() + yappi.stop() + stats = yappi.get_func_stats() + self.assertRaises( + NotImplementedError, stats.save, "", "INVALID_SAVE_TYPE" + ) + stats.save("tests/ystats2.ys") + yappi.clear_stats() + _yappi._set_test_timings(_timings) + yappi.start() + a() + stats = yappi.get_func_stats().add("tests/ystats2.ys") + fsa = utils.find_stat_by_name(stats, "a") + fsb = utils.find_stat_by_name(stats, "b") + fsc = utils.find_stat_by_name(stats, "c") + fsd = utils.find_stat_by_name(stats, "d") + fse = utils.find_stat_by_name(stats, "e") + fsf = utils.find_stat_by_name(stats, "f") + fsg = utils.find_stat_by_name(stats, "g") + fsh = utils.find_stat_by_name(stats, "h") + fsi = utils.find_stat_by_name(stats, "i") + self.assertEqual(fsa.ttot, 45) + self.assertEqual(fsa.ncall, 3) + self.assertEqual(fsa.nactualcall, 3) + self.assertEqual(fsa.tsub, 3) + self.assertEqual(fsa.children[fsb].ttot, fsb.ttot) + self.assertEqual(fsa.children[fsb].tsub, fsb.tsub) + self.assertEqual(fsb.children[fsc].ttot, fsc.ttot) + self.assertEqual(fsb.children[fsc].tsub, fsc.tsub) + self.assertEqual(fsc.tsub, 6) + self.assertEqual(fsc.children[fsd].ttot, fsd.ttot) + self.assertEqual(fsc.children[fsd].tsub, fsd.tsub) + self.assertEqual(fsd.children[fse].ttot, fse.ttot) + self.assertEqual(fsd.children[fse].tsub, fse.tsub) + self.assertEqual(fse.children[fsf].ttot, fsf.ttot) + self.assertEqual(fse.children[fsf].tsub, fsf.tsub) + self.assertEqual(fsf.children[fsg].ttot, fsg.ttot) + self.assertEqual(fsf.children[fsg].tsub, fsg.tsub) + self.assertEqual(fsg.ttot, 18) + self.assertEqual(fsg.tsub, 3) + self.assertEqual(fsg.children[fsh].ttot, fsh.ttot) + self.assertEqual(fsg.children[fsh].tsub, fsh.tsub) + self.assertEqual(fsh.ttot, 15) + self.assertEqual(fsh.tsub, 12) + self.assertEqual(fsh.tavg, 5) + self.assertEqual(fsh.children[fsi].ttot, fsi.ttot) + self.assertEqual(fsh.children[fsi].tsub, fsi.tsub) + #stats.debug_print() + + def test_merge_multithreaded_stats(self): + import _yappi + timings = {"a_1": 2, "b_1": 1} + _yappi._set_test_timings(timings) + + def a(): + pass + + def b(): + pass + + yappi.start() + t = threading.Thread(target=a) + t.start() + t.join() + t = threading.Thread(target=b) + t.start() + t.join() + yappi.get_func_stats().save("tests/ystats1.ys") + yappi.clear_stats() + _yappi._set_test_timings(timings) + self.assertEqual(len(yappi.get_func_stats()), 0) + self.assertEqual(len(yappi.get_thread_stats()), 1) + t = threading.Thread(target=a) + t.start() + t.join() + + self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0) + self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1) + yappi.get_func_stats().save("tests/ystats2.ys") + + stats = yappi.YFuncStats([ + "tests/ystats1.ys", + "tests/ystats2.ys", + ]) + fsa = utils.find_stat_by_name(stats, "a") + fsb = utils.find_stat_by_name(stats, "b") + self.assertEqual(fsa.ncall, 2) + self.assertEqual(fsb.ncall, 1) + self.assertEqual(fsa.tsub, fsa.ttot, 4) + self.assertEqual(fsb.tsub, fsb.ttot, 1) + + def test_merge_load_different_clock_types(self): + yappi.start(builtins=True) + + def a(): + b() + + def b(): + c() + + def c(): + pass + + t = threading.Thread(target=a) + t.start() + t.join() + yappi.get_func_stats().sort("name", "asc").save("tests/ystats1.ys") + yappi.stop() + yappi.clear_stats() + yappi.start(builtins=False) + t = threading.Thread(target=a) + t.start() + t.join() + yappi.get_func_stats().save("tests/ystats2.ys") + yappi.stop() + self.assertRaises(_yappi.error, yappi.set_clock_type, "wall") + yappi.clear_stats() + yappi.set_clock_type("wall") + yappi.start() + t = threading.Thread(target=a) + t.start() + t.join() + yappi.get_func_stats().save("tests/ystats3.ys") + self.assertRaises( + yappi.YappiError, + yappi.YFuncStats().add("tests/ystats1.ys").add, "tests/ystats3.ys" + ) + stats = yappi.YFuncStats(["tests/ystats1.ys", + "tests/ystats2.ys"]).sort("name") + fsa = utils.find_stat_by_name(stats, "a") + fsb = utils.find_stat_by_name(stats, "b") + fsc = utils.find_stat_by_name(stats, "c") + self.assertEqual(fsa.ncall, 2) + self.assertEqual(fsa.ncall, fsb.ncall, fsc.ncall) + + def test_merge_aabab_aabbc(self): + _timings = { + "a_1": 15, + "a_2": 14, + "b_1": 12, + "a_3": 10, + "b_2": 9, + "c_1": 4 + } + _yappi._set_test_timings(_timings) + + def a(): + if self._ncall == 1: + self._ncall += 1 + a() + elif self._ncall == 5: + self._ncall += 1 + a() + else: + b() + + def b(): + if self._ncall == 2: + self._ncall += 1 + a() + elif self._ncall == 6: + self._ncall += 1 + b() + elif self._ncall == 7: + c() + else: + return + + def c(): + pass + + self._ncall = 1 + stats = utils.run_and_get_func_stats(a, ) + stats.save("tests/ystats1.ys") + yappi.clear_stats() + _yappi._set_test_timings(_timings) + #stats.print_all() + + self._ncall = 5 + stats = utils.run_and_get_func_stats(a, ) + stats.save("tests/ystats2.ys") + + #stats.print_all() + + def a(): # same name but another function(code object) + pass + + yappi.start() + a() + stats = yappi.get_func_stats().add( + ["tests/ystats1.ys", "tests/ystats2.ys"] + ) + #stats.print_all() + self.assertEqual(len(stats), 4) + + fsa = None + for stat in stats: + if stat.name == "a" and stat.ttot == 45: + fsa = stat + break + self.assertTrue(fsa is not None) + + self.assertEqual(fsa.ncall, 7) + self.assertEqual(fsa.nactualcall, 3) + self.assertEqual(fsa.ttot, 45) + self.assertEqual(fsa.tsub, 10) + fsb = utils.find_stat_by_name(stats, "b") + fsc = utils.find_stat_by_name(stats, "c") + self.assertEqual(fsb.ncall, 6) + self.assertEqual(fsb.nactualcall, 3) + self.assertEqual(fsb.ttot, 36) + self.assertEqual(fsb.tsub, 27) + self.assertEqual(fsb.tavg, 6) + self.assertEqual(fsc.ttot, 8) + self.assertEqual(fsc.tsub, 8) + self.assertEqual(fsc.tavg, 4) + self.assertEqual(fsc.nactualcall, fsc.ncall, 2) + + +class MultithreadedScenarios(utils.YappiUnitTestCase): + + def test_issue_32(self): + ''' + Start yappi from different thread and we get Internal Error(15) as + the current_ctx_id() called while enumerating the threads in start() + and as it does not swap to the enumerated ThreadState* the THreadState_GetDict() + returns wrong object and thus sets an invalid id for the _ctx structure. + + When this issue happens multiple Threads have same tid as the internal ts_ptr + will be same for different contexts. So, let's see if that happens + ''' + + def foo(): + time.sleep(0.2) + + def bar(): + time.sleep(0.1) + + def thread_func(): + yappi.set_clock_type("wall") + yappi.start() + + bar() + + t = threading.Thread(target=thread_func) + t.start() + t.join() + + foo() + + yappi.stop() + + thread_ids = set() + for tstat in yappi.get_thread_stats(): + self.assertTrue(tstat.tid not in thread_ids) + thread_ids.add(tstat.tid) + + def test_subsequent_profile(self): + WORKER_COUNT = 5 + + def a(): + pass + + def b(): + pass + + def c(): + pass + + _timings = { + "a_1": 3, + "b_1": 2, + "c_1": 1, + } + + yappi.start() + + def g(): + pass + + g() + yappi.stop() + yappi.clear_stats() + _yappi._set_test_timings(_timings) + yappi.start() + + _dummy = [] + for i in range(WORKER_COUNT): + t = threading.Thread(target=a) + t.start() + t.join() + for i in range(WORKER_COUNT): + t = threading.Thread(target=b) + t.start() + _dummy.append(t) + t.join() + for i in range(WORKER_COUNT): + t = threading.Thread(target=a) + t.start() + t.join() + for i in range(WORKER_COUNT): + t = threading.Thread(target=c) + t.start() + t.join() + yappi.stop() + yappi.start() + + def f(): + pass + + f() + stats = yappi.get_func_stats() + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + fsc = utils.find_stat_by_name(stats, 'c') + self.assertEqual(fsa.ncall, 10) + self.assertEqual(fsb.ncall, 5) + self.assertEqual(fsc.ncall, 5) + self.assertEqual(fsa.ttot, fsa.tsub, 30) + self.assertEqual(fsb.ttot, fsb.tsub, 10) + self.assertEqual(fsc.ttot, fsc.tsub, 5) + + # MACOSx optimizes by only creating one worker thread + self.assertTrue(len(yappi.get_thread_stats()) >= 2) + + def test_basic(self): + yappi.set_clock_type('wall') + + def dummy(): + pass + + def a(): + time.sleep(0.2) + + class Worker1(threading.Thread): + + def a(self): + time.sleep(0.3) + + def run(self): + self.a() + + yappi.start(builtins=False, profile_threads=True) + + c = Worker1() + c.start() + c.join() + a() + stats = yappi.get_func_stats() + fsa1 = utils.find_stat_by_name(stats, 'Worker1.a') + fsa2 = utils.find_stat_by_name(stats, 'a') + self.assertTrue(fsa1 is not None) + self.assertTrue(fsa2 is not None) + self.assertTrue(fsa1.ttot > 0.2) + self.assertTrue(fsa2.ttot > 0.1) + tstats = yappi.get_thread_stats() + self.assertEqual(len(tstats), 2) + tsa = utils.find_stat_by_name(tstats, 'Worker1') + tsm = utils.find_stat_by_name(tstats, '_MainThread') + dummy() # call dummy to force ctx name to be retrieved again. + self.assertTrue(tsa is not None) + # TODO: I put dummy() to fix below, remove the comments after a while. + self.assertTrue( # FIX: I see this fails sometimes? + tsm is not None, + 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(tstats)))) + + def test_ctx_stats(self): + from threading import Thread + DUMMY_WORKER_COUNT = 5 + yappi.start() + + class DummyThread(Thread): + pass + + def dummy(): + pass + + def dummy_worker(): + pass + + for i in range(DUMMY_WORKER_COUNT): + t = DummyThread(target=dummy_worker) + t.start() + t.join() + yappi.stop() + stats = yappi.get_thread_stats() + tsa = utils.find_stat_by_name(stats, "DummyThread") + self.assertTrue(tsa is not None) + yappi.clear_stats() + time.sleep(1.0) + _timings = { + "a_1": 6, + "b_1": 5, + "c_1": 3, + "d_1": 1, + "a_2": 4, + "b_2": 3, + "c_2": 2, + "d_2": 1 + } + _yappi._set_test_timings(_timings) + + class Thread1(Thread): + pass + + class Thread2(Thread): + pass + + def a(): + b() + + def b(): + c() + + def c(): + d() + + def d(): + time.sleep(0.6) + + yappi.set_clock_type("wall") + yappi.start() + t1 = Thread1(target=a) + t1.start() + t2 = Thread2(target=a) + t2.start() + t1.join() + t2.join() + stats = yappi.get_thread_stats() + + # the fist clear_stats clears the context table? + tsa = utils.find_stat_by_name(stats, "DummyThread") + self.assertTrue(tsa is None) + + tst1 = utils.find_stat_by_name(stats, "Thread1") + tst2 = utils.find_stat_by_name(stats, "Thread2") + tsmain = utils.find_stat_by_name(stats, "_MainThread") + dummy() # call dummy to force ctx name to be retrieved again. + self.assertTrue(len(stats) == 3) + self.assertTrue(tst1 is not None) + self.assertTrue(tst2 is not None) + # TODO: I put dummy() to fix below, remove the comments after a while. + self.assertTrue( # FIX: I see this fails sometimes + tsmain is not None, + 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(stats)))) + self.assertTrue(1.0 > tst2.ttot >= 0.5) + self.assertTrue(1.0 > tst1.ttot >= 0.5) + + # test sorting of the ctx stats + stats = stats.sort("totaltime", "desc") + prev_stat = None + for stat in stats: + if prev_stat: + self.assertTrue(prev_stat.ttot >= stat.ttot) + prev_stat = stat + stats = stats.sort("totaltime", "asc") + prev_stat = None + for stat in stats: + if prev_stat: + self.assertTrue(prev_stat.ttot <= stat.ttot) + prev_stat = stat + stats = stats.sort("schedcount", "desc") + prev_stat = None + for stat in stats: + if prev_stat: + self.assertTrue(prev_stat.sched_count >= stat.sched_count) + prev_stat = stat + stats = stats.sort("name", "desc") + prev_stat = None + for stat in stats: + if prev_stat: + self.assertTrue(prev_stat.name.lower() >= stat.name.lower()) + prev_stat = stat + self.assertRaises( + yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg" + ) + self.assertRaises( + yappi.YappiError, stats.sort, "invalid_thread_sortorder_arg" + ) + + def test_ctx_stats_cpu(self): + + def get_thread_name(): + try: + return threading.current_thread().name + except AttributeError: + return "Anonymous" + + def burn_cpu(sec): + t0 = yappi.get_clock_time() + elapsed = 0 + while (elapsed < sec): + for _ in range(1000): + pass + elapsed = yappi.get_clock_time() - t0 + + def test(): + + ts = [] + for i in (0.01, 0.05, 0.1): + t = threading.Thread(target=burn_cpu, args=(i, )) + t.name = "burn_cpu-%s" % str(i) + t.start() + ts.append(t) + for t in ts: + t.join() + + yappi.set_clock_type("cpu") + yappi.set_context_name_callback(get_thread_name) + + yappi.start() + + test() + + yappi.stop() + + tstats = yappi.get_thread_stats() + r1 = ''' + burn_cpu-0.1 3 123145356058624 0.100105 8 + burn_cpu-0.05 2 123145361313792 0.050149 8 + burn_cpu-0.01 1 123145356058624 0.010127 2 + MainThread 0 4321620864 0.001632 6 + ''' + self.assert_ctx_stats_almost_equal(r1, tstats) + + def test_producer_consumer_with_queues(self): + # we currently just stress yappi, no functionality test is done here. + yappi.start() + if utils.is_py3x(): + from queue import Queue + else: + from Queue import Queue + from threading import Thread + WORKER_THREAD_COUNT = 50 + WORK_ITEM_COUNT = 2000 + + def worker(): + while True: + item = q.get() + # do the work with item + q.task_done() + + q = Queue() + for i in range(WORKER_THREAD_COUNT): + t = Thread(target=worker) + t.daemon = True + t.start() + + for item in range(WORK_ITEM_COUNT): + q.put(item) + q.join() # block until all tasks are done + #yappi.get_func_stats().sort("callcount").print_all() + yappi.stop() + + def test_temporary_lock_waiting(self): + yappi.start() + _lock = threading.Lock() + + def worker(): + _lock.acquire() + try: + time.sleep(1.0) + finally: + _lock.release() + + t1 = threading.Thread(target=worker) + t2 = threading.Thread(target=worker) + t1.start() + t2.start() + t1.join() + t2.join() + #yappi.get_func_stats().sort("callcount").print_all() + yappi.stop() + + @unittest.skipIf(os.name != "posix", "requires Posix compliant OS") + def test_signals_with_blocking_calls(self): + import signal, os, time + + # just to verify if signal is handled correctly and stats/yappi are not corrupted. + def handler(signum, frame): + raise Exception("Signal handler executed!") + + yappi.start() + signal.signal(signal.SIGALRM, handler) + signal.alarm(1) + self.assertRaises(Exception, time.sleep, 2) + stats = yappi.get_func_stats() + fsh = utils.find_stat_by_name(stats, "handler") + self.assertTrue(fsh is not None) + + @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2") + def test_concurrent_futures(self): + yappi.start() + from concurrent.futures import ThreadPoolExecutor + with ThreadPoolExecutor(max_workers=5) as executor: + f = executor.submit(pow, 5, 2) + self.assertEqual(f.result(), 25) + time.sleep(1.0) + yappi.stop() + + @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2") + def test_barrier(self): + yappi.start() + b = threading.Barrier(2, timeout=1) + + def worker(): + try: + b.wait() + except threading.BrokenBarrierError: + pass + except Exception: + raise Exception("BrokenBarrierError not raised") + + t1 = threading.Thread(target=worker) + t1.start() + #b.wait() + t1.join() + yappi.stop() + + +class NonRecursiveFunctions(utils.YappiUnitTestCase): + + def test_abcd(self): + _timings = {"a_1": 6, "b_1": 5, "c_1": 3, "d_1": 1} + _yappi._set_test_timings(_timings) + + def a(): + b() + + def b(): + c() + + def c(): + d() + + def d(): + pass + + stats = utils.run_and_get_func_stats(a) + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + fsc = utils.find_stat_by_name(stats, 'c') + fsd = utils.find_stat_by_name(stats, 'd') + cfsab = fsa.children[fsb] + cfsbc = fsb.children[fsc] + cfscd = fsc.children[fsd] + + self.assertEqual(fsa.ttot, 6) + self.assertEqual(fsa.tsub, 1) + self.assertEqual(fsb.ttot, 5) + self.assertEqual(fsb.tsub, 2) + self.assertEqual(fsc.ttot, 3) + self.assertEqual(fsc.tsub, 2) + self.assertEqual(fsd.ttot, 1) + self.assertEqual(fsd.tsub, 1) + self.assertEqual(cfsab.ttot, 5) + self.assertEqual(cfsab.tsub, 2) + self.assertEqual(cfsbc.ttot, 3) + self.assertEqual(cfsbc.tsub, 2) + self.assertEqual(cfscd.ttot, 1) + self.assertEqual(cfscd.tsub, 1) + + def test_stop_in_middle(self): + _timings = {"a_1": 6, "b_1": 4} + _yappi._set_test_timings(_timings) + + def a(): + b() + yappi.stop() + + def b(): + time.sleep(0.2) + + yappi.start() + a() + stats = yappi.get_func_stats() + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + + self.assertEqual(fsa.ncall, 1) + self.assertEqual(fsa.nactualcall, 0) + self.assertEqual(fsa.ttot, 0) # no call_leave called + self.assertEqual(fsa.tsub, 0) # no call_leave called + self.assertEqual(fsb.ttot, 4) + + +class RecursiveFunctions(utils.YappiUnitTestCase): + + def test_fibonacci(self): + + def fib(n): + if n > 1: + return fib(n - 1) + fib(n - 2) + else: + return n + + stats = utils.run_and_get_func_stats(fib, 22) + fs = utils.find_stat_by_name(stats, 'fib') + self.assertEqual(fs.ncall, 57313) + self.assertEqual(fs.ttot, fs.tsub) + + def test_abcadc(self): + _timings = { + "a_1": 20, + "b_1": 19, + "c_1": 17, + "a_2": 13, + "d_1": 12, + "c_2": 10, + "a_3": 5 + } + _yappi._set_test_timings(_timings) + + def a(n): + if n == 3: + return + if n == 1 + 1: + d(n) + else: + b(n) + + def b(n): + c(n) + + def c(n): + a(n + 1) + + def d(n): + c(n) + + stats = utils.run_and_get_func_stats(a, 1) + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + fsc = utils.find_stat_by_name(stats, 'c') + fsd = utils.find_stat_by_name(stats, 'd') + self.assertEqual(fsa.ncall, 3) + self.assertEqual(fsa.nactualcall, 1) + self.assertEqual(fsa.ttot, 20) + self.assertEqual(fsa.tsub, 7) + self.assertEqual(fsb.ttot, 19) + self.assertEqual(fsb.tsub, 2) + self.assertEqual(fsc.ttot, 17) + self.assertEqual(fsc.tsub, 9) + self.assertEqual(fsd.ttot, 12) + self.assertEqual(fsd.tsub, 2) + cfsca = fsc.children[fsa] + self.assertEqual(cfsca.nactualcall, 0) + self.assertEqual(cfsca.ncall, 2) + self.assertEqual(cfsca.ttot, 13) + self.assertEqual(cfsca.tsub, 6) + + def test_aaaa(self): + _timings = {"d_1": 9, "d_2": 7, "d_3": 3, "d_4": 2} + _yappi._set_test_timings(_timings) + + def d(n): + if n == 3: + return + d(n + 1) + + stats = utils.run_and_get_func_stats(d, 0) + fsd = utils.find_stat_by_name(stats, 'd') + self.assertEqual(fsd.ncall, 4) + self.assertEqual(fsd.nactualcall, 1) + self.assertEqual(fsd.ttot, 9) + self.assertEqual(fsd.tsub, 9) + cfsdd = fsd.children[fsd] + self.assertEqual(cfsdd.ttot, 7) + self.assertEqual(cfsdd.tsub, 7) + self.assertEqual(cfsdd.ncall, 3) + self.assertEqual(cfsdd.nactualcall, 0) + + def test_abcabc(self): + _timings = { + "a_1": 20, + "b_1": 19, + "c_1": 17, + "a_2": 13, + "b_2": 11, + "c_2": 9, + "a_3": 6 + } + _yappi._set_test_timings(_timings) + + def a(n): + if n == 3: + return + else: + b(n) + + def b(n): + c(n) + + def c(n): + a(n + 1) + + stats = utils.run_and_get_func_stats(a, 1) + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + fsc = utils.find_stat_by_name(stats, 'c') + self.assertEqual(fsa.ncall, 3) + self.assertEqual(fsa.nactualcall, 1) + self.assertEqual(fsa.ttot, 20) + self.assertEqual(fsa.tsub, 9) + self.assertEqual(fsb.ttot, 19) + self.assertEqual(fsb.tsub, 4) + self.assertEqual(fsc.ttot, 17) + self.assertEqual(fsc.tsub, 7) + cfsab = fsa.children[fsb] + cfsbc = fsb.children[fsc] + cfsca = fsc.children[fsa] + self.assertEqual(cfsab.ttot, 19) + self.assertEqual(cfsab.tsub, 4) + self.assertEqual(cfsbc.ttot, 17) + self.assertEqual(cfsbc.tsub, 7) + self.assertEqual(cfsca.ttot, 13) + self.assertEqual(cfsca.tsub, 8) + + def test_abcbca(self): + _timings = {"a_1": 10, "b_1": 9, "c_1": 7, "b_2": 4, "c_2": 2, "a_2": 1} + _yappi._set_test_timings(_timings) + self._ncall = 1 + + def a(): + if self._ncall == 1: + b() + else: + return + + def b(): + c() + + def c(): + if self._ncall == 1: + self._ncall += 1 + b() + else: + a() + + stats = utils.run_and_get_func_stats(a) + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + fsc = utils.find_stat_by_name(stats, 'c') + cfsab = fsa.children[fsb] + cfsbc = fsb.children[fsc] + cfsca = fsc.children[fsa] + self.assertEqual(fsa.ttot, 10) + self.assertEqual(fsa.tsub, 2) + self.assertEqual(fsb.ttot, 9) + self.assertEqual(fsb.tsub, 4) + self.assertEqual(fsc.ttot, 7) + self.assertEqual(fsc.tsub, 4) + self.assertEqual(cfsab.ttot, 9) + self.assertEqual(cfsab.tsub, 2) + self.assertEqual(cfsbc.ttot, 7) + self.assertEqual(cfsbc.tsub, 4) + self.assertEqual(cfsca.ttot, 1) + self.assertEqual(cfsca.tsub, 1) + self.assertEqual(cfsca.ncall, 1) + self.assertEqual(cfsca.nactualcall, 0) + + def test_aabccb(self): + _timings = { + "a_1": 13, + "a_2": 11, + "b_1": 9, + "c_1": 5, + "c_2": 3, + "b_2": 1 + } + _yappi._set_test_timings(_timings) + self._ncall = 1 + + def a(): + if self._ncall == 1: + self._ncall += 1 + a() + else: + b() + + def b(): + if self._ncall == 3: + return + else: + c() + + def c(): + if self._ncall == 2: + self._ncall += 1 + c() + else: + b() + + stats = utils.run_and_get_func_stats(a) + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + fsc = utils.find_stat_by_name(stats, 'c') + cfsaa = fsa.children[fsa.index] + cfsab = fsa.children[fsb] + cfsbc = fsb.children[fsc.full_name] + cfscc = fsc.children[fsc] + cfscb = fsc.children[fsb] + self.assertEqual(fsb.ttot, 9) + self.assertEqual(fsb.tsub, 5) + self.assertEqual(cfsbc.ttot, 5) + self.assertEqual(cfsbc.tsub, 2) + self.assertEqual(fsa.ttot, 13) + self.assertEqual(fsa.tsub, 4) + self.assertEqual(cfsab.ttot, 9) + self.assertEqual(cfsab.tsub, 4) + self.assertEqual(cfsaa.ttot, 11) + self.assertEqual(cfsaa.tsub, 2) + self.assertEqual(fsc.ttot, 5) + self.assertEqual(fsc.tsub, 4) + + def test_abaa(self): + _timings = {"a_1": 13, "b_1": 10, "a_2": 9, "a_3": 5} + _yappi._set_test_timings(_timings) + + self._ncall = 1 + + def a(): + if self._ncall == 1: + b() + elif self._ncall == 2: + self._ncall += 1 + a() + else: + return + + def b(): + self._ncall += 1 + a() + + stats = utils.run_and_get_func_stats(a) + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + cfsaa = fsa.children[fsa] + cfsba = fsb.children[fsa] + self.assertEqual(fsb.ttot, 10) + self.assertEqual(fsb.tsub, 1) + self.assertEqual(fsa.ttot, 13) + self.assertEqual(fsa.tsub, 12) + self.assertEqual(cfsaa.ttot, 5) + self.assertEqual(cfsaa.tsub, 5) + self.assertEqual(cfsba.ttot, 9) + self.assertEqual(cfsba.tsub, 4) + + def test_aabb(self): + _timings = {"a_1": 13, "a_2": 10, "b_1": 9, "b_2": 5} + _yappi._set_test_timings(_timings) + + self._ncall = 1 + + def a(): + if self._ncall == 1: + self._ncall += 1 + a() + elif self._ncall == 2: + b() + else: + return + + def b(): + if self._ncall == 2: + self._ncall += 1 + b() + else: + return + + stats = utils.run_and_get_func_stats(a) + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + cfsaa = fsa.children[fsa] + cfsab = fsa.children[fsb] + cfsbb = fsb.children[fsb] + self.assertEqual(fsa.ttot, 13) + self.assertEqual(fsa.tsub, 4) + self.assertEqual(fsb.ttot, 9) + self.assertEqual(fsb.tsub, 9) + self.assertEqual(cfsaa.ttot, 10) + self.assertEqual(cfsaa.tsub, 1) + self.assertEqual(cfsab.ttot, 9) + self.assertEqual(cfsab.tsub, 4) + self.assertEqual(cfsbb.ttot, 5) + self.assertEqual(cfsbb.tsub, 5) + + def test_abbb(self): + _timings = {"a_1": 13, "b_1": 10, "b_2": 6, "b_3": 1} + _yappi._set_test_timings(_timings) + + self._ncall = 1 + + def a(): + if self._ncall == 1: + b() + + def b(): + if self._ncall == 3: + return + self._ncall += 1 + b() + + stats = utils.run_and_get_func_stats(a) + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + cfsab = fsa.children[fsb] + cfsbb = fsb.children[fsb] + self.assertEqual(fsa.ttot, 13) + self.assertEqual(fsa.tsub, 3) + self.assertEqual(fsb.ttot, 10) + self.assertEqual(fsb.tsub, 10) + self.assertEqual(fsb.ncall, 3) + self.assertEqual(fsb.nactualcall, 1) + self.assertEqual(cfsab.ttot, 10) + self.assertEqual(cfsab.tsub, 4) + self.assertEqual(cfsbb.ttot, 6) + self.assertEqual(cfsbb.tsub, 6) + self.assertEqual(cfsbb.nactualcall, 0) + self.assertEqual(cfsbb.ncall, 2) + + def test_aaab(self): + _timings = {"a_1": 13, "a_2": 10, "a_3": 6, "b_1": 1} + _yappi._set_test_timings(_timings) + + self._ncall = 1 + + def a(): + if self._ncall == 3: + b() + return + self._ncall += 1 + a() + + def b(): + return + + stats = utils.run_and_get_func_stats(a) + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + cfsaa = fsa.children[fsa] + cfsab = fsa.children[fsb] + self.assertEqual(fsa.ttot, 13) + self.assertEqual(fsa.tsub, 12) + self.assertEqual(fsb.ttot, 1) + self.assertEqual(fsb.tsub, 1) + self.assertEqual(cfsaa.ttot, 10) + self.assertEqual(cfsaa.tsub, 9) + self.assertEqual(cfsab.ttot, 1) + self.assertEqual(cfsab.tsub, 1) + + def test_abab(self): + _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1} + _yappi._set_test_timings(_timings) + + self._ncall = 1 + + def a(): + b() + + def b(): + if self._ncall == 2: + return + self._ncall += 1 + a() + + stats = utils.run_and_get_func_stats(a) + fsa = utils.find_stat_by_name(stats, 'a') + fsb = utils.find_stat_by_name(stats, 'b') + cfsab = fsa.children[fsb] + cfsba = fsb.children[fsa] + self.assertEqual(fsa.ttot, 13) + self.assertEqual(fsa.tsub, 8) + self.assertEqual(fsb.ttot, 10) + self.assertEqual(fsb.tsub, 5) + self.assertEqual(cfsab.ttot, 10) + self.assertEqual(cfsab.tsub, 5) + self.assertEqual(cfsab.ncall, 2) + self.assertEqual(cfsab.nactualcall, 1) + self.assertEqual(cfsba.ttot, 6) + self.assertEqual(cfsba.tsub, 5) + + +if __name__ == '__main__': + # import sys;sys.argv = ['', 'BasicUsage.test_run_as_script'] + # import sys;sys.argv = ['', 'MultithreadedScenarios.test_subsequent_profile'] + unittest.main() --- a/tests/test_gevent.py +++ b/tests/test_gevent.py @@ -4,7 +4,7 @@ import yappi import gevent from gevent.event import Event import threading -from utils import ( +from .utils import ( YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io, burn_io_gevent ) --- a/tests/test_hooks.py +++ b/tests/test_hooks.py @@ -5,7 +5,7 @@ import unittest import time import yappi -import utils +import tests.utils as utils def a(): --- a/tests/test_tags.py +++ b/tests/test_tags.py @@ -2,7 +2,7 @@ import unittest import yappi import threading import time -from utils import YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io +from .utils import YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io class MultiThreadTests(YappiUnitTestCase):