forked from ionelmc/python-hunter
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_tracer.pyx
More file actions
149 lines (127 loc) · 5.06 KB
/
_tracer.pyx
File metadata and controls
149 lines (127 loc) · 5.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# cython: linetrace=True, language_level=3str
import threading
import traceback
from cpython cimport pystate
from cpython.pystate cimport PyThreadState_Get
from cpython.ref cimport Py_CLEAR
from cpython.ref cimport Py_INCREF
from ._event cimport Event
from ._predicates cimport fast_call
import hunter
__all__ = ('Tracer',)

# Maps the event names passed to Python-level trace/profile callables onto the
# integer codes consumed by trace_func. The position of each name in the tuple
# is its code, so the order must not change (it follows CPython's PyTrace_*
# numbering: call=0 ... c_return=6).
cdef dict KIND_INTS = {
    kind_name: kind_code
    for kind_code, kind_name in enumerate((
        'call',
        'exception',
        'line',
        'return',
        'c_call',
        'c_exception',
        'c_return',
    ))
}
cdef int trace_func(Tracer self, FrameType frame, int kind, PyObject *arg) except -1:
    # C-level hook handed to PyEval_SetTrace / PyEval_SetProfile.
    #
    # Wraps the raw hook arguments in an Event and passes it to the tracer's
    # handler, keeping the tracer's ``depth`` and ``calls`` counters up to date.
    #
    #   self  -- the Tracer registered as the hook object
    #   frame -- the frame the event belongs to
    #   kind  -- integer event code (same numbering as KIND_INTS / PyTrace_*)
    #   arg   -- event payload, may be NULL
    #
    # Returns 0 on normal completion. An ``Exception`` raised by the handler is
    # printed to ``hunter._default_stream`` and the tracer is stopped rather
    # than letting the error propagate into the traced program.
    cdef Event current
    cdef object payload

    # Make this tracer the frame's own trace object, releasing whatever was
    # stored there before and taking a new reference for the slot.
    if frame.f_trace is not <PyObject*> self:
        Py_CLEAR(frame.f_trace)
        Py_INCREF(self)
        frame.f_trace = <PyObject*> self

    callback = self.handler
    if callback is None:
        # The tracer was stopped: make sure the hook is uninstalled even for
        # threads that were already running with it.
        if not self.profiling_mode:
            PyEval_SetTrace(NULL, NULL)
        else:
            PyEval_SetProfile(NULL, NULL)
        return 0

    # Leaving a function: lower the depth before the event is built (never
    # below zero).
    if kind == pystate.PyTrace_RETURN and self.depth > 0:
        self.depth -= 1

    # A NULL payload is presented to the handler as None.
    if arg is NULL:
        payload = None
    else:
        payload = <object>arg
    current = Event(frame, kind, payload, self)

    try:
        fast_call(callback, current)
    except Exception as exc:
        traceback.print_exc(file=hunter._default_stream)
        hunter._default_stream.write('Disabling tracer because handler %r failed (%r) at %r.\n\n' % (
            callback, exc, current))
        self.stop()
        return 0

    # Entering a function: counters are bumped only after the handler has seen
    # the call event.
    if kind == pystate.PyTrace_CALL:
        self.depth += 1
        self.calls += 1
    return 0
