forked from ionelmc/python-hunter
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_cookbook.py
More file actions
180 lines (142 loc) · 6.14 KB
/
test_cookbook.py
File metadata and controls
180 lines (142 loc) · 6.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import contextlib
import functools
import os
from logging import getLogger
from time import time
import aspectlib
import pytest
import hunter
from hunter.actions import RETURN_VALUE
from hunter.actions import ColorStreamAction
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
logger = getLogger(__name__)
pytest_plugins = 'pytester',
def nothin(x):
return x
def bar():
baz()
@nothin
@nothin
@nothin
@nothin
def baz():
for i in range(10):
os.path.join('a', str(i))
foo = 1
def brief_probe(qualname, *actions, **kwargs):
return aspectlib.weave(qualname, functools.partial(hunter.wrap, actions=actions, **kwargs))
def fast_probe(qualname, *actions, **filters):
def tracing_decorator(func):
@functools.wraps(func)
def tracing_wrapper(*args, **kwargs):
# create the Tracer manually to avoid spending time in likely useless things like:
# - loading PYTHONHUNTERCONFIG
# - setting up the clear_env_var or thread_support options
# - atexit cleanup registration
with hunter.Tracer().trace(hunter.When(hunter.Query(**filters), *actions)):
return func(*args, **kwargs)
return tracing_wrapper
return aspectlib.weave(qualname, tracing_decorator) # this does the monkeypatch
@contextlib.contextmanager
def no_probe(*args, **kwargs):
yield
@pytest.mark.parametrize('impl', [fast_probe, brief_probe, no_probe])
def test_probe(impl, benchmark):
with impl('%s.baz' % __name__, hunter.VarsPrinter('foo', stream=open(os.devnull, 'w')), kind='return', depth=0):
benchmark(bar)
class ProfileAction(ColorStreamAction):
# using ColorStreamAction brings this more in line with the other actions
# (stream option, coloring and such, see the other examples for colors)
def __init__(self, **kwargs):
self.timings = {}
super(ProfileAction, self).__init__(**kwargs)
def __call__(self, event):
current_time = time()
# include event.builtin in the id so we don't have problems
# with Python reusing frame objects from the previous call for builtin calls
frame_id = id(event.frame), str(event.builtin)
if event.kind == 'call':
self.timings[frame_id] = current_time, None
elif frame_id in self.timings:
start_time, exception = self.timings.pop(frame_id)
# try to find a complete function name for display
function_object = event.function_object
if event.builtin:
function = '<builtin>.{}'.format(event.arg.__name__)
elif function_object:
if hasattr(function_object, '__qualname__'):
function = '{}.{}'.format(
function_object.__module__, function_object.__qualname__
)
else:
function = '{}.{}'.format(
function_object.__module__,
function_object.__name__
)
else:
function = event.function
if event.kind == 'exception':
# store the exception
# (there will be a followup 'return' event in which we deal with it)
self.timings[frame_id] = start_time, event.arg
elif event.kind == 'return':
delta = current_time - start_time
if event.instruction == RETURN_VALUE:
# exception was discarded
self.output(
'{fore(BLUE)}{} returned: {}. Duration: {:.4f}s{RESET}\n',
function, event.arg, delta
)
else:
self.output(
'{fore(RED)}{} raised exception: {}. Duration: {:.4f}s{RESET}\n',
function, exception, delta
)
@pytest.mark.parametrize('options', [
{'kind__in': ['call', 'return', 'exception']},
{'profile': True}
], ids=['kind__in=call,return,exception', 'profile=True'])
def test_profile(LineMatcher, options):
stream = StringIO()
with hunter.trace(action=ProfileAction(stream=stream), **options):
from sample8errors import notsilenced
from sample8errors import silenced1
from sample8errors import silenced3
from sample8errors import silenced4
silenced1()
print('Done silenced1')
silenced3()
print('Done silenced3')
silenced4()
print('Done silenced4')
try:
notsilenced()
except ValueError:
print('Done not silenced')
lm = LineMatcher(stream.getvalue().splitlines())
if 'profile' in options:
lm.fnmatch_lines([
"sample8errors.error raised exception: None. Duration: ?.????s",
"sample8errors.silenced1 returned: None. Duration: ?.????s",
"sample8errors.error raised exception: None. Duration: ?.????s",
"sample8errors.silenced3 returned: mwhahaha. Duration: ?.????s",
"sample8errors.error raised exception: None. Duration: ?.????s",
"<builtin>.repr raised exception: None. Duration: ?.????s",
"sample8errors.silenced4 returned: None. Duration: ?.????s",
"sample8errors.error raised exception: None. Duration: ?.????s",
"sample8errors.notsilenced raised exception: None. Duration: ?.????s",
])
else:
lm.fnmatch_lines([
"sample8errors.error raised exception: (*RuntimeError*, *). Duration: ?.????s",
"sample8errors.silenced1 returned: None. Duration: ?.????s",
"sample8errors.error raised exception: (*RuntimeError*, *). Duration: ?.????s",
"sample8errors.silenced3 returned: mwhahaha. Duration: ?.????s",
"sample8errors.error raised exception: (*RuntimeError*, *). Duration: ?.????s",
"sample8errors.silenced4 returned: None. Duration: ?.????s",
"sample8errors.error raised exception: (*RuntimeError*, *). Duration: ?.????s",
"sample8errors.notsilenced raised exception: (*ValueError(RuntimeError*, *). Duration: ?.????s",
])