diff options
Diffstat (limited to 'meta-python/recipes-devtools/python/python3-yappi/0001-Fix-imports-for-ptests.patch')
-rw-r--r-- | meta-python/recipes-devtools/python/python3-yappi/0001-Fix-imports-for-ptests.patch | 3905 |
1 files changed, 0 insertions, 3905 deletions
diff --git a/meta-python/recipes-devtools/python/python3-yappi/0001-Fix-imports-for-ptests.patch b/meta-python/recipes-devtools/python/python3-yappi/0001-Fix-imports-for-ptests.patch deleted file mode 100644 index fa58897e68..0000000000 --- a/meta-python/recipes-devtools/python/python3-yappi/0001-Fix-imports-for-ptests.patch +++ /dev/null @@ -1,3905 +0,0 @@ -From 0dedc1c573ddc4e87475eb03c64555cd54a72e92 Mon Sep 17 00:00:00 2001 -From: Trevor Gamblin <trevor.gamblin@windriver.com> -Date: Mon, 7 Jun 2021 09:40:20 -0400 -Subject: [PATCH] Fix imports for tests - -Signed-off-by: Trevor Gamblin <trevor.gamblin@windriver.com> ---- - tests/test_asyncio.py | 2 +- - tests/test_asyncio_context_vars.py | 2 +- - tests/test_functionality.py | 2 +- - tests/test_gevent.py | 2 +- - tests/test_hooks.py | 2 +- - tests/test_tags.py | 2 +- - 6 files changed, 6 insertions(+), 6 deletions(-) - ---- a/tests/test_asyncio.py -+++ b/tests/test_asyncio.py -@@ -2,7 +2,7 @@ import unittest - import yappi - import asyncio - import threading --from utils import YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io -+from .utils import YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io - - - @asyncio.coroutine ---- a/tests/test_asyncio_context_vars.py -+++ b/tests/test_asyncio_context_vars.py -@@ -5,7 +5,7 @@ import contextvars - import functools - import time - import os --import utils -+import tests.utils as utils - import yappi - - async_context_id = contextvars.ContextVar('async_context_id') ---- a/tests/test_functionality.py -+++ b/tests/test_functionality.py -@@ -1,1916 +1,1916 @@ --import os
--import sys
--import time
--import threading
--import unittest
--import yappi
--import _yappi
--import utils
--import multiprocessing # added to fix http://bugs.python.org/issue15881 for > Py2.6
--import subprocess
--
--_counter = 0
--
--
--class BasicUsage(utils.YappiUnitTestCase):
--
-- def test_callback_function_int_return_overflow(self):
-- # this test is just here to check if any errors are generated, as the err
-- # is printed in C side, I did not include it here. THere are ways to test
-- # this deterministically, I did not bother
-- import ctypes
--
-- def _unsigned_overflow_margin():
-- return 2**(ctypes.sizeof(ctypes.c_void_p) * 8) - 1
--
-- def foo():
-- pass
--
-- #with utils.captured_output() as (out, err):
-- yappi.set_context_id_callback(_unsigned_overflow_margin)
-- yappi.set_tag_callback(_unsigned_overflow_margin)
-- yappi.start()
-- foo()
--
-- def test_issue60(self):
--
-- def foo():
-- buf = bytearray()
-- buf += b't' * 200
-- view = memoryview(buf)[10:]
-- view = view.tobytes()
-- del buf[:10] # this throws exception
-- return view
--
-- yappi.start(builtins=True)
-- foo()
-- self.assertTrue(
-- len(
-- yappi.get_func_stats(
-- filter_callback=lambda x: yappi.
-- func_matches(x, [memoryview.tobytes])
-- )
-- ) > 0
-- )
-- yappi.stop()
--
-- def test_issue54(self):
--
-- def _tag_cbk():
-- global _counter
-- _counter += 1
-- return _counter
--
-- def a():
-- pass
--
-- def b():
-- pass
--
-- yappi.set_tag_callback(_tag_cbk)
-- yappi.start()
-- a()
-- a()
-- a()
-- yappi.stop()
-- stats = yappi.get_func_stats()
-- self.assertEqual(stats.pop().ncall, 3) # aggregated if no tag is given
-- stats = yappi.get_func_stats(tag=1)
--
-- for i in range(1, 3):
-- stats = yappi.get_func_stats(tag=i)
-- stats = yappi.get_func_stats(
-- tag=i, filter_callback=lambda x: yappi.func_matches(x, [a])
-- )
--
-- stat = stats.pop()
-- self.assertEqual(stat.ncall, 1)
--
-- yappi.set_tag_callback(None)
-- yappi.clear_stats()
-- yappi.start()
-- b()
-- b()
-- stats = yappi.get_func_stats()
-- self.assertEqual(len(stats), 1)
-- stat = stats.pop()
-- self.assertEqual(stat.ncall, 2)
--
-- def test_filter(self):
--
-- def a():
-- pass
--
-- def b():
-- a()
--
-- def c():
-- b()
--
-- _TCOUNT = 5
--
-- ts = []
-- yappi.start()
-- for i in range(_TCOUNT):
-- t = threading.Thread(target=c)
-- t.start()
-- ts.append(t)
--
-- for t in ts:
-- t.join()
--
-- yappi.stop()
--
-- ctx_ids = []
-- for tstat in yappi.get_thread_stats():
-- if tstat.name == '_MainThread':
-- main_ctx_id = tstat.id
-- else:
-- ctx_ids.append(tstat.id)
--
-- fstats = yappi.get_func_stats(filter={"ctx_id": 9})
-- self.assertTrue(fstats.empty())
-- fstats = yappi.get_func_stats(
-- filter={
-- "ctx_id": main_ctx_id,
-- "name": "c"
-- }
-- ) # main thread
-- self.assertTrue(fstats.empty())
--
-- for i in ctx_ids:
-- fstats = yappi.get_func_stats(
-- filter={
-- "ctx_id": i,
-- "name": "a",
-- "ncall": 1
-- }
-- )
-- self.assertEqual(fstats.pop().ncall, 1)
-- fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "b"})
-- self.assertEqual(fstats.pop().ncall, 1)
-- fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "c"})
-- self.assertEqual(fstats.pop().ncall, 1)
--
-- yappi.clear_stats()
-- yappi.start(builtins=True)
-- time.sleep(0.1)
-- yappi.stop()
-- fstats = yappi.get_func_stats(filter={"module": "time"})
-- self.assertEqual(len(fstats), 1)
--
-- # invalid filters`
-- self.assertRaises(
-- Exception, yappi.get_func_stats, filter={'tag': "sss"}
-- )
-- self.assertRaises(
-- Exception, yappi.get_func_stats, filter={'ctx_id': "None"}
-- )
--
-- def test_filter_callback(self):
--
-- def a():
-- time.sleep(0.1)
--
-- def b():
-- a()
--
-- def c():
-- pass
--
-- def d():
-- pass
--
-- yappi.set_clock_type("wall")
-- yappi.start(builtins=True)
-- a()
-- b()
-- c()
-- d()
-- stats = yappi.get_func_stats(
-- filter_callback=lambda x: yappi.func_matches(x, [a, b])
-- )
-- #stats.print_all()
-- r1 = '''
-- tests/test_functionality.py:98 a 2 0.000000 0.200350 0.100175
-- tests/test_functionality.py:101 b 1 0.000000 0.120000 0.100197
-- '''
-- self.assert_traces_almost_equal(r1, stats)
-- self.assertEqual(len(stats), 2)
-- stats = yappi.get_func_stats(
-- filter_callback=lambda x: yappi.
-- module_matches(x, [sys.modules[__name__]])
-- )
-- r1 = '''
-- tests/test_functionality.py:98 a 2 0.000000 0.230130 0.115065
-- tests/test_functionality.py:101 b 1 0.000000 0.120000 0.109011
-- tests/test_functionality.py:104 c 1 0.000000 0.000002 0.000002
-- tests/test_functionality.py:107 d 1 0.000000 0.000001 0.000001
-- '''
-- self.assert_traces_almost_equal(r1, stats)
-- self.assertEqual(len(stats), 4)
--
-- stats = yappi.get_func_stats(
-- filter_callback=lambda x: yappi.func_matches(x, [time.sleep])
-- )
-- self.assertEqual(len(stats), 1)
-- r1 = '''
-- time.sleep 2 0.206804 0.220000 0.103402
-- '''
-- self.assert_traces_almost_equal(r1, stats)
--
-- def test_print_formatting(self):
--
-- def a():
-- pass
--
-- def b():
-- a()
--
-- func_cols = {
-- 1: ("name", 48),
-- 0: ("ncall", 5),
-- 2: ("tsub", 8),
-- }
-- thread_cols = {
-- 1: ("name", 48),
-- 0: ("ttot", 8),
-- }
--
-- yappi.start()
-- a()
-- b()
-- yappi.stop()
-- fs = yappi.get_func_stats()
-- cs = fs[1].children
-- ts = yappi.get_thread_stats()
-- #fs.print_all(out=sys.stderr, columns={1:("name", 70), })
-- #cs.print_all(out=sys.stderr, columns=func_cols)
-- #ts.print_all(out=sys.stderr, columns=thread_cols)
-- #cs.print_all(out=sys.stderr, columns={})
--
-- self.assertRaises(
-- yappi.YappiError, fs.print_all, columns={1: ("namee", 9)}
-- )
-- self.assertRaises(
-- yappi.YappiError, cs.print_all, columns={1: ("dd", 0)}
-- )
-- self.assertRaises(
-- yappi.YappiError, ts.print_all, columns={1: ("tidd", 0)}
-- )
--
-- def test_get_clock(self):
-- yappi.set_clock_type('cpu')
-- self.assertEqual('cpu', yappi.get_clock_type())
-- clock_info = yappi.get_clock_info()
-- self.assertTrue('api' in clock_info)
-- self.assertTrue('resolution' in clock_info)
--
-- yappi.set_clock_type('wall')
-- self.assertEqual('wall', yappi.get_clock_type())
--
-- t0 = yappi.get_clock_time()
-- time.sleep(0.1)
-- duration = yappi.get_clock_time() - t0
-- self.assertTrue(0.05 < duration < 0.3)
--
-- def test_profile_decorator(self):
--
-- def aggregate(func, stats):
-- fname = "tests/%s.profile" % (func.__name__)
-- try:
-- stats.add(fname)
-- except IOError:
-- pass
-- stats.save(fname)
-- raise Exception("messing around")
--
-- @yappi.profile(return_callback=aggregate)
-- def a(x, y):
-- if x + y == 25:
-- raise Exception("")
-- return x + y
--
-- def b():
-- pass
--
-- try:
-- os.remove(
-- "tests/a.profile"
-- ) # remove the one from prev test, if available
-- except:
-- pass
--
-- # global profile is on to mess things up
-- yappi.start()
-- b()
--
-- # assert functionality and call function at same time
-- try:
-- self.assertEqual(a(1, 2), 3)
-- except:
-- pass
-- try:
-- self.assertEqual(a(2, 5), 7)
-- except:
-- pass
-- try:
-- a(4, 21)
-- except:
-- pass
-- stats = yappi.get_func_stats().add("tests/a.profile")
-- fsa = utils.find_stat_by_name(stats, 'a')
-- self.assertEqual(fsa.ncall, 3)
-- self.assertEqual(len(stats), 1) # b() should be cleared out.
--
-- @yappi.profile(return_callback=aggregate)
-- def count_down_rec(n):
-- if n == 0:
-- return
-- count_down_rec(n - 1)
--
-- try:
-- os.remove(
-- "tests/count_down_rec.profile"
-- ) # remove the one from prev test, if available
-- except:
-- pass
--
-- try:
-- count_down_rec(4)
-- except:
-- pass
-- try:
-- count_down_rec(3)
-- except:
-- pass
--
-- stats = yappi.YFuncStats("tests/count_down_rec.profile")
-- fsrec = utils.find_stat_by_name(stats, 'count_down_rec')
-- self.assertEqual(fsrec.ncall, 9)
-- self.assertEqual(fsrec.nactualcall, 2)
--
-- def test_strip_dirs(self):
--
-- def a():
-- pass
--
-- stats = utils.run_and_get_func_stats(a, )
-- stats.strip_dirs()
-- fsa = utils.find_stat_by_name(stats, "a")
-- self.assertEqual(fsa.module, os.path.basename(fsa.module))
--
-- @unittest.skipIf(os.name == "nt", "do not run on Windows")
-- def test_run_as_script(self):
-- import re
-- p = subprocess.Popen(
-- ['yappi', os.path.join('./tests', 'run_as_script.py')],
-- stdout=subprocess.PIPE
-- )
-- out, err = p.communicate()
-- self.assertEqual(p.returncode, 0)
-- func_stats, thread_stats = re.split(
-- b'name\\s+id\\s+tid\\s+ttot\\s+scnt\\s*\n', out
-- )
-- self.assertTrue(b'FancyThread' in thread_stats)
--
-- def test_yappi_overhead(self):
-- LOOP_COUNT = 100000
--
-- def a():
-- pass
--
-- def b():
-- for i in range(LOOP_COUNT):
-- a()
--
-- t0 = time.time()
-- yappi.start()
-- b()
-- yappi.stop()
-- time_with_yappi = time.time() - t0
-- t0 = time.time()
-- b()
-- time_without_yappi = time.time() - t0
-- if time_without_yappi == 0:
-- time_without_yappi = 0.000001
--
-- # in latest v0.82, I calculated this as close to "7.0" in my machine.
-- # however, %83 of this overhead is coming from tickcount(). The other %17
-- # seems to have been evenly distributed to the internal bookkeeping
-- # structures/algorithms which seems acceptable. Note that our test only
-- # tests one function being profiled at-a-time in a short interval.
-- # profiling high number of functions in a small time
-- # is a different beast, (which is pretty unlikely in most applications)
-- # So as a conclusion: I cannot see any optimization window for Yappi that
-- # is worth implementing as we will only optimize %17 of the time.
-- sys.stderr.write("\r\nYappi puts %0.1f times overhead to the profiled application in average.\r\n" % \
-- (time_with_yappi / time_without_yappi))
--
-- def test_clear_stats_while_running(self):
--
-- def a():
-- pass
--
-- yappi.start()
-- a()
-- yappi.clear_stats()
-- a()
-- stats = yappi.get_func_stats()
-- fsa = utils.find_stat_by_name(stats, 'a')
-- self.assertEqual(fsa.ncall, 1)
--
-- def test_generator(self):
--
-- def _gen(n):
-- while (n > 0):
-- yield n
-- n -= 1
--
-- yappi.start()
-- for x in _gen(5):
-- pass
-- self.assertTrue(
-- yappi.convert2pstats(yappi.get_func_stats()) is not None
-- )
--
-- def test_slice_child_stats_and_strip_dirs(self):
--
-- def b():
-- for i in range(10000000):
-- pass
--
-- def a():
-- b()
--
-- yappi.start(builtins=True)
-- a()
-- stats = yappi.get_func_stats()
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- self.assertTrue(fsa.children[0:1] is not None)
-- prev_afullname = fsa.full_name
-- prev_bchildfullname = fsa.children[fsb].full_name
-- stats.strip_dirs()
-- self.assertTrue(len(prev_afullname) > len(fsa.full_name))
-- self.assertTrue(
-- len(prev_bchildfullname) > len(fsa.children[fsb].full_name)
-- )
--
-- def test_children_stat_functions(self):
-- _timings = {"a_1": 5, "b_1": 3, "c_1": 1}
-- _yappi._set_test_timings(_timings)
--
-- def b():
-- pass
--
-- def c():
-- pass
--
-- def a():
-- b()
-- c()
--
-- yappi.start()
-- a()
-- b() # non-child call
-- c() # non-child call
-- stats = yappi.get_func_stats()
-- fsa = utils.find_stat_by_name(stats, 'a')
-- childs_of_a = fsa.children.get().sort("tavg", "desc")
-- prev_item = None
-- for item in childs_of_a:
-- if prev_item:
-- self.assertTrue(prev_item.tavg > item.tavg)
-- prev_item = item
-- childs_of_a.sort("name", "desc")
-- prev_item = None
-- for item in childs_of_a:
-- if prev_item:
-- self.assertTrue(prev_item.name > item.name)
-- prev_item = item
-- childs_of_a.clear()
-- self.assertTrue(childs_of_a.empty())
--
-- def test_no_stats_different_clock_type_load(self):
--
-- def a():
-- pass
--
-- yappi.start()
-- a()
-- yappi.stop()
-- yappi.get_func_stats().save("tests/ystats1.ys")
-- yappi.clear_stats()
-- yappi.set_clock_type("WALL")
-- yappi.start()
-- yappi.stop()
-- stats = yappi.get_func_stats().add("tests/ystats1.ys")
-- fsa = utils.find_stat_by_name(stats, 'a')
-- self.assertTrue(fsa is not None)
--
-- def test_subsequent_profile(self):
-- _timings = {"a_1": 1, "b_1": 1}
-- _yappi._set_test_timings(_timings)
--
-- def a():
-- pass
--
-- def b():
-- pass
--
-- yappi.start()
-- a()
-- yappi.stop()
-- yappi.start()
-- b()
-- yappi.stop()
-- stats = yappi.get_func_stats()
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- self.assertTrue(fsa is not None)
-- self.assertTrue(fsb is not None)
-- self.assertEqual(fsa.ttot, 1)
-- self.assertEqual(fsb.ttot, 1)
--
-- def test_lambda(self):
-- f = lambda: time.sleep(0.3)
-- yappi.set_clock_type("wall")
-- yappi.start()
-- f()
-- stats = yappi.get_func_stats()
-- fsa = utils.find_stat_by_name(stats, '<lambda>')
-- self.assertTrue(fsa.ttot > 0.1)
--
-- def test_module_stress(self):
-- self.assertEqual(yappi.is_running(), False)
--
-- yappi.start()
-- yappi.clear_stats()
-- self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
--
-- yappi.stop()
-- yappi.clear_stats()
-- yappi.set_clock_type("cpu")
-- self.assertRaises(yappi.YappiError, yappi.set_clock_type, "dummy")
-- self.assertEqual(yappi.is_running(), False)
-- yappi.clear_stats()
-- yappi.clear_stats()
--
-- def test_stat_sorting(self):
-- _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
-- _yappi._set_test_timings(_timings)
--
-- self._ncall = 1
--
-- def a():
-- b()
--
-- def b():
-- if self._ncall == 2:
-- return
-- self._ncall += 1
-- a()
--
-- stats = utils.run_and_get_func_stats(a)
-- stats = stats.sort("totaltime", "desc")
-- prev_stat = None
-- for stat in stats:
-- if prev_stat:
-- self.assertTrue(prev_stat.ttot >= stat.ttot)
-- prev_stat = stat
-- stats = stats.sort("totaltime", "asc")
-- prev_stat = None
-- for stat in stats:
-- if prev_stat:
-- self.assertTrue(prev_stat.ttot <= stat.ttot)
-- prev_stat = stat
-- stats = stats.sort("avgtime", "asc")
-- prev_stat = None
-- for stat in stats:
-- if prev_stat:
-- self.assertTrue(prev_stat.tavg <= stat.tavg)
-- prev_stat = stat
-- stats = stats.sort("name", "asc")
-- prev_stat = None
-- for stat in stats:
-- if prev_stat:
-- self.assertTrue(prev_stat.name <= stat.name)
-- prev_stat = stat
-- stats = stats.sort("subtime", "asc")
-- prev_stat = None
-- for stat in stats:
-- if prev_stat:
-- self.assertTrue(prev_stat.tsub <= stat.tsub)
-- prev_stat = stat
--
-- self.assertRaises(
-- yappi.YappiError, stats.sort, "invalid_func_sorttype_arg"
-- )
-- self.assertRaises(
-- yappi.YappiError, stats.sort, "totaltime",
-- "invalid_func_sortorder_arg"
-- )
--
-- def test_start_flags(self):
-- self.assertEqual(_yappi._get_start_flags(), None)
-- yappi.start()
--
-- def a():
-- pass
--
-- a()
-- self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
-- self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
-- self.assertEqual(len(yappi.get_thread_stats()), 1)
--
-- def test_builtin_profiling(self):
--
-- def a():
-- time.sleep(0.4) # is a builtin function
--
-- yappi.set_clock_type('wall')
--
-- yappi.start(builtins=True)
-- a()
-- stats = yappi.get_func_stats()
-- fsa = utils.find_stat_by_name(stats, 'sleep')
-- self.assertTrue(fsa is not None)
-- self.assertTrue(fsa.ttot > 0.3)
-- yappi.stop()
-- yappi.clear_stats()
--
-- def a():
-- pass
--
-- yappi.start()
-- t = threading.Thread(target=a)
-- t.start()
-- t.join()
-- stats = yappi.get_func_stats()
--
-- def test_singlethread_profiling(self):
-- yappi.set_clock_type('wall')
--
-- def a():
-- time.sleep(0.2)
--
-- class Worker1(threading.Thread):
--
-- def a(self):
-- time.sleep(0.3)
--
-- def run(self):
-- self.a()
--
-- yappi.start(profile_threads=False)
--
-- c = Worker1()
-- c.start()
-- c.join()
-- a()
-- stats = yappi.get_func_stats()
-- fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
-- fsa2 = utils.find_stat_by_name(stats, 'a')
-- self.assertTrue(fsa1 is None)
-- self.assertTrue(fsa2 is not None)
-- self.assertTrue(fsa2.ttot > 0.1)
--
-- def test_run(self):
--
-- def profiled():
-- pass
--
-- yappi.clear_stats()
-- try:
-- with yappi.run():
-- profiled()
-- stats = yappi.get_func_stats()
-- finally:
-- yappi.clear_stats()
--
-- self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
--
-- def test_run_recursive(self):
--
-- def profiled():
-- pass
--
-- def not_profiled():
-- pass
--
-- yappi.clear_stats()
-- try:
-- with yappi.run():
-- with yappi.run():
-- profiled()
-- # Profiling stopped here
-- not_profiled()
-- stats = yappi.get_func_stats()
-- finally:
-- yappi.clear_stats()
--
-- self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
-- self.assertIsNone(utils.find_stat_by_name(stats, 'not_profiled'))
--
--
--class StatSaveScenarios(utils.YappiUnitTestCase):
--
-- def test_pstats_conversion(self):
--
-- def pstat_id(fs):
-- return (fs.module, fs.lineno, fs.name)
--
-- def a():
-- d()
--
-- def b():
-- d()
--
-- def c():
-- pass
--
-- def d():
-- pass
--
-- _timings = {"a_1": 12, "b_1": 7, "c_1": 5, "d_1": 2}
-- _yappi._set_test_timings(_timings)
-- stats = utils.run_and_get_func_stats(a, )
-- stats.strip_dirs()
-- stats.save("tests/a1.pstats", type="pstat")
-- fsa_pid = pstat_id(utils.find_stat_by_name(stats, "a"))
-- fsd_pid = pstat_id(utils.find_stat_by_name(stats, "d"))
-- yappi.clear_stats()
-- _yappi._set_test_timings(_timings)
-- stats = utils.run_and_get_func_stats(a, )
-- stats.strip_dirs()
-- stats.save("tests/a2.pstats", type="pstat")
-- yappi.clear_stats()
-- _yappi._set_test_timings(_timings)
-- stats = utils.run_and_get_func_stats(b, )
-- stats.strip_dirs()
-- stats.save("tests/b1.pstats", type="pstat")
-- fsb_pid = pstat_id(utils.find_stat_by_name(stats, "b"))
-- yappi.clear_stats()
-- _yappi._set_test_timings(_timings)
-- stats = utils.run_and_get_func_stats(c, )
-- stats.strip_dirs()
-- stats.save("tests/c1.pstats", type="pstat")
-- fsc_pid = pstat_id(utils.find_stat_by_name(stats, "c"))
--
-- # merge saved stats and check pstats values are correct
-- import pstats
-- p = pstats.Stats(
-- 'tests/a1.pstats', 'tests/a2.pstats', 'tests/b1.pstats',
-- 'tests/c1.pstats'
-- )
-- p.strip_dirs()
-- # ct = ttot, tt = tsub
-- (cc, nc, tt, ct, callers) = p.stats[fsa_pid]
-- self.assertEqual(cc, nc, 2)
-- self.assertEqual(tt, 20)
-- self.assertEqual(ct, 24)
-- (cc, nc, tt, ct, callers) = p.stats[fsd_pid]
-- self.assertEqual(cc, nc, 3)
-- self.assertEqual(tt, 6)
-- self.assertEqual(ct, 6)
-- self.assertEqual(len(callers), 2)
-- (cc, nc, tt, ct) = callers[fsa_pid]
-- self.assertEqual(cc, nc, 2)
-- self.assertEqual(tt, 4)
-- self.assertEqual(ct, 4)
-- (cc, nc, tt, ct) = callers[fsb_pid]
-- self.assertEqual(cc, nc, 1)
-- self.assertEqual(tt, 2)
-- self.assertEqual(ct, 2)
--
-- def test_merge_stats(self):
-- _timings = {
-- "a_1": 15,
-- "b_1": 14,
-- "c_1": 12,
-- "d_1": 10,
-- "e_1": 9,
-- "f_1": 7,
-- "g_1": 6,
-- "h_1": 5,
-- "i_1": 1
-- }
-- _yappi._set_test_timings(_timings)
--
-- def a():
-- b()
--
-- def b():
-- c()
--
-- def c():
-- d()
--
-- def d():
-- e()
--
-- def e():
-- f()
--
-- def f():
-- g()
--
-- def g():
-- h()
--
-- def h():
-- i()
--
-- def i():
-- pass
--
-- yappi.start()
-- a()
-- a()
-- yappi.stop()
-- stats = yappi.get_func_stats()
-- self.assertRaises(
-- NotImplementedError, stats.save, "", "INVALID_SAVE_TYPE"
-- )
-- stats.save("tests/ystats2.ys")
-- yappi.clear_stats()
-- _yappi._set_test_timings(_timings)
-- yappi.start()
-- a()
-- stats = yappi.get_func_stats().add("tests/ystats2.ys")
-- fsa = utils.find_stat_by_name(stats, "a")
-- fsb = utils.find_stat_by_name(stats, "b")
-- fsc = utils.find_stat_by_name(stats, "c")
-- fsd = utils.find_stat_by_name(stats, "d")
-- fse = utils.find_stat_by_name(stats, "e")
-- fsf = utils.find_stat_by_name(stats, "f")
-- fsg = utils.find_stat_by_name(stats, "g")
-- fsh = utils.find_stat_by_name(stats, "h")
-- fsi = utils.find_stat_by_name(stats, "i")
-- self.assertEqual(fsa.ttot, 45)
-- self.assertEqual(fsa.ncall, 3)
-- self.assertEqual(fsa.nactualcall, 3)
-- self.assertEqual(fsa.tsub, 3)
-- self.assertEqual(fsa.children[fsb].ttot, fsb.ttot)
-- self.assertEqual(fsa.children[fsb].tsub, fsb.tsub)
-- self.assertEqual(fsb.children[fsc].ttot, fsc.ttot)
-- self.assertEqual(fsb.children[fsc].tsub, fsc.tsub)
-- self.assertEqual(fsc.tsub, 6)
-- self.assertEqual(fsc.children[fsd].ttot, fsd.ttot)
-- self.assertEqual(fsc.children[fsd].tsub, fsd.tsub)
-- self.assertEqual(fsd.children[fse].ttot, fse.ttot)
-- self.assertEqual(fsd.children[fse].tsub, fse.tsub)
-- self.assertEqual(fse.children[fsf].ttot, fsf.ttot)
-- self.assertEqual(fse.children[fsf].tsub, fsf.tsub)
-- self.assertEqual(fsf.children[fsg].ttot, fsg.ttot)
-- self.assertEqual(fsf.children[fsg].tsub, fsg.tsub)
-- self.assertEqual(fsg.ttot, 18)
-- self.assertEqual(fsg.tsub, 3)
-- self.assertEqual(fsg.children[fsh].ttot, fsh.ttot)
-- self.assertEqual(fsg.children[fsh].tsub, fsh.tsub)
-- self.assertEqual(fsh.ttot, 15)
-- self.assertEqual(fsh.tsub, 12)
-- self.assertEqual(fsh.tavg, 5)
-- self.assertEqual(fsh.children[fsi].ttot, fsi.ttot)
-- self.assertEqual(fsh.children[fsi].tsub, fsi.tsub)
-- #stats.debug_print()
--
-- def test_merge_multithreaded_stats(self):
-- import _yappi
-- timings = {"a_1": 2, "b_1": 1}
-- _yappi._set_test_timings(timings)
--
-- def a():
-- pass
--
-- def b():
-- pass
--
-- yappi.start()
-- t = threading.Thread(target=a)
-- t.start()
-- t.join()
-- t = threading.Thread(target=b)
-- t.start()
-- t.join()
-- yappi.get_func_stats().save("tests/ystats1.ys")
-- yappi.clear_stats()
-- _yappi._set_test_timings(timings)
-- self.assertEqual(len(yappi.get_func_stats()), 0)
-- self.assertEqual(len(yappi.get_thread_stats()), 1)
-- t = threading.Thread(target=a)
-- t.start()
-- t.join()
--
-- self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
-- self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
-- yappi.get_func_stats().save("tests/ystats2.ys")
--
-- stats = yappi.YFuncStats([
-- "tests/ystats1.ys",
-- "tests/ystats2.ys",
-- ])
-- fsa = utils.find_stat_by_name(stats, "a")
-- fsb = utils.find_stat_by_name(stats, "b")
-- self.assertEqual(fsa.ncall, 2)
-- self.assertEqual(fsb.ncall, 1)
-- self.assertEqual(fsa.tsub, fsa.ttot, 4)
-- self.assertEqual(fsb.tsub, fsb.ttot, 1)
--
-- def test_merge_load_different_clock_types(self):
-- yappi.start(builtins=True)
--
-- def a():
-- b()
--
-- def b():
-- c()
--
-- def c():
-- pass
--
-- t = threading.Thread(target=a)
-- t.start()
-- t.join()
-- yappi.get_func_stats().sort("name", "asc").save("tests/ystats1.ys")
-- yappi.stop()
-- yappi.clear_stats()
-- yappi.start(builtins=False)
-- t = threading.Thread(target=a)
-- t.start()
-- t.join()
-- yappi.get_func_stats().save("tests/ystats2.ys")
-- yappi.stop()
-- self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
-- yappi.clear_stats()
-- yappi.set_clock_type("wall")
-- yappi.start()
-- t = threading.Thread(target=a)
-- t.start()
-- t.join()
-- yappi.get_func_stats().save("tests/ystats3.ys")
-- self.assertRaises(
-- yappi.YappiError,
-- yappi.YFuncStats().add("tests/ystats1.ys").add, "tests/ystats3.ys"
-- )
-- stats = yappi.YFuncStats(["tests/ystats1.ys",
-- "tests/ystats2.ys"]).sort("name")
-- fsa = utils.find_stat_by_name(stats, "a")
-- fsb = utils.find_stat_by_name(stats, "b")
-- fsc = utils.find_stat_by_name(stats, "c")
-- self.assertEqual(fsa.ncall, 2)
-- self.assertEqual(fsa.ncall, fsb.ncall, fsc.ncall)
--
-- def test_merge_aabab_aabbc(self):
-- _timings = {
-- "a_1": 15,
-- "a_2": 14,
-- "b_1": 12,
-- "a_3": 10,
-- "b_2": 9,
-- "c_1": 4
-- }
-- _yappi._set_test_timings(_timings)
--
-- def a():
-- if self._ncall == 1:
-- self._ncall += 1
-- a()
-- elif self._ncall == 5:
-- self._ncall += 1
-- a()
-- else:
-- b()
--
-- def b():
-- if self._ncall == 2:
-- self._ncall += 1
-- a()
-- elif self._ncall == 6:
-- self._ncall += 1
-- b()
-- elif self._ncall == 7:
-- c()
-- else:
-- return
--
-- def c():
-- pass
--
-- self._ncall = 1
-- stats = utils.run_and_get_func_stats(a, )
-- stats.save("tests/ystats1.ys")
-- yappi.clear_stats()
-- _yappi._set_test_timings(_timings)
-- #stats.print_all()
--
-- self._ncall = 5
-- stats = utils.run_and_get_func_stats(a, )
-- stats.save("tests/ystats2.ys")
--
-- #stats.print_all()
--
-- def a(): # same name but another function(code object)
-- pass
--
-- yappi.start()
-- a()
-- stats = yappi.get_func_stats().add(
-- ["tests/ystats1.ys", "tests/ystats2.ys"]
-- )
-- #stats.print_all()
-- self.assertEqual(len(stats), 4)
--
-- fsa = None
-- for stat in stats:
-- if stat.name == "a" and stat.ttot == 45:
-- fsa = stat
-- break
-- self.assertTrue(fsa is not None)
--
-- self.assertEqual(fsa.ncall, 7)
-- self.assertEqual(fsa.nactualcall, 3)
-- self.assertEqual(fsa.ttot, 45)
-- self.assertEqual(fsa.tsub, 10)
-- fsb = utils.find_stat_by_name(stats, "b")
-- fsc = utils.find_stat_by_name(stats, "c")
-- self.assertEqual(fsb.ncall, 6)
-- self.assertEqual(fsb.nactualcall, 3)
-- self.assertEqual(fsb.ttot, 36)
-- self.assertEqual(fsb.tsub, 27)
-- self.assertEqual(fsb.tavg, 6)
-- self.assertEqual(fsc.ttot, 8)
-- self.assertEqual(fsc.tsub, 8)
-- self.assertEqual(fsc.tavg, 4)
-- self.assertEqual(fsc.nactualcall, fsc.ncall, 2)
--
--
--class MultithreadedScenarios(utils.YappiUnitTestCase):
--
-- def test_issue_32(self):
-- '''
-- Start yappi from different thread and we get Internal Error(15) as
-- the current_ctx_id() called while enumerating the threads in start()
-- and as it does not swap to the enumerated ThreadState* the THreadState_GetDict()
-- returns wrong object and thus sets an invalid id for the _ctx structure.
--
-- When this issue happens multiple Threads have same tid as the internal ts_ptr
-- will be same for different contexts. So, let's see if that happens
-- '''
--
-- def foo():
-- time.sleep(0.2)
--
-- def bar():
-- time.sleep(0.1)
--
-- def thread_func():
-- yappi.set_clock_type("wall")
-- yappi.start()
--
-- bar()
--
-- t = threading.Thread(target=thread_func)
-- t.start()
-- t.join()
--
-- foo()
--
-- yappi.stop()
--
-- thread_ids = set()
-- for tstat in yappi.get_thread_stats():
-- self.assertTrue(tstat.tid not in thread_ids)
-- thread_ids.add(tstat.tid)
--
-- def test_subsequent_profile(self):
-- WORKER_COUNT = 5
--
-- def a():
-- pass
--
-- def b():
-- pass
--
-- def c():
-- pass
--
-- _timings = {
-- "a_1": 3,
-- "b_1": 2,
-- "c_1": 1,
-- }
--
-- yappi.start()
--
-- def g():
-- pass
--
-- g()
-- yappi.stop()
-- yappi.clear_stats()
-- _yappi._set_test_timings(_timings)
-- yappi.start()
--
-- _dummy = []
-- for i in range(WORKER_COUNT):
-- t = threading.Thread(target=a)
-- t.start()
-- t.join()
-- for i in range(WORKER_COUNT):
-- t = threading.Thread(target=b)
-- t.start()
-- _dummy.append(t)
-- t.join()
-- for i in range(WORKER_COUNT):
-- t = threading.Thread(target=a)
-- t.start()
-- t.join()
-- for i in range(WORKER_COUNT):
-- t = threading.Thread(target=c)
-- t.start()
-- t.join()
-- yappi.stop()
-- yappi.start()
--
-- def f():
-- pass
--
-- f()
-- stats = yappi.get_func_stats()
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- fsc = utils.find_stat_by_name(stats, 'c')
-- self.assertEqual(fsa.ncall, 10)
-- self.assertEqual(fsb.ncall, 5)
-- self.assertEqual(fsc.ncall, 5)
-- self.assertEqual(fsa.ttot, fsa.tsub, 30)
-- self.assertEqual(fsb.ttot, fsb.tsub, 10)
-- self.assertEqual(fsc.ttot, fsc.tsub, 5)
--
-- # MACOSx optimizes by only creating one worker thread
-- self.assertTrue(len(yappi.get_thread_stats()) >= 2)
--
-- def test_basic(self):
-- yappi.set_clock_type('wall')
--
-- def dummy():
-- pass
--
-- def a():
-- time.sleep(0.2)
--
-- class Worker1(threading.Thread):
--
-- def a(self):
-- time.sleep(0.3)
--
-- def run(self):
-- self.a()
--
-- yappi.start(builtins=False, profile_threads=True)
--
-- c = Worker1()
-- c.start()
-- c.join()
-- a()
-- stats = yappi.get_func_stats()
-- fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
-- fsa2 = utils.find_stat_by_name(stats, 'a')
-- self.assertTrue(fsa1 is not None)
-- self.assertTrue(fsa2 is not None)
-- self.assertTrue(fsa1.ttot > 0.2)
-- self.assertTrue(fsa2.ttot > 0.1)
-- tstats = yappi.get_thread_stats()
-- self.assertEqual(len(tstats), 2)
-- tsa = utils.find_stat_by_name(tstats, 'Worker1')
-- tsm = utils.find_stat_by_name(tstats, '_MainThread')
-- dummy() # call dummy to force ctx name to be retrieved again.
-- self.assertTrue(tsa is not None)
-- # TODO: I put dummy() to fix below, remove the comments after a while.
-- self.assertTrue( # FIX: I see this fails sometimes?
-- tsm is not None,
-- 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(tstats))))
--
-- def test_ctx_stats(self):
-- from threading import Thread
-- DUMMY_WORKER_COUNT = 5
-- yappi.start()
--
-- class DummyThread(Thread):
-- pass
--
-- def dummy():
-- pass
--
-- def dummy_worker():
-- pass
--
-- for i in range(DUMMY_WORKER_COUNT):
-- t = DummyThread(target=dummy_worker)
-- t.start()
-- t.join()
-- yappi.stop()
-- stats = yappi.get_thread_stats()
-- tsa = utils.find_stat_by_name(stats, "DummyThread")
-- self.assertTrue(tsa is not None)
-- yappi.clear_stats()
-- time.sleep(1.0)
-- _timings = {
-- "a_1": 6,
-- "b_1": 5,
-- "c_1": 3,
-- "d_1": 1,
-- "a_2": 4,
-- "b_2": 3,
-- "c_2": 2,
-- "d_2": 1
-- }
-- _yappi._set_test_timings(_timings)
--
-- class Thread1(Thread):
-- pass
--
-- class Thread2(Thread):
-- pass
--
-- def a():
-- b()
--
-- def b():
-- c()
--
-- def c():
-- d()
--
-- def d():
-- time.sleep(0.6)
--
-- yappi.set_clock_type("wall")
-- yappi.start()
-- t1 = Thread1(target=a)
-- t1.start()
-- t2 = Thread2(target=a)
-- t2.start()
-- t1.join()
-- t2.join()
-- stats = yappi.get_thread_stats()
--
-- # the fist clear_stats clears the context table?
-- tsa = utils.find_stat_by_name(stats, "DummyThread")
-- self.assertTrue(tsa is None)
--
-- tst1 = utils.find_stat_by_name(stats, "Thread1")
-- tst2 = utils.find_stat_by_name(stats, "Thread2")
-- tsmain = utils.find_stat_by_name(stats, "_MainThread")
-- dummy() # call dummy to force ctx name to be retrieved again.
-- self.assertTrue(len(stats) == 3)
-- self.assertTrue(tst1 is not None)
-- self.assertTrue(tst2 is not None)
-- # TODO: I put dummy() to fix below, remove the comments after a while.
-- self.assertTrue( # FIX: I see this fails sometimes
-- tsmain is not None,
-- 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(stats))))
-- self.assertTrue(1.0 > tst2.ttot >= 0.5)
-- self.assertTrue(1.0 > tst1.ttot >= 0.5)
--
-- # test sorting of the ctx stats
-- stats = stats.sort("totaltime", "desc")
-- prev_stat = None
-- for stat in stats:
-- if prev_stat:
-- self.assertTrue(prev_stat.ttot >= stat.ttot)
-- prev_stat = stat
-- stats = stats.sort("totaltime", "asc")
-- prev_stat = None
-- for stat in stats:
-- if prev_stat:
-- self.assertTrue(prev_stat.ttot <= stat.ttot)
-- prev_stat = stat
-- stats = stats.sort("schedcount", "desc")
-- prev_stat = None
-- for stat in stats:
-- if prev_stat:
-- self.assertTrue(prev_stat.sched_count >= stat.sched_count)
-- prev_stat = stat
-- stats = stats.sort("name", "desc")
-- prev_stat = None
-- for stat in stats:
-- if prev_stat:
-- self.assertTrue(prev_stat.name.lower() >= stat.name.lower())
-- prev_stat = stat
-- self.assertRaises(
-- yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg"
-- )
-- self.assertRaises(
-- yappi.YappiError, stats.sort, "invalid_thread_sortorder_arg"
-- )
--
-- def test_ctx_stats_cpu(self):
--
-- def get_thread_name():
-- try:
-- return threading.current_thread().name
-- except AttributeError:
-- return "Anonymous"
--
-- def burn_cpu(sec):
-- t0 = yappi.get_clock_time()
-- elapsed = 0
-- while (elapsed < sec):
-- for _ in range(1000):
-- pass
-- elapsed = yappi.get_clock_time() - t0
--
-- def test():
--
-- ts = []
-- for i in (0.01, 0.05, 0.1):
-- t = threading.Thread(target=burn_cpu, args=(i, ))
-- t.name = "burn_cpu-%s" % str(i)
-- t.start()
-- ts.append(t)
-- for t in ts:
-- t.join()
--
-- yappi.set_clock_type("cpu")
-- yappi.set_context_name_callback(get_thread_name)
--
-- yappi.start()
--
-- test()
--
-- yappi.stop()
--
-- tstats = yappi.get_thread_stats()
-- r1 = '''
-- burn_cpu-0.1 3 123145356058624 0.100105 8
-- burn_cpu-0.05 2 123145361313792 0.050149 8
-- burn_cpu-0.01 1 123145356058624 0.010127 2
-- MainThread 0 4321620864 0.001632 6
-- '''
-- self.assert_ctx_stats_almost_equal(r1, tstats)
--
-- def test_producer_consumer_with_queues(self):
-- # we currently just stress yappi, no functionality test is done here.
-- yappi.start()
-- if utils.is_py3x():
-- from queue import Queue
-- else:
-- from Queue import Queue
-- from threading import Thread
-- WORKER_THREAD_COUNT = 50
-- WORK_ITEM_COUNT = 2000
--
-- def worker():
-- while True:
-- item = q.get()
-- # do the work with item
-- q.task_done()
--
-- q = Queue()
-- for i in range(WORKER_THREAD_COUNT):
-- t = Thread(target=worker)
-- t.daemon = True
-- t.start()
--
-- for item in range(WORK_ITEM_COUNT):
-- q.put(item)
-- q.join() # block until all tasks are done
-- #yappi.get_func_stats().sort("callcount").print_all()
-- yappi.stop()
--
-- def test_temporary_lock_waiting(self):
-- yappi.start()
-- _lock = threading.Lock()
--
-- def worker():
-- _lock.acquire()
-- try:
-- time.sleep(1.0)
-- finally:
-- _lock.release()
--
-- t1 = threading.Thread(target=worker)
-- t2 = threading.Thread(target=worker)
-- t1.start()
-- t2.start()
-- t1.join()
-- t2.join()
-- #yappi.get_func_stats().sort("callcount").print_all()
-- yappi.stop()
--
-- @unittest.skipIf(os.name != "posix", "requires Posix compliant OS")
-- def test_signals_with_blocking_calls(self):
-- import signal, os, time
--
-- # just to verify if signal is handled correctly and stats/yappi are not corrupted.
-- def handler(signum, frame):
-- raise Exception("Signal handler executed!")
--
-- yappi.start()
-- signal.signal(signal.SIGALRM, handler)
-- signal.alarm(1)
-- self.assertRaises(Exception, time.sleep, 2)
-- stats = yappi.get_func_stats()
-- fsh = utils.find_stat_by_name(stats, "handler")
-- self.assertTrue(fsh is not None)
--
-- @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2")
-- def test_concurrent_futures(self):
-- yappi.start()
-- from concurrent.futures import ThreadPoolExecutor
-- with ThreadPoolExecutor(max_workers=5) as executor:
-- f = executor.submit(pow, 5, 2)
-- self.assertEqual(f.result(), 25)
-- time.sleep(1.0)
-- yappi.stop()
--
-- @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2")
-- def test_barrier(self):
-- yappi.start()
-- b = threading.Barrier(2, timeout=1)
--
-- def worker():
-- try:
-- b.wait()
-- except threading.BrokenBarrierError:
-- pass
-- except Exception:
-- raise Exception("BrokenBarrierError not raised")
--
-- t1 = threading.Thread(target=worker)
-- t1.start()
-- #b.wait()
-- t1.join()
-- yappi.stop()
--
--
--class NonRecursiveFunctions(utils.YappiUnitTestCase):
--
-- def test_abcd(self):
-- _timings = {"a_1": 6, "b_1": 5, "c_1": 3, "d_1": 1}
-- _yappi._set_test_timings(_timings)
--
-- def a():
-- b()
--
-- def b():
-- c()
--
-- def c():
-- d()
--
-- def d():
-- pass
--
-- stats = utils.run_and_get_func_stats(a)
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- fsc = utils.find_stat_by_name(stats, 'c')
-- fsd = utils.find_stat_by_name(stats, 'd')
-- cfsab = fsa.children[fsb]
-- cfsbc = fsb.children[fsc]
-- cfscd = fsc.children[fsd]
--
-- self.assertEqual(fsa.ttot, 6)
-- self.assertEqual(fsa.tsub, 1)
-- self.assertEqual(fsb.ttot, 5)
-- self.assertEqual(fsb.tsub, 2)
-- self.assertEqual(fsc.ttot, 3)
-- self.assertEqual(fsc.tsub, 2)
-- self.assertEqual(fsd.ttot, 1)
-- self.assertEqual(fsd.tsub, 1)
-- self.assertEqual(cfsab.ttot, 5)
-- self.assertEqual(cfsab.tsub, 2)
-- self.assertEqual(cfsbc.ttot, 3)
-- self.assertEqual(cfsbc.tsub, 2)
-- self.assertEqual(cfscd.ttot, 1)
-- self.assertEqual(cfscd.tsub, 1)
--
-- def test_stop_in_middle(self):
-- _timings = {"a_1": 6, "b_1": 4}
-- _yappi._set_test_timings(_timings)
--
-- def a():
-- b()
-- yappi.stop()
--
-- def b():
-- time.sleep(0.2)
--
-- yappi.start()
-- a()
-- stats = yappi.get_func_stats()
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
--
-- self.assertEqual(fsa.ncall, 1)
-- self.assertEqual(fsa.nactualcall, 0)
-- self.assertEqual(fsa.ttot, 0) # no call_leave called
-- self.assertEqual(fsa.tsub, 0) # no call_leave called
-- self.assertEqual(fsb.ttot, 4)
--
--
--class RecursiveFunctions(utils.YappiUnitTestCase):
--
-- def test_fibonacci(self):
--
-- def fib(n):
-- if n > 1:
-- return fib(n - 1) + fib(n - 2)
-- else:
-- return n
--
-- stats = utils.run_and_get_func_stats(fib, 22)
-- fs = utils.find_stat_by_name(stats, 'fib')
-- self.assertEqual(fs.ncall, 57313)
-- self.assertEqual(fs.ttot, fs.tsub)
--
-- def test_abcadc(self):
-- _timings = {
-- "a_1": 20,
-- "b_1": 19,
-- "c_1": 17,
-- "a_2": 13,
-- "d_1": 12,
-- "c_2": 10,
-- "a_3": 5
-- }
-- _yappi._set_test_timings(_timings)
--
-- def a(n):
-- if n == 3:
-- return
-- if n == 1 + 1:
-- d(n)
-- else:
-- b(n)
--
-- def b(n):
-- c(n)
--
-- def c(n):
-- a(n + 1)
--
-- def d(n):
-- c(n)
--
-- stats = utils.run_and_get_func_stats(a, 1)
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- fsc = utils.find_stat_by_name(stats, 'c')
-- fsd = utils.find_stat_by_name(stats, 'd')
-- self.assertEqual(fsa.ncall, 3)
-- self.assertEqual(fsa.nactualcall, 1)
-- self.assertEqual(fsa.ttot, 20)
-- self.assertEqual(fsa.tsub, 7)
-- self.assertEqual(fsb.ttot, 19)
-- self.assertEqual(fsb.tsub, 2)
-- self.assertEqual(fsc.ttot, 17)
-- self.assertEqual(fsc.tsub, 9)
-- self.assertEqual(fsd.ttot, 12)
-- self.assertEqual(fsd.tsub, 2)
-- cfsca = fsc.children[fsa]
-- self.assertEqual(cfsca.nactualcall, 0)
-- self.assertEqual(cfsca.ncall, 2)
-- self.assertEqual(cfsca.ttot, 13)
-- self.assertEqual(cfsca.tsub, 6)
--
-- def test_aaaa(self):
-- _timings = {"d_1": 9, "d_2": 7, "d_3": 3, "d_4": 2}
-- _yappi._set_test_timings(_timings)
--
-- def d(n):
-- if n == 3:
-- return
-- d(n + 1)
--
-- stats = utils.run_and_get_func_stats(d, 0)
-- fsd = utils.find_stat_by_name(stats, 'd')
-- self.assertEqual(fsd.ncall, 4)
-- self.assertEqual(fsd.nactualcall, 1)
-- self.assertEqual(fsd.ttot, 9)
-- self.assertEqual(fsd.tsub, 9)
-- cfsdd = fsd.children[fsd]
-- self.assertEqual(cfsdd.ttot, 7)
-- self.assertEqual(cfsdd.tsub, 7)
-- self.assertEqual(cfsdd.ncall, 3)
-- self.assertEqual(cfsdd.nactualcall, 0)
--
-- def test_abcabc(self):
-- _timings = {
-- "a_1": 20,
-- "b_1": 19,
-- "c_1": 17,
-- "a_2": 13,
-- "b_2": 11,
-- "c_2": 9,
-- "a_3": 6
-- }
-- _yappi._set_test_timings(_timings)
--
-- def a(n):
-- if n == 3:
-- return
-- else:
-- b(n)
--
-- def b(n):
-- c(n)
--
-- def c(n):
-- a(n + 1)
--
-- stats = utils.run_and_get_func_stats(a, 1)
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- fsc = utils.find_stat_by_name(stats, 'c')
-- self.assertEqual(fsa.ncall, 3)
-- self.assertEqual(fsa.nactualcall, 1)
-- self.assertEqual(fsa.ttot, 20)
-- self.assertEqual(fsa.tsub, 9)
-- self.assertEqual(fsb.ttot, 19)
-- self.assertEqual(fsb.tsub, 4)
-- self.assertEqual(fsc.ttot, 17)
-- self.assertEqual(fsc.tsub, 7)
-- cfsab = fsa.children[fsb]
-- cfsbc = fsb.children[fsc]
-- cfsca = fsc.children[fsa]
-- self.assertEqual(cfsab.ttot, 19)
-- self.assertEqual(cfsab.tsub, 4)
-- self.assertEqual(cfsbc.ttot, 17)
-- self.assertEqual(cfsbc.tsub, 7)
-- self.assertEqual(cfsca.ttot, 13)
-- self.assertEqual(cfsca.tsub, 8)
--
-- def test_abcbca(self):
-- _timings = {"a_1": 10, "b_1": 9, "c_1": 7, "b_2": 4, "c_2": 2, "a_2": 1}
-- _yappi._set_test_timings(_timings)
-- self._ncall = 1
--
-- def a():
-- if self._ncall == 1:
-- b()
-- else:
-- return
--
-- def b():
-- c()
--
-- def c():
-- if self._ncall == 1:
-- self._ncall += 1
-- b()
-- else:
-- a()
--
-- stats = utils.run_and_get_func_stats(a)
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- fsc = utils.find_stat_by_name(stats, 'c')
-- cfsab = fsa.children[fsb]
-- cfsbc = fsb.children[fsc]
-- cfsca = fsc.children[fsa]
-- self.assertEqual(fsa.ttot, 10)
-- self.assertEqual(fsa.tsub, 2)
-- self.assertEqual(fsb.ttot, 9)
-- self.assertEqual(fsb.tsub, 4)
-- self.assertEqual(fsc.ttot, 7)
-- self.assertEqual(fsc.tsub, 4)
-- self.assertEqual(cfsab.ttot, 9)
-- self.assertEqual(cfsab.tsub, 2)
-- self.assertEqual(cfsbc.ttot, 7)
-- self.assertEqual(cfsbc.tsub, 4)
-- self.assertEqual(cfsca.ttot, 1)
-- self.assertEqual(cfsca.tsub, 1)
-- self.assertEqual(cfsca.ncall, 1)
-- self.assertEqual(cfsca.nactualcall, 0)
--
-- def test_aabccb(self):
-- _timings = {
-- "a_1": 13,
-- "a_2": 11,
-- "b_1": 9,
-- "c_1": 5,
-- "c_2": 3,
-- "b_2": 1
-- }
-- _yappi._set_test_timings(_timings)
-- self._ncall = 1
--
-- def a():
-- if self._ncall == 1:
-- self._ncall += 1
-- a()
-- else:
-- b()
--
-- def b():
-- if self._ncall == 3:
-- return
-- else:
-- c()
--
-- def c():
-- if self._ncall == 2:
-- self._ncall += 1
-- c()
-- else:
-- b()
--
-- stats = utils.run_and_get_func_stats(a)
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- fsc = utils.find_stat_by_name(stats, 'c')
-- cfsaa = fsa.children[fsa.index]
-- cfsab = fsa.children[fsb]
-- cfsbc = fsb.children[fsc.full_name]
-- cfscc = fsc.children[fsc]
-- cfscb = fsc.children[fsb]
-- self.assertEqual(fsb.ttot, 9)
-- self.assertEqual(fsb.tsub, 5)
-- self.assertEqual(cfsbc.ttot, 5)
-- self.assertEqual(cfsbc.tsub, 2)
-- self.assertEqual(fsa.ttot, 13)
-- self.assertEqual(fsa.tsub, 4)
-- self.assertEqual(cfsab.ttot, 9)
-- self.assertEqual(cfsab.tsub, 4)
-- self.assertEqual(cfsaa.ttot, 11)
-- self.assertEqual(cfsaa.tsub, 2)
-- self.assertEqual(fsc.ttot, 5)
-- self.assertEqual(fsc.tsub, 4)
--
-- def test_abaa(self):
-- _timings = {"a_1": 13, "b_1": 10, "a_2": 9, "a_3": 5}
-- _yappi._set_test_timings(_timings)
--
-- self._ncall = 1
--
-- def a():
-- if self._ncall == 1:
-- b()
-- elif self._ncall == 2:
-- self._ncall += 1
-- a()
-- else:
-- return
--
-- def b():
-- self._ncall += 1
-- a()
--
-- stats = utils.run_and_get_func_stats(a)
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- cfsaa = fsa.children[fsa]
-- cfsba = fsb.children[fsa]
-- self.assertEqual(fsb.ttot, 10)
-- self.assertEqual(fsb.tsub, 1)
-- self.assertEqual(fsa.ttot, 13)
-- self.assertEqual(fsa.tsub, 12)
-- self.assertEqual(cfsaa.ttot, 5)
-- self.assertEqual(cfsaa.tsub, 5)
-- self.assertEqual(cfsba.ttot, 9)
-- self.assertEqual(cfsba.tsub, 4)
--
-- def test_aabb(self):
-- _timings = {"a_1": 13, "a_2": 10, "b_1": 9, "b_2": 5}
-- _yappi._set_test_timings(_timings)
--
-- self._ncall = 1
--
-- def a():
-- if self._ncall == 1:
-- self._ncall += 1
-- a()
-- elif self._ncall == 2:
-- b()
-- else:
-- return
--
-- def b():
-- if self._ncall == 2:
-- self._ncall += 1
-- b()
-- else:
-- return
--
-- stats = utils.run_and_get_func_stats(a)
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- cfsaa = fsa.children[fsa]
-- cfsab = fsa.children[fsb]
-- cfsbb = fsb.children[fsb]
-- self.assertEqual(fsa.ttot, 13)
-- self.assertEqual(fsa.tsub, 4)
-- self.assertEqual(fsb.ttot, 9)
-- self.assertEqual(fsb.tsub, 9)
-- self.assertEqual(cfsaa.ttot, 10)
-- self.assertEqual(cfsaa.tsub, 1)
-- self.assertEqual(cfsab.ttot, 9)
-- self.assertEqual(cfsab.tsub, 4)
-- self.assertEqual(cfsbb.ttot, 5)
-- self.assertEqual(cfsbb.tsub, 5)
--
-- def test_abbb(self):
-- _timings = {"a_1": 13, "b_1": 10, "b_2": 6, "b_3": 1}
-- _yappi._set_test_timings(_timings)
--
-- self._ncall = 1
--
-- def a():
-- if self._ncall == 1:
-- b()
--
-- def b():
-- if self._ncall == 3:
-- return
-- self._ncall += 1
-- b()
--
-- stats = utils.run_and_get_func_stats(a)
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- cfsab = fsa.children[fsb]
-- cfsbb = fsb.children[fsb]
-- self.assertEqual(fsa.ttot, 13)
-- self.assertEqual(fsa.tsub, 3)
-- self.assertEqual(fsb.ttot, 10)
-- self.assertEqual(fsb.tsub, 10)
-- self.assertEqual(fsb.ncall, 3)
-- self.assertEqual(fsb.nactualcall, 1)
-- self.assertEqual(cfsab.ttot, 10)
-- self.assertEqual(cfsab.tsub, 4)
-- self.assertEqual(cfsbb.ttot, 6)
-- self.assertEqual(cfsbb.tsub, 6)
-- self.assertEqual(cfsbb.nactualcall, 0)
-- self.assertEqual(cfsbb.ncall, 2)
--
-- def test_aaab(self):
-- _timings = {"a_1": 13, "a_2": 10, "a_3": 6, "b_1": 1}
-- _yappi._set_test_timings(_timings)
--
-- self._ncall = 1
--
-- def a():
-- if self._ncall == 3:
-- b()
-- return
-- self._ncall += 1
-- a()
--
-- def b():
-- return
--
-- stats = utils.run_and_get_func_stats(a)
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- cfsaa = fsa.children[fsa]
-- cfsab = fsa.children[fsb]
-- self.assertEqual(fsa.ttot, 13)
-- self.assertEqual(fsa.tsub, 12)
-- self.assertEqual(fsb.ttot, 1)
-- self.assertEqual(fsb.tsub, 1)
-- self.assertEqual(cfsaa.ttot, 10)
-- self.assertEqual(cfsaa.tsub, 9)
-- self.assertEqual(cfsab.ttot, 1)
-- self.assertEqual(cfsab.tsub, 1)
--
-- def test_abab(self):
-- _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
-- _yappi._set_test_timings(_timings)
--
-- self._ncall = 1
--
-- def a():
-- b()
--
-- def b():
-- if self._ncall == 2:
-- return
-- self._ncall += 1
-- a()
--
-- stats = utils.run_and_get_func_stats(a)
-- fsa = utils.find_stat_by_name(stats, 'a')
-- fsb = utils.find_stat_by_name(stats, 'b')
-- cfsab = fsa.children[fsb]
-- cfsba = fsb.children[fsa]
-- self.assertEqual(fsa.ttot, 13)
-- self.assertEqual(fsa.tsub, 8)
-- self.assertEqual(fsb.ttot, 10)
-- self.assertEqual(fsb.tsub, 5)
-- self.assertEqual(cfsab.ttot, 10)
-- self.assertEqual(cfsab.tsub, 5)
-- self.assertEqual(cfsab.ncall, 2)
-- self.assertEqual(cfsab.nactualcall, 1)
-- self.assertEqual(cfsba.ttot, 6)
-- self.assertEqual(cfsba.tsub, 5)
--
--
--if __name__ == '__main__':
-- # import sys;sys.argv = ['', 'BasicUsage.test_run_as_script']
-- # import sys;sys.argv = ['', 'MultithreadedScenarios.test_subsequent_profile']
-- unittest.main()
-+import os -+import sys -+import time -+import threading -+import unittest -+import yappi -+import _yappi -+import tests.utils as utils -+import multiprocessing # added to fix http://bugs.python.org/issue15881 for > Py2.6 -+import subprocess -+ -+_counter = 0 -+ -+ -+class BasicUsage(utils.YappiUnitTestCase): -+ -+ def test_callback_function_int_return_overflow(self): -+ # this test is just here to check if any errors are generated, as the err -+ # is printed in C side, I did not include it here. THere are ways to test -+ # this deterministically, I did not bother -+ import ctypes -+ -+ def _unsigned_overflow_margin(): -+ return 2**(ctypes.sizeof(ctypes.c_void_p) * 8) - 1 -+ -+ def foo(): -+ pass -+ -+ #with utils.captured_output() as (out, err): -+ yappi.set_context_id_callback(_unsigned_overflow_margin) -+ yappi.set_tag_callback(_unsigned_overflow_margin) -+ yappi.start() -+ foo() -+ -+ def test_issue60(self): -+ -+ def foo(): -+ buf = bytearray() -+ buf += b't' * 200 -+ view = memoryview(buf)[10:] -+ view = view.tobytes() -+ del buf[:10] # this throws exception -+ return view -+ -+ yappi.start(builtins=True) -+ foo() -+ self.assertTrue( -+ len( -+ yappi.get_func_stats( -+ filter_callback=lambda x: yappi. -+ func_matches(x, [memoryview.tobytes]) -+ ) -+ ) > 0 -+ ) -+ yappi.stop() -+ -+ def test_issue54(self): -+ -+ def _tag_cbk(): -+ global _counter -+ _counter += 1 -+ return _counter -+ -+ def a(): -+ pass -+ -+ def b(): -+ pass -+ -+ yappi.set_tag_callback(_tag_cbk) -+ yappi.start() -+ a() -+ a() -+ a() -+ yappi.stop() -+ stats = yappi.get_func_stats() -+ self.assertEqual(stats.pop().ncall, 3) # aggregated if no tag is given -+ stats = yappi.get_func_stats(tag=1) -+ -+ for i in range(1, 3): -+ stats = yappi.get_func_stats(tag=i) -+ stats = yappi.get_func_stats( -+ tag=i, filter_callback=lambda x: yappi.func_matches(x, [a]) -+ ) -+ -+ stat = stats.pop() -+ self.assertEqual(stat.ncall, 1) -+ -+ yappi.set_tag_callback(None) -+ yappi.clear_stats() -+ yappi.start() -+ b() -+ b() -+ stats = yappi.get_func_stats() -+ self.assertEqual(len(stats), 1) -+ stat = stats.pop() -+ self.assertEqual(stat.ncall, 2) -+ -+ def test_filter(self): -+ -+ def a(): -+ pass -+ -+ def b(): -+ a() -+ -+ def c(): -+ b() -+ -+ _TCOUNT = 5 -+ -+ ts = [] -+ yappi.start() -+ for i in range(_TCOUNT): -+ t = threading.Thread(target=c) -+ t.start() -+ ts.append(t) -+ -+ for t in ts: -+ t.join() -+ -+ yappi.stop() -+ -+ ctx_ids = [] -+ for tstat in yappi.get_thread_stats(): -+ if tstat.name == '_MainThread': -+ main_ctx_id = tstat.id -+ else: -+ ctx_ids.append(tstat.id) -+ -+ fstats = yappi.get_func_stats(filter={"ctx_id": 9}) -+ self.assertTrue(fstats.empty()) -+ fstats = yappi.get_func_stats( -+ filter={ -+ "ctx_id": main_ctx_id, -+ "name": "c" -+ } -+ ) # main thread -+ self.assertTrue(fstats.empty()) -+ -+ for i in ctx_ids: -+ fstats = yappi.get_func_stats( -+ filter={ -+ "ctx_id": i, -+ "name": "a", -+ "ncall": 1 -+ } -+ ) -+ self.assertEqual(fstats.pop().ncall, 1) -+ fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "b"}) -+ self.assertEqual(fstats.pop().ncall, 1) -+ fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "c"}) -+ self.assertEqual(fstats.pop().ncall, 1) -+ -+ yappi.clear_stats() -+ yappi.start(builtins=True) -+ time.sleep(0.1) -+ yappi.stop() -+ fstats = yappi.get_func_stats(filter={"module": "time"}) -+ self.assertEqual(len(fstats), 1) -+ -+ # invalid filters` -+ self.assertRaises( -+ Exception, yappi.get_func_stats, filter={'tag': "sss"} -+ ) -+ self.assertRaises( -+ Exception, yappi.get_func_stats, filter={'ctx_id': "None"} -+ ) -+ -+ def test_filter_callback(self): -+ -+ def a(): -+ time.sleep(0.1) -+ -+ def b(): -+ a() -+ -+ def c(): -+ pass -+ -+ def d(): -+ pass -+ -+ yappi.set_clock_type("wall") -+ yappi.start(builtins=True) -+ a() -+ b() -+ c() -+ d() -+ stats = yappi.get_func_stats( -+ filter_callback=lambda x: yappi.func_matches(x, [a, b]) -+ ) -+ #stats.print_all() -+ r1 = ''' -+ tests/test_functionality.py:98 a 2 0.000000 0.200350 0.100175 -+ tests/test_functionality.py:101 b 1 0.000000 0.120000 0.100197 -+ ''' -+ self.assert_traces_almost_equal(r1, stats) -+ self.assertEqual(len(stats), 2) -+ stats = yappi.get_func_stats( -+ filter_callback=lambda x: yappi. -+ module_matches(x, [sys.modules[__name__]]) -+ ) -+ r1 = ''' -+ tests/test_functionality.py:98 a 2 0.000000 0.230130 0.115065 -+ tests/test_functionality.py:101 b 1 0.000000 0.120000 0.109011 -+ tests/test_functionality.py:104 c 1 0.000000 0.000002 0.000002 -+ tests/test_functionality.py:107 d 1 0.000000 0.000001 0.000001 -+ ''' -+ self.assert_traces_almost_equal(r1, stats) -+ self.assertEqual(len(stats), 4) -+ -+ stats = yappi.get_func_stats( -+ filter_callback=lambda x: yappi.func_matches(x, [time.sleep]) -+ ) -+ self.assertEqual(len(stats), 1) -+ r1 = ''' -+ time.sleep 2 0.206804 0.220000 0.103402 -+ ''' -+ self.assert_traces_almost_equal(r1, stats) -+ -+ def test_print_formatting(self): -+ -+ def a(): -+ pass -+ -+ def b(): -+ a() -+ -+ func_cols = { -+ 1: ("name", 48), -+ 0: ("ncall", 5), -+ 2: ("tsub", 8), -+ } -+ thread_cols = { -+ 1: ("name", 48), -+ 0: ("ttot", 8), -+ } -+ -+ yappi.start() -+ a() -+ b() -+ yappi.stop() -+ fs = yappi.get_func_stats() -+ cs = fs[1].children -+ ts = yappi.get_thread_stats() -+ #fs.print_all(out=sys.stderr, columns={1:("name", 70), }) -+ #cs.print_all(out=sys.stderr, columns=func_cols) -+ #ts.print_all(out=sys.stderr, columns=thread_cols) -+ #cs.print_all(out=sys.stderr, columns={}) -+ -+ self.assertRaises( -+ yappi.YappiError, fs.print_all, columns={1: ("namee", 9)} -+ ) -+ self.assertRaises( -+ yappi.YappiError, cs.print_all, columns={1: ("dd", 0)} -+ ) -+ self.assertRaises( -+ yappi.YappiError, ts.print_all, columns={1: ("tidd", 0)} -+ ) -+ -+ def test_get_clock(self): -+ yappi.set_clock_type('cpu') -+ self.assertEqual('cpu', yappi.get_clock_type()) -+ clock_info = yappi.get_clock_info() -+ self.assertTrue('api' in clock_info) -+ self.assertTrue('resolution' in clock_info) -+ -+ yappi.set_clock_type('wall') -+ self.assertEqual('wall', yappi.get_clock_type()) -+ -+ t0 = yappi.get_clock_time() -+ time.sleep(0.1) -+ duration = yappi.get_clock_time() - t0 -+ self.assertTrue(0.05 < duration < 0.3) -+ -+ def test_profile_decorator(self): -+ -+ def aggregate(func, stats): -+ fname = "tests/%s.profile" % (func.__name__) -+ try: -+ stats.add(fname) -+ except IOError: -+ pass -+ stats.save(fname) -+ raise Exception("messing around") -+ -+ @yappi.profile(return_callback=aggregate) -+ def a(x, y): -+ if x + y == 25: -+ raise Exception("") -+ return x + y -+ -+ def b(): -+ pass -+ -+ try: -+ os.remove( -+ "tests/a.profile" -+ ) # remove the one from prev test, if available -+ except: -+ pass -+ -+ # global profile is on to mess things up -+ yappi.start() -+ b() -+ -+ # assert functionality and call function at same time -+ try: -+ self.assertEqual(a(1, 2), 3) -+ except: -+ pass -+ try: -+ self.assertEqual(a(2, 5), 7) -+ except: -+ pass -+ try: -+ a(4, 21) -+ except: -+ pass -+ stats = yappi.get_func_stats().add("tests/a.profile") -+ fsa = utils.find_stat_by_name(stats, 'a') -+ self.assertEqual(fsa.ncall, 3) -+ self.assertEqual(len(stats), 1) # b() should be cleared out. -+ -+ @yappi.profile(return_callback=aggregate) -+ def count_down_rec(n): -+ if n == 0: -+ return -+ count_down_rec(n - 1) -+ -+ try: -+ os.remove( -+ "tests/count_down_rec.profile" -+ ) # remove the one from prev test, if available -+ except: -+ pass -+ -+ try: -+ count_down_rec(4) -+ except: -+ pass -+ try: -+ count_down_rec(3) -+ except: -+ pass -+ -+ stats = yappi.YFuncStats("tests/count_down_rec.profile") -+ fsrec = utils.find_stat_by_name(stats, 'count_down_rec') -+ self.assertEqual(fsrec.ncall, 9) -+ self.assertEqual(fsrec.nactualcall, 2) -+ -+ def test_strip_dirs(self): -+ -+ def a(): -+ pass -+ -+ stats = utils.run_and_get_func_stats(a, ) -+ stats.strip_dirs() -+ fsa = utils.find_stat_by_name(stats, "a") -+ self.assertEqual(fsa.module, os.path.basename(fsa.module)) -+ -+ @unittest.skipIf(os.name == "nt", "do not run on Windows") -+ def test_run_as_script(self): -+ import re -+ p = subprocess.Popen( -+ ['yappi', os.path.join('./tests', 'run_as_script.py')], -+ stdout=subprocess.PIPE -+ ) -+ out, err = p.communicate() -+ self.assertEqual(p.returncode, 0) -+ func_stats, thread_stats = re.split( -+ b'name\\s+id\\s+tid\\s+ttot\\s+scnt\\s*\n', out -+ ) -+ self.assertTrue(b'FancyThread' in thread_stats) -+ -+ def test_yappi_overhead(self): -+ LOOP_COUNT = 100000 -+ -+ def a(): -+ pass -+ -+ def b(): -+ for i in range(LOOP_COUNT): -+ a() -+ -+ t0 = time.time() -+ yappi.start() -+ b() -+ yappi.stop() -+ time_with_yappi = time.time() - t0 -+ t0 = time.time() -+ b() -+ time_without_yappi = time.time() - t0 -+ if time_without_yappi == 0: -+ time_without_yappi = 0.000001 -+ -+ # in latest v0.82, I calculated this as close to "7.0" in my machine. -+ # however, %83 of this overhead is coming from tickcount(). The other %17 -+ # seems to have been evenly distributed to the internal bookkeeping -+ # structures/algorithms which seems acceptable. Note that our test only -+ # tests one function being profiled at-a-time in a short interval. -+ # profiling high number of functions in a small time -+ # is a different beast, (which is pretty unlikely in most applications) -+ # So as a conclusion: I cannot see any optimization window for Yappi that -+ # is worth implementing as we will only optimize %17 of the time. -+ sys.stderr.write("\r\nYappi puts %0.1f times overhead to the profiled application in average.\r\n" % \ -+ (time_with_yappi / time_without_yappi)) -+ -+ def test_clear_stats_while_running(self): -+ -+ def a(): -+ pass -+ -+ yappi.start() -+ a() -+ yappi.clear_stats() -+ a() -+ stats = yappi.get_func_stats() -+ fsa = utils.find_stat_by_name(stats, 'a') -+ self.assertEqual(fsa.ncall, 1) -+ -+ def test_generator(self): -+ -+ def _gen(n): -+ while (n > 0): -+ yield n -+ n -= 1 -+ -+ yappi.start() -+ for x in _gen(5): -+ pass -+ self.assertTrue( -+ yappi.convert2pstats(yappi.get_func_stats()) is not None -+ ) -+ -+ def test_slice_child_stats_and_strip_dirs(self): -+ -+ def b(): -+ for i in range(10000000): -+ pass -+ -+ def a(): -+ b() -+ -+ yappi.start(builtins=True) -+ a() -+ stats = yappi.get_func_stats() -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ self.assertTrue(fsa.children[0:1] is not None) -+ prev_afullname = fsa.full_name -+ prev_bchildfullname = fsa.children[fsb].full_name -+ stats.strip_dirs() -+ self.assertTrue(len(prev_afullname) > len(fsa.full_name)) -+ self.assertTrue( -+ len(prev_bchildfullname) > len(fsa.children[fsb].full_name) -+ ) -+ -+ def test_children_stat_functions(self): -+ _timings = {"a_1": 5, "b_1": 3, "c_1": 1} -+ _yappi._set_test_timings(_timings) -+ -+ def b(): -+ pass -+ -+ def c(): -+ pass -+ -+ def a(): -+ b() -+ c() -+ -+ yappi.start() -+ a() -+ b() # non-child call -+ c() # non-child call -+ stats = yappi.get_func_stats() -+ fsa = utils.find_stat_by_name(stats, 'a') -+ childs_of_a = fsa.children.get().sort("tavg", "desc") -+ prev_item = None -+ for item in childs_of_a: -+ if prev_item: -+ self.assertTrue(prev_item.tavg > item.tavg) -+ prev_item = item -+ childs_of_a.sort("name", "desc") -+ prev_item = None -+ for item in childs_of_a: -+ if prev_item: -+ self.assertTrue(prev_item.name > item.name) -+ prev_item = item -+ childs_of_a.clear() -+ self.assertTrue(childs_of_a.empty()) -+ -+ def test_no_stats_different_clock_type_load(self): -+ -+ def a(): -+ pass -+ -+ yappi.start() -+ a() -+ yappi.stop() -+ yappi.get_func_stats().save("tests/ystats1.ys") -+ yappi.clear_stats() -+ yappi.set_clock_type("WALL") -+ yappi.start() -+ yappi.stop() -+ stats = yappi.get_func_stats().add("tests/ystats1.ys") -+ fsa = utils.find_stat_by_name(stats, 'a') -+ self.assertTrue(fsa is not None) -+ -+ def test_subsequent_profile(self): -+ _timings = {"a_1": 1, "b_1": 1} -+ _yappi._set_test_timings(_timings) -+ -+ def a(): -+ pass -+ -+ def b(): -+ pass -+ -+ yappi.start() -+ a() -+ yappi.stop() -+ yappi.start() -+ b() -+ yappi.stop() -+ stats = yappi.get_func_stats() -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ self.assertTrue(fsa is not None) -+ self.assertTrue(fsb is not None) -+ self.assertEqual(fsa.ttot, 1) -+ self.assertEqual(fsb.ttot, 1) -+ -+ def test_lambda(self): -+ f = lambda: time.sleep(0.3) -+ yappi.set_clock_type("wall") -+ yappi.start() -+ f() -+ stats = yappi.get_func_stats() -+ fsa = utils.find_stat_by_name(stats, '<lambda>') -+ self.assertTrue(fsa.ttot > 0.1) -+ -+ def test_module_stress(self): -+ self.assertEqual(yappi.is_running(), False) -+ -+ yappi.start() -+ yappi.clear_stats() -+ self.assertRaises(_yappi.error, yappi.set_clock_type, "wall") -+ -+ yappi.stop() -+ yappi.clear_stats() -+ yappi.set_clock_type("cpu") -+ self.assertRaises(yappi.YappiError, yappi.set_clock_type, "dummy") -+ self.assertEqual(yappi.is_running(), False) -+ yappi.clear_stats() -+ yappi.clear_stats() -+ -+ def test_stat_sorting(self): -+ _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1} -+ _yappi._set_test_timings(_timings) -+ -+ self._ncall = 1 -+ -+ def a(): -+ b() -+ -+ def b(): -+ if self._ncall == 2: -+ return -+ self._ncall += 1 -+ a() -+ -+ stats = utils.run_and_get_func_stats(a) -+ stats = stats.sort("totaltime", "desc") -+ prev_stat = None -+ for stat in stats: -+ if prev_stat: -+ self.assertTrue(prev_stat.ttot >= stat.ttot) -+ prev_stat = stat -+ stats = stats.sort("totaltime", "asc") -+ prev_stat = None -+ for stat in stats: -+ if prev_stat: -+ self.assertTrue(prev_stat.ttot <= stat.ttot) -+ prev_stat = stat -+ stats = stats.sort("avgtime", "asc") -+ prev_stat = None -+ for stat in stats: -+ if prev_stat: -+ self.assertTrue(prev_stat.tavg <= stat.tavg) -+ prev_stat = stat -+ stats = stats.sort("name", "asc") -+ prev_stat = None -+ for stat in stats: -+ if prev_stat: -+ self.assertTrue(prev_stat.name <= stat.name) -+ prev_stat = stat -+ stats = stats.sort("subtime", "asc") -+ prev_stat = None -+ for stat in stats: -+ if prev_stat: -+ self.assertTrue(prev_stat.tsub <= stat.tsub) -+ prev_stat = stat -+ -+ self.assertRaises( -+ yappi.YappiError, stats.sort, "invalid_func_sorttype_arg" -+ ) -+ self.assertRaises( -+ yappi.YappiError, stats.sort, "totaltime", -+ "invalid_func_sortorder_arg" -+ ) -+ -+ def test_start_flags(self): -+ self.assertEqual(_yappi._get_start_flags(), None) -+ yappi.start() -+ -+ def a(): -+ pass -+ -+ a() -+ self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0) -+ self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1) -+ self.assertEqual(len(yappi.get_thread_stats()), 1) -+ -+ def test_builtin_profiling(self): -+ -+ def a(): -+ time.sleep(0.4) # is a builtin function -+ -+ yappi.set_clock_type('wall') -+ -+ yappi.start(builtins=True) -+ a() -+ stats = yappi.get_func_stats() -+ fsa = utils.find_stat_by_name(stats, 'sleep') -+ self.assertTrue(fsa is not None) -+ self.assertTrue(fsa.ttot > 0.3) -+ yappi.stop() -+ yappi.clear_stats() -+ -+ def a(): -+ pass -+ -+ yappi.start() -+ t = threading.Thread(target=a) -+ t.start() -+ t.join() -+ stats = yappi.get_func_stats() -+ -+ def test_singlethread_profiling(self): -+ yappi.set_clock_type('wall') -+ -+ def a(): -+ time.sleep(0.2) -+ -+ class Worker1(threading.Thread): -+ -+ def a(self): -+ time.sleep(0.3) -+ -+ def run(self): -+ self.a() -+ -+ yappi.start(profile_threads=False) -+ -+ c = Worker1() -+ c.start() -+ c.join() -+ a() -+ stats = yappi.get_func_stats() -+ fsa1 = utils.find_stat_by_name(stats, 'Worker1.a') -+ fsa2 = utils.find_stat_by_name(stats, 'a') -+ self.assertTrue(fsa1 is None) -+ self.assertTrue(fsa2 is not None) -+ self.assertTrue(fsa2.ttot > 0.1) -+ -+ def test_run(self): -+ -+ def profiled(): -+ pass -+ -+ yappi.clear_stats() -+ try: -+ with yappi.run(): -+ profiled() -+ stats = yappi.get_func_stats() -+ finally: -+ yappi.clear_stats() -+ -+ self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled')) -+ -+ def test_run_recursive(self): -+ -+ def profiled(): -+ pass -+ -+ def not_profiled(): -+ pass -+ -+ yappi.clear_stats() -+ try: -+ with yappi.run(): -+ with yappi.run(): -+ profiled() -+ # Profiling stopped here -+ not_profiled() -+ stats = yappi.get_func_stats() -+ finally: -+ yappi.clear_stats() -+ -+ self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled')) -+ self.assertIsNone(utils.find_stat_by_name(stats, 'not_profiled')) -+ -+ -+class StatSaveScenarios(utils.YappiUnitTestCase): -+ -+ def test_pstats_conversion(self): -+ -+ def pstat_id(fs): -+ return (fs.module, fs.lineno, fs.name) -+ -+ def a(): -+ d() -+ -+ def b(): -+ d() -+ -+ def c(): -+ pass -+ -+ def d(): -+ pass -+ -+ _timings = {"a_1": 12, "b_1": 7, "c_1": 5, "d_1": 2} -+ _yappi._set_test_timings(_timings) -+ stats = utils.run_and_get_func_stats(a, ) -+ stats.strip_dirs() -+ stats.save("tests/a1.pstats", type="pstat") -+ fsa_pid = pstat_id(utils.find_stat_by_name(stats, "a")) -+ fsd_pid = pstat_id(utils.find_stat_by_name(stats, "d")) -+ yappi.clear_stats() -+ _yappi._set_test_timings(_timings) -+ stats = utils.run_and_get_func_stats(a, ) -+ stats.strip_dirs() -+ stats.save("tests/a2.pstats", type="pstat") -+ yappi.clear_stats() -+ _yappi._set_test_timings(_timings) -+ stats = utils.run_and_get_func_stats(b, ) -+ stats.strip_dirs() -+ stats.save("tests/b1.pstats", type="pstat") -+ fsb_pid = pstat_id(utils.find_stat_by_name(stats, "b")) -+ yappi.clear_stats() -+ _yappi._set_test_timings(_timings) -+ stats = utils.run_and_get_func_stats(c, ) -+ stats.strip_dirs() -+ stats.save("tests/c1.pstats", type="pstat") -+ fsc_pid = pstat_id(utils.find_stat_by_name(stats, "c")) -+ -+ # merge saved stats and check pstats values are correct -+ import pstats -+ p = pstats.Stats( -+ 'tests/a1.pstats', 'tests/a2.pstats', 'tests/b1.pstats', -+ 'tests/c1.pstats' -+ ) -+ p.strip_dirs() -+ # ct = ttot, tt = tsub -+ (cc, nc, tt, ct, callers) = p.stats[fsa_pid] -+ self.assertEqual(cc, nc, 2) -+ self.assertEqual(tt, 20) -+ self.assertEqual(ct, 24) -+ (cc, nc, tt, ct, callers) = p.stats[fsd_pid] -+ self.assertEqual(cc, nc, 3) -+ self.assertEqual(tt, 6) -+ self.assertEqual(ct, 6) -+ self.assertEqual(len(callers), 2) -+ (cc, nc, tt, ct) = callers[fsa_pid] -+ self.assertEqual(cc, nc, 2) -+ self.assertEqual(tt, 4) -+ self.assertEqual(ct, 4) -+ (cc, nc, tt, ct) = callers[fsb_pid] -+ self.assertEqual(cc, nc, 1) -+ self.assertEqual(tt, 2) -+ self.assertEqual(ct, 2) -+ -+ def test_merge_stats(self): -+ _timings = { -+ "a_1": 15, -+ "b_1": 14, -+ "c_1": 12, -+ "d_1": 10, -+ "e_1": 9, -+ "f_1": 7, -+ "g_1": 6, -+ "h_1": 5, -+ "i_1": 1 -+ } -+ _yappi._set_test_timings(_timings) -+ -+ def a(): -+ b() -+ -+ def b(): -+ c() -+ -+ def c(): -+ d() -+ -+ def d(): -+ e() -+ -+ def e(): -+ f() -+ -+ def f(): -+ g() -+ -+ def g(): -+ h() -+ -+ def h(): -+ i() -+ -+ def i(): -+ pass -+ -+ yappi.start() -+ a() -+ a() -+ yappi.stop() -+ stats = yappi.get_func_stats() -+ self.assertRaises( -+ NotImplementedError, stats.save, "", "INVALID_SAVE_TYPE" -+ ) -+ stats.save("tests/ystats2.ys") -+ yappi.clear_stats() -+ _yappi._set_test_timings(_timings) -+ yappi.start() -+ a() -+ stats = yappi.get_func_stats().add("tests/ystats2.ys") -+ fsa = utils.find_stat_by_name(stats, "a") -+ fsb = utils.find_stat_by_name(stats, "b") -+ fsc = utils.find_stat_by_name(stats, "c") -+ fsd = utils.find_stat_by_name(stats, "d") -+ fse = utils.find_stat_by_name(stats, "e") -+ fsf = utils.find_stat_by_name(stats, "f") -+ fsg = utils.find_stat_by_name(stats, "g") -+ fsh = utils.find_stat_by_name(stats, "h") -+ fsi = utils.find_stat_by_name(stats, "i") -+ self.assertEqual(fsa.ttot, 45) -+ self.assertEqual(fsa.ncall, 3) -+ self.assertEqual(fsa.nactualcall, 3) -+ self.assertEqual(fsa.tsub, 3) -+ self.assertEqual(fsa.children[fsb].ttot, fsb.ttot) -+ self.assertEqual(fsa.children[fsb].tsub, fsb.tsub) -+ self.assertEqual(fsb.children[fsc].ttot, fsc.ttot) -+ self.assertEqual(fsb.children[fsc].tsub, fsc.tsub) -+ self.assertEqual(fsc.tsub, 6) -+ self.assertEqual(fsc.children[fsd].ttot, fsd.ttot) -+ self.assertEqual(fsc.children[fsd].tsub, fsd.tsub) -+ self.assertEqual(fsd.children[fse].ttot, fse.ttot) -+ self.assertEqual(fsd.children[fse].tsub, fse.tsub) -+ self.assertEqual(fse.children[fsf].ttot, fsf.ttot) -+ self.assertEqual(fse.children[fsf].tsub, fsf.tsub) -+ self.assertEqual(fsf.children[fsg].ttot, fsg.ttot) -+ self.assertEqual(fsf.children[fsg].tsub, fsg.tsub) -+ self.assertEqual(fsg.ttot, 18) -+ self.assertEqual(fsg.tsub, 3) -+ self.assertEqual(fsg.children[fsh].ttot, fsh.ttot) -+ self.assertEqual(fsg.children[fsh].tsub, fsh.tsub) -+ self.assertEqual(fsh.ttot, 15) -+ self.assertEqual(fsh.tsub, 12) -+ self.assertEqual(fsh.tavg, 5) -+ self.assertEqual(fsh.children[fsi].ttot, fsi.ttot) -+ self.assertEqual(fsh.children[fsi].tsub, fsi.tsub) -+ #stats.debug_print() -+ -+ def test_merge_multithreaded_stats(self): -+ import _yappi -+ timings = {"a_1": 2, "b_1": 1} -+ _yappi._set_test_timings(timings) -+ -+ def a(): -+ pass -+ -+ def b(): -+ pass -+ -+ yappi.start() -+ t = threading.Thread(target=a) -+ t.start() -+ t.join() -+ t = threading.Thread(target=b) -+ t.start() -+ t.join() -+ yappi.get_func_stats().save("tests/ystats1.ys") -+ yappi.clear_stats() -+ _yappi._set_test_timings(timings) -+ self.assertEqual(len(yappi.get_func_stats()), 0) -+ self.assertEqual(len(yappi.get_thread_stats()), 1) -+ t = threading.Thread(target=a) -+ t.start() -+ t.join() -+ -+ self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0) -+ self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1) -+ yappi.get_func_stats().save("tests/ystats2.ys") -+ -+ stats = yappi.YFuncStats([ -+ "tests/ystats1.ys", -+ "tests/ystats2.ys", -+ ]) -+ fsa = utils.find_stat_by_name(stats, "a") -+ fsb = utils.find_stat_by_name(stats, "b") -+ self.assertEqual(fsa.ncall, 2) -+ self.assertEqual(fsb.ncall, 1) -+ self.assertEqual(fsa.tsub, fsa.ttot, 4) -+ self.assertEqual(fsb.tsub, fsb.ttot, 1) -+ -+ def test_merge_load_different_clock_types(self): -+ yappi.start(builtins=True) -+ -+ def a(): -+ b() -+ -+ def b(): -+ c() -+ -+ def c(): -+ pass -+ -+ t = threading.Thread(target=a) -+ t.start() -+ t.join() -+ yappi.get_func_stats().sort("name", "asc").save("tests/ystats1.ys") -+ yappi.stop() -+ yappi.clear_stats() -+ yappi.start(builtins=False) -+ t = threading.Thread(target=a) -+ t.start() -+ t.join() -+ yappi.get_func_stats().save("tests/ystats2.ys") -+ yappi.stop() -+ self.assertRaises(_yappi.error, yappi.set_clock_type, "wall") -+ yappi.clear_stats() -+ yappi.set_clock_type("wall") -+ yappi.start() -+ t = threading.Thread(target=a) -+ t.start() -+ t.join() -+ yappi.get_func_stats().save("tests/ystats3.ys") -+ self.assertRaises( -+ yappi.YappiError, -+ yappi.YFuncStats().add("tests/ystats1.ys").add, "tests/ystats3.ys" -+ ) -+ stats = yappi.YFuncStats(["tests/ystats1.ys", -+ "tests/ystats2.ys"]).sort("name") -+ fsa = utils.find_stat_by_name(stats, "a") -+ fsb = utils.find_stat_by_name(stats, "b") -+ fsc = utils.find_stat_by_name(stats, "c") -+ self.assertEqual(fsa.ncall, 2) -+ self.assertEqual(fsa.ncall, fsb.ncall, fsc.ncall) -+ -+ def test_merge_aabab_aabbc(self): -+ _timings = { -+ "a_1": 15, -+ "a_2": 14, -+ "b_1": 12, -+ "a_3": 10, -+ "b_2": 9, -+ "c_1": 4 -+ } -+ _yappi._set_test_timings(_timings) -+ -+ def a(): -+ if self._ncall == 1: -+ self._ncall += 1 -+ a() -+ elif self._ncall == 5: -+ self._ncall += 1 -+ a() -+ else: -+ b() -+ -+ def b(): -+ if self._ncall == 2: -+ self._ncall += 1 -+ a() -+ elif self._ncall == 6: -+ self._ncall += 1 -+ b() -+ elif self._ncall == 7: -+ c() -+ else: -+ return -+ -+ def c(): -+ pass -+ -+ self._ncall = 1 -+ stats = utils.run_and_get_func_stats(a, ) -+ stats.save("tests/ystats1.ys") -+ yappi.clear_stats() -+ _yappi._set_test_timings(_timings) -+ #stats.print_all() -+ -+ self._ncall = 5 -+ stats = utils.run_and_get_func_stats(a, ) -+ stats.save("tests/ystats2.ys") -+ -+ #stats.print_all() -+ -+ def a(): # same name but another function(code object) -+ pass -+ -+ yappi.start() -+ a() -+ stats = yappi.get_func_stats().add( -+ ["tests/ystats1.ys", "tests/ystats2.ys"] -+ ) -+ #stats.print_all() -+ self.assertEqual(len(stats), 4) -+ -+ fsa = None -+ for stat in stats: -+ if stat.name == "a" and stat.ttot == 45: -+ fsa = stat -+ break -+ self.assertTrue(fsa is not None) -+ -+ self.assertEqual(fsa.ncall, 7) -+ self.assertEqual(fsa.nactualcall, 3) -+ self.assertEqual(fsa.ttot, 45) -+ self.assertEqual(fsa.tsub, 10) -+ fsb = utils.find_stat_by_name(stats, "b") -+ fsc = utils.find_stat_by_name(stats, "c") -+ self.assertEqual(fsb.ncall, 6) -+ self.assertEqual(fsb.nactualcall, 3) -+ self.assertEqual(fsb.ttot, 36) -+ self.assertEqual(fsb.tsub, 27) -+ self.assertEqual(fsb.tavg, 6) -+ self.assertEqual(fsc.ttot, 8) -+ self.assertEqual(fsc.tsub, 8) -+ self.assertEqual(fsc.tavg, 4) -+ self.assertEqual(fsc.nactualcall, fsc.ncall, 2) -+ -+ -+class MultithreadedScenarios(utils.YappiUnitTestCase): -+ -+ def test_issue_32(self): -+ ''' -+ Start yappi from different thread and we get Internal Error(15) as -+ the current_ctx_id() called while enumerating the threads in start() -+ and as it does not swap to the enumerated ThreadState* the THreadState_GetDict() -+ returns wrong object and thus sets an invalid id for the _ctx structure. -+ -+ When this issue happens multiple Threads have same tid as the internal ts_ptr -+ will be same for different contexts. So, let's see if that happens -+ ''' -+ -+ def foo(): -+ time.sleep(0.2) -+ -+ def bar(): -+ time.sleep(0.1) -+ -+ def thread_func(): -+ yappi.set_clock_type("wall") -+ yappi.start() -+ -+ bar() -+ -+ t = threading.Thread(target=thread_func) -+ t.start() -+ t.join() -+ -+ foo() -+ -+ yappi.stop() -+ -+ thread_ids = set() -+ for tstat in yappi.get_thread_stats(): -+ self.assertTrue(tstat.tid not in thread_ids) -+ thread_ids.add(tstat.tid) -+ -+ def test_subsequent_profile(self): -+ WORKER_COUNT = 5 -+ -+ def a(): -+ pass -+ -+ def b(): -+ pass -+ -+ def c(): -+ pass -+ -+ _timings = { -+ "a_1": 3, -+ "b_1": 2, -+ "c_1": 1, -+ } -+ -+ yappi.start() -+ -+ def g(): -+ pass -+ -+ g() -+ yappi.stop() -+ yappi.clear_stats() -+ _yappi._set_test_timings(_timings) -+ yappi.start() -+ -+ _dummy = [] -+ for i in range(WORKER_COUNT): -+ t = threading.Thread(target=a) -+ t.start() -+ t.join() -+ for i in range(WORKER_COUNT): -+ t = threading.Thread(target=b) -+ t.start() -+ _dummy.append(t) -+ t.join() -+ for i in range(WORKER_COUNT): -+ t = threading.Thread(target=a) -+ t.start() -+ t.join() -+ for i in range(WORKER_COUNT): -+ t = threading.Thread(target=c) -+ t.start() -+ t.join() -+ yappi.stop() -+ yappi.start() -+ -+ def f(): -+ pass -+ -+ f() -+ stats = yappi.get_func_stats() -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ fsc = utils.find_stat_by_name(stats, 'c') -+ self.assertEqual(fsa.ncall, 10) -+ self.assertEqual(fsb.ncall, 5) -+ self.assertEqual(fsc.ncall, 5) -+ self.assertEqual(fsa.ttot, fsa.tsub, 30) -+ self.assertEqual(fsb.ttot, fsb.tsub, 10) -+ self.assertEqual(fsc.ttot, fsc.tsub, 5) -+ -+ # MACOSx optimizes by only creating one worker thread -+ self.assertTrue(len(yappi.get_thread_stats()) >= 2) -+ -+ def test_basic(self): -+ yappi.set_clock_type('wall') -+ -+ def dummy(): -+ pass -+ -+ def a(): -+ time.sleep(0.2) -+ -+ class Worker1(threading.Thread): -+ -+ def a(self): -+ time.sleep(0.3) -+ -+ def run(self): -+ self.a() -+ -+ yappi.start(builtins=False, profile_threads=True) -+ -+ c = Worker1() -+ c.start() -+ c.join() -+ a() -+ stats = yappi.get_func_stats() -+ fsa1 = utils.find_stat_by_name(stats, 'Worker1.a') -+ fsa2 = utils.find_stat_by_name(stats, 'a') -+ self.assertTrue(fsa1 is not None) -+ self.assertTrue(fsa2 is not None) -+ self.assertTrue(fsa1.ttot > 0.2) -+ self.assertTrue(fsa2.ttot > 0.1) -+ tstats = yappi.get_thread_stats() -+ self.assertEqual(len(tstats), 2) -+ tsa = utils.find_stat_by_name(tstats, 'Worker1') -+ tsm = utils.find_stat_by_name(tstats, '_MainThread') -+ dummy() # call dummy to force ctx name to be retrieved again. -+ self.assertTrue(tsa is not None) -+ # TODO: I put dummy() to fix below, remove the comments after a while. -+ self.assertTrue( # FIX: I see this fails sometimes? -+ tsm is not None, -+ 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(tstats)))) -+ -+ def test_ctx_stats(self): -+ from threading import Thread -+ DUMMY_WORKER_COUNT = 5 -+ yappi.start() -+ -+ class DummyThread(Thread): -+ pass -+ -+ def dummy(): -+ pass -+ -+ def dummy_worker(): -+ pass -+ -+ for i in range(DUMMY_WORKER_COUNT): -+ t = DummyThread(target=dummy_worker) -+ t.start() -+ t.join() -+ yappi.stop() -+ stats = yappi.get_thread_stats() -+ tsa = utils.find_stat_by_name(stats, "DummyThread") -+ self.assertTrue(tsa is not None) -+ yappi.clear_stats() -+ time.sleep(1.0) -+ _timings = { -+ "a_1": 6, -+ "b_1": 5, -+ "c_1": 3, -+ "d_1": 1, -+ "a_2": 4, -+ "b_2": 3, -+ "c_2": 2, -+ "d_2": 1 -+ } -+ _yappi._set_test_timings(_timings) -+ -+ class Thread1(Thread): -+ pass -+ -+ class Thread2(Thread): -+ pass -+ -+ def a(): -+ b() -+ -+ def b(): -+ c() -+ -+ def c(): -+ d() -+ -+ def d(): -+ time.sleep(0.6) -+ -+ yappi.set_clock_type("wall") -+ yappi.start() -+ t1 = Thread1(target=a) -+ t1.start() -+ t2 = Thread2(target=a) -+ t2.start() -+ t1.join() -+ t2.join() -+ stats = yappi.get_thread_stats() -+ -+ # the fist clear_stats clears the context table? -+ tsa = utils.find_stat_by_name(stats, "DummyThread") -+ self.assertTrue(tsa is None) -+ -+ tst1 = utils.find_stat_by_name(stats, "Thread1") -+ tst2 = utils.find_stat_by_name(stats, "Thread2") -+ tsmain = utils.find_stat_by_name(stats, "_MainThread") -+ dummy() # call dummy to force ctx name to be retrieved again. -+ self.assertTrue(len(stats) == 3) -+ self.assertTrue(tst1 is not None) -+ self.assertTrue(tst2 is not None) -+ # TODO: I put dummy() to fix below, remove the comments after a while. -+ self.assertTrue( # FIX: I see this fails sometimes -+ tsmain is not None, -+ 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(stats)))) -+ self.assertTrue(1.0 > tst2.ttot >= 0.5) -+ self.assertTrue(1.0 > tst1.ttot >= 0.5) -+ -+ # test sorting of the ctx stats -+ stats = stats.sort("totaltime", "desc") -+ prev_stat = None -+ for stat in stats: -+ if prev_stat: -+ self.assertTrue(prev_stat.ttot >= stat.ttot) -+ prev_stat = stat -+ stats = stats.sort("totaltime", "asc") -+ prev_stat = None -+ for stat in stats: -+ if prev_stat: -+ self.assertTrue(prev_stat.ttot <= stat.ttot) -+ prev_stat = stat -+ stats = stats.sort("schedcount", "desc") -+ prev_stat = None -+ for stat in stats: -+ if prev_stat: -+ self.assertTrue(prev_stat.sched_count >= stat.sched_count) -+ prev_stat = stat -+ stats = stats.sort("name", "desc") -+ prev_stat = None -+ for stat in stats: -+ if prev_stat: -+ self.assertTrue(prev_stat.name.lower() >= stat.name.lower()) -+ prev_stat = stat -+ self.assertRaises( -+ yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg" -+ ) -+ self.assertRaises( -+ yappi.YappiError, stats.sort, "invalid_thread_sortorder_arg" -+ ) -+ -+ def test_ctx_stats_cpu(self): -+ -+ def get_thread_name(): -+ try: -+ return threading.current_thread().name -+ except AttributeError: -+ return "Anonymous" -+ -+ def burn_cpu(sec): -+ t0 = yappi.get_clock_time() -+ elapsed = 0 -+ while (elapsed < sec): -+ for _ in range(1000): -+ pass -+ elapsed = yappi.get_clock_time() - t0 -+ -+ def test(): -+ -+ ts = [] -+ for i in (0.01, 0.05, 0.1): -+ t = threading.Thread(target=burn_cpu, args=(i, )) -+ t.name = "burn_cpu-%s" % str(i) -+ t.start() -+ ts.append(t) -+ for t in ts: -+ t.join() -+ -+ yappi.set_clock_type("cpu") -+ yappi.set_context_name_callback(get_thread_name) -+ -+ yappi.start() -+ -+ test() -+ -+ yappi.stop() -+ -+ tstats = yappi.get_thread_stats() -+ r1 = ''' -+ burn_cpu-0.1 3 123145356058624 0.100105 8 -+ burn_cpu-0.05 2 123145361313792 0.050149 8 -+ burn_cpu-0.01 1 123145356058624 0.010127 2 -+ MainThread 0 4321620864 0.001632 6 -+ ''' -+ self.assert_ctx_stats_almost_equal(r1, tstats) -+ -+ def test_producer_consumer_with_queues(self): -+ # we currently just stress yappi, no functionality test is done here. -+ yappi.start() -+ if utils.is_py3x(): -+ from queue import Queue -+ else: -+ from Queue import Queue -+ from threading import Thread -+ WORKER_THREAD_COUNT = 50 -+ WORK_ITEM_COUNT = 2000 -+ -+ def worker(): -+ while True: -+ item = q.get() -+ # do the work with item -+ q.task_done() -+ -+ q = Queue() -+ for i in range(WORKER_THREAD_COUNT): -+ t = Thread(target=worker) -+ t.daemon = True -+ t.start() -+ -+ for item in range(WORK_ITEM_COUNT): -+ q.put(item) -+ q.join() # block until all tasks are done -+ #yappi.get_func_stats().sort("callcount").print_all() -+ yappi.stop() -+ -+ def test_temporary_lock_waiting(self): -+ yappi.start() -+ _lock = threading.Lock() -+ -+ def worker(): -+ _lock.acquire() -+ try: -+ time.sleep(1.0) -+ finally: -+ _lock.release() -+ -+ t1 = threading.Thread(target=worker) -+ t2 = threading.Thread(target=worker) -+ t1.start() -+ t2.start() -+ t1.join() -+ t2.join() -+ #yappi.get_func_stats().sort("callcount").print_all() -+ yappi.stop() -+ -+ @unittest.skipIf(os.name != "posix", "requires Posix compliant OS") -+ def test_signals_with_blocking_calls(self): -+ import signal, os, time -+ -+ # just to verify if signal is handled correctly and stats/yappi are not corrupted. -+ def handler(signum, frame): -+ raise Exception("Signal handler executed!") -+ -+ yappi.start() -+ signal.signal(signal.SIGALRM, handler) -+ signal.alarm(1) -+ self.assertRaises(Exception, time.sleep, 2) -+ stats = yappi.get_func_stats() -+ fsh = utils.find_stat_by_name(stats, "handler") -+ self.assertTrue(fsh is not None) -+ -+ @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2") -+ def test_concurrent_futures(self): -+ yappi.start() -+ from concurrent.futures import ThreadPoolExecutor -+ with ThreadPoolExecutor(max_workers=5) as executor: -+ f = executor.submit(pow, 5, 2) -+ self.assertEqual(f.result(), 25) -+ time.sleep(1.0) -+ yappi.stop() -+ -+ @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2") -+ def test_barrier(self): -+ yappi.start() -+ b = threading.Barrier(2, timeout=1) -+ -+ def worker(): -+ try: -+ b.wait() -+ except threading.BrokenBarrierError: -+ pass -+ except Exception: -+ raise Exception("BrokenBarrierError not raised") -+ -+ t1 = threading.Thread(target=worker) -+ t1.start() -+ #b.wait() -+ t1.join() -+ yappi.stop() -+ -+ -+class NonRecursiveFunctions(utils.YappiUnitTestCase): -+ -+ def test_abcd(self): -+ _timings = {"a_1": 6, "b_1": 5, "c_1": 3, "d_1": 1} -+ _yappi._set_test_timings(_timings) -+ -+ def a(): -+ b() -+ -+ def b(): -+ c() -+ -+ def c(): -+ d() -+ -+ def d(): -+ pass -+ -+ stats = utils.run_and_get_func_stats(a) -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ fsc = utils.find_stat_by_name(stats, 'c') -+ fsd = utils.find_stat_by_name(stats, 'd') -+ cfsab = fsa.children[fsb] -+ cfsbc = fsb.children[fsc] -+ cfscd = fsc.children[fsd] -+ -+ self.assertEqual(fsa.ttot, 6) -+ self.assertEqual(fsa.tsub, 1) -+ self.assertEqual(fsb.ttot, 5) -+ self.assertEqual(fsb.tsub, 2) -+ self.assertEqual(fsc.ttot, 3) -+ self.assertEqual(fsc.tsub, 2) -+ self.assertEqual(fsd.ttot, 1) -+ self.assertEqual(fsd.tsub, 1) -+ self.assertEqual(cfsab.ttot, 5) -+ self.assertEqual(cfsab.tsub, 2) -+ self.assertEqual(cfsbc.ttot, 3) -+ self.assertEqual(cfsbc.tsub, 2) -+ self.assertEqual(cfscd.ttot, 1) -+ self.assertEqual(cfscd.tsub, 1) -+ -+ def test_stop_in_middle(self): -+ _timings = {"a_1": 6, "b_1": 4} -+ _yappi._set_test_timings(_timings) -+ -+ def a(): -+ b() -+ yappi.stop() -+ -+ def b(): -+ time.sleep(0.2) -+ -+ yappi.start() -+ a() -+ stats = yappi.get_func_stats() -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ -+ self.assertEqual(fsa.ncall, 1) -+ self.assertEqual(fsa.nactualcall, 0) -+ self.assertEqual(fsa.ttot, 0) # no call_leave called -+ self.assertEqual(fsa.tsub, 0) # no call_leave called -+ self.assertEqual(fsb.ttot, 4) -+ -+ -+class RecursiveFunctions(utils.YappiUnitTestCase): -+ -+ def test_fibonacci(self): -+ -+ def fib(n): -+ if n > 1: -+ return fib(n - 1) + fib(n - 2) -+ else: -+ return n -+ -+ stats = utils.run_and_get_func_stats(fib, 22) -+ fs = utils.find_stat_by_name(stats, 'fib') -+ self.assertEqual(fs.ncall, 57313) -+ self.assertEqual(fs.ttot, fs.tsub) -+ -+ def test_abcadc(self): -+ _timings = { -+ "a_1": 20, -+ "b_1": 19, -+ "c_1": 17, -+ "a_2": 13, -+ "d_1": 12, -+ "c_2": 10, -+ "a_3": 5 -+ } -+ _yappi._set_test_timings(_timings) -+ -+ def a(n): -+ if n == 3: -+ return -+ if n == 1 + 1: -+ d(n) -+ else: -+ b(n) -+ -+ def b(n): -+ c(n) -+ -+ def c(n): -+ a(n + 1) -+ -+ def d(n): -+ c(n) -+ -+ stats = utils.run_and_get_func_stats(a, 1) -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ fsc = utils.find_stat_by_name(stats, 'c') -+ fsd = utils.find_stat_by_name(stats, 'd') -+ self.assertEqual(fsa.ncall, 3) -+ self.assertEqual(fsa.nactualcall, 1) -+ self.assertEqual(fsa.ttot, 20) -+ self.assertEqual(fsa.tsub, 7) -+ self.assertEqual(fsb.ttot, 19) -+ self.assertEqual(fsb.tsub, 2) -+ self.assertEqual(fsc.ttot, 17) -+ self.assertEqual(fsc.tsub, 9) -+ self.assertEqual(fsd.ttot, 12) -+ self.assertEqual(fsd.tsub, 2) -+ cfsca = fsc.children[fsa] -+ self.assertEqual(cfsca.nactualcall, 0) -+ self.assertEqual(cfsca.ncall, 2) -+ self.assertEqual(cfsca.ttot, 13) -+ self.assertEqual(cfsca.tsub, 6) -+ -+ def test_aaaa(self): -+ _timings = {"d_1": 9, "d_2": 7, "d_3": 3, "d_4": 2} -+ _yappi._set_test_timings(_timings) -+ -+ def d(n): -+ if n == 3: -+ return -+ d(n + 1) -+ -+ stats = utils.run_and_get_func_stats(d, 0) -+ fsd = utils.find_stat_by_name(stats, 'd') -+ self.assertEqual(fsd.ncall, 4) -+ self.assertEqual(fsd.nactualcall, 1) -+ self.assertEqual(fsd.ttot, 9) -+ self.assertEqual(fsd.tsub, 9) -+ cfsdd = fsd.children[fsd] -+ self.assertEqual(cfsdd.ttot, 7) -+ self.assertEqual(cfsdd.tsub, 7) -+ self.assertEqual(cfsdd.ncall, 3) -+ self.assertEqual(cfsdd.nactualcall, 0) -+ -+ def test_abcabc(self): -+ _timings = { -+ "a_1": 20, -+ "b_1": 19, -+ "c_1": 17, -+ "a_2": 13, -+ "b_2": 11, -+ "c_2": 9, -+ "a_3": 6 -+ } -+ _yappi._set_test_timings(_timings) -+ -+ def a(n): -+ if n == 3: -+ return -+ else: -+ b(n) -+ -+ def b(n): -+ c(n) -+ -+ def c(n): -+ a(n + 1) -+ -+ stats = utils.run_and_get_func_stats(a, 1) -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ fsc = utils.find_stat_by_name(stats, 'c') -+ self.assertEqual(fsa.ncall, 3) -+ self.assertEqual(fsa.nactualcall, 1) -+ self.assertEqual(fsa.ttot, 20) -+ self.assertEqual(fsa.tsub, 9) -+ self.assertEqual(fsb.ttot, 19) -+ self.assertEqual(fsb.tsub, 4) -+ self.assertEqual(fsc.ttot, 17) -+ self.assertEqual(fsc.tsub, 7) -+ cfsab = fsa.children[fsb] -+ cfsbc = fsb.children[fsc] -+ cfsca = fsc.children[fsa] -+ self.assertEqual(cfsab.ttot, 19) -+ self.assertEqual(cfsab.tsub, 4) -+ self.assertEqual(cfsbc.ttot, 17) -+ self.assertEqual(cfsbc.tsub, 7) -+ self.assertEqual(cfsca.ttot, 13) -+ self.assertEqual(cfsca.tsub, 8) -+ -+ def test_abcbca(self): -+ _timings = {"a_1": 10, "b_1": 9, "c_1": 7, "b_2": 4, "c_2": 2, "a_2": 1} -+ _yappi._set_test_timings(_timings) -+ self._ncall = 1 -+ -+ def a(): -+ if self._ncall == 1: -+ b() -+ else: -+ return -+ -+ def b(): -+ c() -+ -+ def c(): -+ if self._ncall == 1: -+ self._ncall += 1 -+ b() -+ else: -+ a() -+ -+ stats = utils.run_and_get_func_stats(a) -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ fsc = utils.find_stat_by_name(stats, 'c') -+ cfsab = fsa.children[fsb] -+ cfsbc = fsb.children[fsc] -+ cfsca = fsc.children[fsa] -+ self.assertEqual(fsa.ttot, 10) -+ self.assertEqual(fsa.tsub, 2) -+ self.assertEqual(fsb.ttot, 9) -+ self.assertEqual(fsb.tsub, 4) -+ self.assertEqual(fsc.ttot, 7) -+ self.assertEqual(fsc.tsub, 4) -+ self.assertEqual(cfsab.ttot, 9) -+ self.assertEqual(cfsab.tsub, 2) -+ self.assertEqual(cfsbc.ttot, 7) -+ self.assertEqual(cfsbc.tsub, 4) -+ self.assertEqual(cfsca.ttot, 1) -+ self.assertEqual(cfsca.tsub, 1) -+ self.assertEqual(cfsca.ncall, 1) -+ self.assertEqual(cfsca.nactualcall, 0) -+ -+ def test_aabccb(self): -+ _timings = { -+ "a_1": 13, -+ "a_2": 11, -+ "b_1": 9, -+ "c_1": 5, -+ "c_2": 3, -+ "b_2": 1 -+ } -+ _yappi._set_test_timings(_timings) -+ self._ncall = 1 -+ -+ def a(): -+ if self._ncall == 1: -+ self._ncall += 1 -+ a() -+ else: -+ b() -+ -+ def b(): -+ if self._ncall == 3: -+ return -+ else: -+ c() -+ -+ def c(): -+ if self._ncall == 2: -+ self._ncall += 1 -+ c() -+ else: -+ b() -+ -+ stats = utils.run_and_get_func_stats(a) -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ fsc = utils.find_stat_by_name(stats, 'c') -+ cfsaa = fsa.children[fsa.index] -+ cfsab = fsa.children[fsb] -+ cfsbc = fsb.children[fsc.full_name] -+ cfscc = fsc.children[fsc] -+ cfscb = fsc.children[fsb] -+ self.assertEqual(fsb.ttot, 9) -+ self.assertEqual(fsb.tsub, 5) -+ self.assertEqual(cfsbc.ttot, 5) -+ self.assertEqual(cfsbc.tsub, 2) -+ self.assertEqual(fsa.ttot, 13) -+ self.assertEqual(fsa.tsub, 4) -+ self.assertEqual(cfsab.ttot, 9) -+ self.assertEqual(cfsab.tsub, 4) -+ self.assertEqual(cfsaa.ttot, 11) -+ self.assertEqual(cfsaa.tsub, 2) -+ self.assertEqual(fsc.ttot, 5) -+ self.assertEqual(fsc.tsub, 4) -+ -+ def test_abaa(self): -+ _timings = {"a_1": 13, "b_1": 10, "a_2": 9, "a_3": 5} -+ _yappi._set_test_timings(_timings) -+ -+ self._ncall = 1 -+ -+ def a(): -+ if self._ncall == 1: -+ b() -+ elif self._ncall == 2: -+ self._ncall += 1 -+ a() -+ else: -+ return -+ -+ def b(): -+ self._ncall += 1 -+ a() -+ -+ stats = utils.run_and_get_func_stats(a) -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ cfsaa = fsa.children[fsa] -+ cfsba = fsb.children[fsa] -+ self.assertEqual(fsb.ttot, 10) -+ self.assertEqual(fsb.tsub, 1) -+ self.assertEqual(fsa.ttot, 13) -+ self.assertEqual(fsa.tsub, 12) -+ self.assertEqual(cfsaa.ttot, 5) -+ self.assertEqual(cfsaa.tsub, 5) -+ self.assertEqual(cfsba.ttot, 9) -+ self.assertEqual(cfsba.tsub, 4) -+ -+ def test_aabb(self): -+ _timings = {"a_1": 13, "a_2": 10, "b_1": 9, "b_2": 5} -+ _yappi._set_test_timings(_timings) -+ -+ self._ncall = 1 -+ -+ def a(): -+ if self._ncall == 1: -+ self._ncall += 1 -+ a() -+ elif self._ncall == 2: -+ b() -+ else: -+ return -+ -+ def b(): -+ if self._ncall == 2: -+ self._ncall += 1 -+ b() -+ else: -+ return -+ -+ stats = utils.run_and_get_func_stats(a) -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ cfsaa = fsa.children[fsa] -+ cfsab = fsa.children[fsb] -+ cfsbb = fsb.children[fsb] -+ self.assertEqual(fsa.ttot, 13) -+ self.assertEqual(fsa.tsub, 4) -+ self.assertEqual(fsb.ttot, 9) -+ self.assertEqual(fsb.tsub, 9) -+ self.assertEqual(cfsaa.ttot, 10) -+ self.assertEqual(cfsaa.tsub, 1) -+ self.assertEqual(cfsab.ttot, 9) -+ self.assertEqual(cfsab.tsub, 4) -+ self.assertEqual(cfsbb.ttot, 5) -+ self.assertEqual(cfsbb.tsub, 5) -+ -+ def test_abbb(self): -+ _timings = {"a_1": 13, "b_1": 10, "b_2": 6, "b_3": 1} -+ _yappi._set_test_timings(_timings) -+ -+ self._ncall = 1 -+ -+ def a(): -+ if self._ncall == 1: -+ b() -+ -+ def b(): -+ if self._ncall == 3: -+ return -+ self._ncall += 1 -+ b() -+ -+ stats = utils.run_and_get_func_stats(a) -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ cfsab = fsa.children[fsb] -+ cfsbb = fsb.children[fsb] -+ self.assertEqual(fsa.ttot, 13) -+ self.assertEqual(fsa.tsub, 3) -+ self.assertEqual(fsb.ttot, 10) -+ self.assertEqual(fsb.tsub, 10) -+ self.assertEqual(fsb.ncall, 3) -+ self.assertEqual(fsb.nactualcall, 1) -+ self.assertEqual(cfsab.ttot, 10) -+ self.assertEqual(cfsab.tsub, 4) -+ self.assertEqual(cfsbb.ttot, 6) -+ self.assertEqual(cfsbb.tsub, 6) -+ self.assertEqual(cfsbb.nactualcall, 0) -+ self.assertEqual(cfsbb.ncall, 2) -+ -+ def test_aaab(self): -+ _timings = {"a_1": 13, "a_2": 10, "a_3": 6, "b_1": 1} -+ _yappi._set_test_timings(_timings) -+ -+ self._ncall = 1 -+ -+ def a(): -+ if self._ncall == 3: -+ b() -+ return -+ self._ncall += 1 -+ a() -+ -+ def b(): -+ return -+ -+ stats = utils.run_and_get_func_stats(a) -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ cfsaa = fsa.children[fsa] -+ cfsab = fsa.children[fsb] -+ self.assertEqual(fsa.ttot, 13) -+ self.assertEqual(fsa.tsub, 12) -+ self.assertEqual(fsb.ttot, 1) -+ self.assertEqual(fsb.tsub, 1) -+ self.assertEqual(cfsaa.ttot, 10) -+ self.assertEqual(cfsaa.tsub, 9) -+ self.assertEqual(cfsab.ttot, 1) -+ self.assertEqual(cfsab.tsub, 1) -+ -+ def test_abab(self): -+ _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1} -+ _yappi._set_test_timings(_timings) -+ -+ self._ncall = 1 -+ -+ def a(): -+ b() -+ -+ def b(): -+ if self._ncall == 2: -+ return -+ self._ncall += 1 -+ a() -+ -+ stats = utils.run_and_get_func_stats(a) -+ fsa = utils.find_stat_by_name(stats, 'a') -+ fsb = utils.find_stat_by_name(stats, 'b') -+ cfsab = fsa.children[fsb] -+ cfsba = fsb.children[fsa] -+ self.assertEqual(fsa.ttot, 13) -+ self.assertEqual(fsa.tsub, 8) -+ self.assertEqual(fsb.ttot, 10) -+ self.assertEqual(fsb.tsub, 5) -+ self.assertEqual(cfsab.ttot, 10) -+ self.assertEqual(cfsab.tsub, 5) -+ self.assertEqual(cfsab.ncall, 2) -+ self.assertEqual(cfsab.nactualcall, 1) -+ self.assertEqual(cfsba.ttot, 6) -+ self.assertEqual(cfsba.tsub, 5) -+ -+ -+if __name__ == '__main__': -+ # import sys;sys.argv = ['', 'BasicUsage.test_run_as_script'] -+ # import sys;sys.argv = ['', 'MultithreadedScenarios.test_subsequent_profile'] -+ unittest.main() ---- a/tests/test_gevent.py -+++ b/tests/test_gevent.py -@@ -4,7 +4,7 @@ import yappi - import gevent - from gevent.event import Event - import threading --from utils import ( -+from .utils import ( - YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io, - burn_io_gevent - ) ---- a/tests/test_hooks.py -+++ b/tests/test_hooks.py -@@ -5,7 +5,7 @@ import unittest - import time - - import yappi --import utils -+import tests.utils as utils - - - def a(): ---- a/tests/test_tags.py -+++ b/tests/test_tags.py -@@ -2,7 +2,7 @@ import unittest - import yappi - import threading - import time --from utils import YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io -+from .utils import YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io - - - class MultiThreadTests(YappiUnitTestCase): |