cdef class Tracer:
    """
    Tracer object that installs ``trace_func`` as the interpreter's trace or
    profile hook and forwards events to a handler.

    Args:
        threading_support: ``None`` or any true value also registers the
            tracer through ``threading.settrace`` / ``threading.setprofile``;
            a false value leaves the ``threading`` hooks alone.
        profiling_mode: when true the profile hooks (``PyEval_SetProfile``,
            ``threading.setprofile``) are used instead of the trace hooks.

    Can be used as a context manager: leaving the ``with`` block calls
    :meth:`stop`.
    """

    def __cinit__(self, threading_support=None, profiling_mode=False):
        self.handler = None
        self.previous = None
        self._previousfunc = NULL
        self._threading_previous = None
        self.threading_support = threading_support
        self.profiling_mode = profiling_mode
        self.depth = 0
        self.calls = 0

    def __dealloc__(self):
        # Only acts when the current thread state still lists this object as
        # its trace object.
        cdef PyThreadState *state = PyThreadState_Get()
        if state.c_traceobj is <PyObject *>self:
            self.stop()

    def __repr__(self):
        return '<hunter._tracer.Tracer at 0x%x: threading_support=%s, %s%s%s%s>' % (
            id(self),
            self.threading_support,
            '<stopped>' if self.handler is None else 'handler=',
            '' if self.handler is None else repr(self.handler),
            '' if self.previous is None else ', previous=',
            '' if self.previous is None else repr(self.previous),
        )

    def __call__(self, FrameType frame, str kind, arg):
        """
        Python-level entry point with the ``(frame, kind, arg)`` signature of
        a ``sys.settrace`` / ``sys.setprofile`` callable (this is how the
        tracer gets invoked after being registered with the ``threading``
        hooks in :meth:`trace`).

        The event name is translated with ``KIND_INTS`` and dispatched to
        ``trace_func``. On a ``'call'`` event the C-level hook is then
        installed directly, so later events skip this Python wrapper.

        Always returns ``self``.

        Raises:
            KeyError: if ``kind`` is not one of the names in ``KIND_INTS``.
        """
        trace_func(self, frame, KIND_INTS[kind], <PyObject *> arg)
        # ``kind`` is a string here; the previous comparison against the
        # integer 0 could never be true, so the C hook was never installed.
        # Skip the installation when the tracer is stopped (trace_func has
        # just uninstalled the hook, or stop() restored the previous one).
        if kind == 'call' and self.handler is not None:
            if self.profiling_mode:
                PyEval_SetProfile(<pystate.Py_tracefunc> trace_func, <PyObject *> self)
            else:
                PyEval_SetTrace(<pystate.Py_tracefunc> trace_func, <PyObject *> self)
        return self

    def trace(self, predicate):
        """
        Start tracing with ``predicate`` as the event handler.

        Remembers the hook currently installed on this thread (and the
        ``threading`` module hook, when threading support is on) so that
        :meth:`stop` can restore them, then installs ``trace_func``.

        Returns ``self``.
        """
        self.handler = predicate
        cdef PyThreadState *state = PyThreadState_Get()

        if self.profiling_mode:
            if self.threading_support is None or self.threading_support:
                self._threading_previous = getattr(threading, '_profile_hook', None)
                threading.setprofile(self)
            if state.c_profileobj is NULL:
                self.previous = None
                self._previousfunc = NULL
            else:
                self.previous = <object>(state.c_profileobj)
                self._previousfunc = state.c_profilefunc
            PyEval_SetProfile(<pystate.Py_tracefunc> trace_func, <PyObject *> self)
        else:
            if self.threading_support is None or self.threading_support:
                self._threading_previous = getattr(threading, '_trace_hook', None)
                threading.settrace(self)
            if state.c_traceobj is NULL:
                self.previous = None
                self._previousfunc = NULL
            else:
                self.previous = <object>(state.c_traceobj)
                self._previousfunc = state.c_tracefunc
            PyEval_SetTrace(<pystate.Py_tracefunc> trace_func, <PyObject *> self)
        return self

    def stop(self):
        """
        Stop tracing and put back the hooks saved by :meth:`trace`.

        Does nothing when the tracer has no handler (never started or already
        stopped).
        """
        if self.handler is None:
            return

        if self.profiling_mode:
            if self.previous is None:
                PyEval_SetProfile(NULL, NULL)
            else:
                PyEval_SetProfile(self._previousfunc, <PyObject *> self.previous)
            self.handler = self.previous = None
            self._previousfunc = NULL
            if self.threading_support is None or self.threading_support:
                threading.setprofile(self._threading_previous)
                self._threading_previous = None
        else:
            if self.previous is None:
                PyEval_SetTrace(NULL, NULL)
            else:
                PyEval_SetTrace(self._previousfunc, <PyObject *> self.previous)
            self.handler = self.previous = None
            self._previousfunc = NULL
            if self.threading_support is None or self.threading_support:
                threading.settrace(self._threading_previous)
                self._threading_previous = None

    def __enter__(self):
        """Return the tracer itself; tracing is not started here."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the tracer when the ``with`` block is left."""
        self.stop()