-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathweb_framework.py
More file actions
224 lines (187 loc) · 7.28 KB
/
web_framework.py
File metadata and controls
224 lines (187 loc) · 7.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""Common utilities for web framework integration"""
import datetime
import os
import os.path
import re
import time
from abc import ABC, abstractmethod
from hashlib import sha256
from tempfile import NamedTemporaryFile
from appmap._implementation import generation
from appmap._implementation.detect_enabled import DetectEnabled
from appmap._implementation.env import Env
from appmap._implementation.event import Event, ReturnEvent, _EventIds, describe_value
from appmap._implementation.recorder import Recorder, ThreadRecorder
from appmap._implementation.utils import root_relative_path, scenario_filename
class TemplateEvent(Event):  # pylint: disable=too-few-public-methods
    """Call event emitted when a template is rendered.

    The template is reported as a synthetic class living under
    ``<templates>`` whose single method is ``render``.
    """

    __slots__ = ["receiver", "path"]

    def __init__(self, path, instance=None):
        super().__init__("call")
        self.receiver = describe_value(instance)
        self.path = root_relative_path(path)

    def to_dict(self, attrs=None):
        """Serialize the event, adding the synthetic class and method fields."""
        event_dict = super().to_dict(attrs)
        # Title-case the path, then strip every non-word character, to get
        # an identifier-like name for the template.
        titled_path = self.path.title()
        class_name = re.sub(r"\W", "", titled_path)
        event_dict["defined_class"] = f"<templates>.{class_name}"
        event_dict["method_id"] = "render"
        event_dict["static"] = False
        return event_dict
class TemplateHandler:  # pylint: disable=too-few-public-methods
    """Patch for a template class to capture and record template
    rendering (if recording is enabled).

    This patch can be used with .utils.patch_class to patch any template class
    which has a .render() method. Note it requires a .filename property; if
    there is no such property, this handler can be subclassed first to provide it.
    """

    def render(self, orig, *args, **kwargs):
        """Calls the original implementation.

        If recording is enabled, adds appropriate TemplateEvent
        and ReturnEvent.

        Args:
            orig: the original (unpatched) render callable; invoked as
                ``orig(self, *args, **kwargs)``.
            *args: positional arguments forwarded to ``orig``.
            **kwargs: keyword arguments forwarded to ``orig``.

        Returns:
            Whatever ``orig`` returns. Exceptions raised by ``orig`` propagate.
        """
        rec = Recorder.get_current()
        # Bind both names up front: if recording gets enabled while the
        # template is rendering, the ``finally`` clause must not trip over
        # unbound locals (which would also mask the render result/exception).
        call_event = None
        start = None
        if rec.get_enabled():
            start = time.monotonic()
            call_event = TemplateEvent(self.filename, self)  # pylint: disable=no-member
            Recorder.add_event(call_event)
        try:
            return orig(self, *args, **kwargs)
        finally:
            # Only close a call that was actually recorded, and only while
            # the recorder is still enabled.
            if call_event is not None and rec.get_enabled():
                Recorder.add_event(ReturnEvent(call_event.id, time.monotonic() - start))
# Upper bound on the length of a generated file name, suffix included.
# 255 holds true for most filesystems.
NAME_MAX = 255
# Number of hash hex digits appended to a shortened file name.
# Arbitrary, but git proves it's a reasonable value.
HASH_LEN = 7
# Suffix appended to every AppMap file name written by this module.
APPMAP_SUFFIX = ".appmap.json"
def name_hash(namepart):
    """Return the SHA-256 hex digest of ``namepart`` after os.fsencode()."""
    encoded = os.fsencode(namepart)
    hasher = sha256()
    hasher.update(encoded)
    return hasher.hexdigest()
def write_appmap(basedir, basename, contents):
    """Write an appmap file into basedir.

    Adds APPMAP_SUFFIX to basename; shortens the name if necessary.
    Atomically replaces existing files. Creates the basedir if required.

    Args:
        basedir: target directory; must support ``mkdir()`` and the ``/``
            operator (it is used like a ``pathlib.Path``).
        basename: file name without the suffix.
        contents: text to write.

    Returns:
        The path of the file that was written (``basedir / filename``), which
        may differ from ``basename + APPMAP_SUFFIX`` when the name was
        shortened.

    Raises:
        OSError: if the directory cannot be created or the file cannot be
            written or moved into place. No temporary file is left behind.
    """
    max_stem_len = NAME_MAX - len(APPMAP_SUFFIX)
    if len(basename) > max_stem_len:
        # Keep as much of the name as fits and replace the remainder with a
        # dash plus a short hash of the removed tail.
        part = max_stem_len - 1 - HASH_LEN
        basename = basename[:part] + "-" + name_hash(basename[part:])[:HASH_LEN]
    filename = basename + APPMAP_SUFFIX
    target = basedir / filename

    # exist_ok=True already tolerates an existing directory, so a separate
    # exists() check is unnecessary.
    basedir.mkdir(parents=True, exist_ok=True)

    tmp_name = None
    try:
        # Write to a temporary file in the same directory, then rename it
        # over the target so readers never observe a partial file.
        with NamedTemporaryFile(mode="w", dir=basedir, delete=False) as tmp:
            tmp_name = tmp.name
            tmp.write(contents)
        os.replace(tmp_name, target)
    except BaseException:
        # delete=False means nobody else will clean up after a failure.
        if tmp_name is not None:
            try:
                os.unlink(tmp_name)
            except OSError:
                # Best-effort cleanup; the original error is what matters.
                pass
        raise
    return target
def create_appmap_file(
    output_dir,
    request_method,
    request_path_info,
    request_full_path,
    response,
    headers,
    rec,
):
    """Dump the recording of one request to an AppMap file and set headers.

    Args:
        output_dir: directory the AppMap is written to (handed to
            ``write_appmap``).
        request_method: HTTP method, used in the AppMap name.
        request_path_info: request path, used in the AppMap name.
        request_full_path: string combined with the current timestamp to
            build the file name.
        response: object exposing ``status_code``.
        headers: mutable mapping; receives the ``AppMap-Name`` and
            ``AppMap-File-Name`` entries.
        rec: recorder passed to ``generation.dump``.
    """
    start_time = datetime.datetime.now()
    timestamp = start_time.timestamp()

    # "%H:%M:%S" is the documented spelling of the platform-specific "%T".
    # The slice trims microseconds down to milliseconds.
    time_of_day = start_time.strftime("%H:%M:%S.%f")[:-3]
    appmap_name = (
        f"{request_method} {request_path_info} "
        f"({response.status_code}) - {time_of_day}"
    )

    appmap_basename = scenario_filename("_".join([str(timestamp), request_full_path]))
    appmap_file_path = os.path.join(output_dir, appmap_basename)

    metadata = {
        "name": appmap_name,
        "timestamp": timestamp,
        "recorder": {"name": "record_requests"},
    }
    write_appmap(output_dir, appmap_basename, generation.dump(rec, metadata))

    # NOTE(review): AppMap-Name is a display name, yet it is run through
    # abspath(); kept as-is because consumers may rely on the current value.
    headers["AppMap-Name"] = os.path.abspath(appmap_name)
    # NOTE(review): write_appmap shortens over-long base names, so for such
    # names this header may not match the file actually written -- confirm.
    headers["AppMap-File-Name"] = os.path.abspath(appmap_file_path) + APPMAP_SUFFIX
class AppmapMiddleware(ABC):
    """Framework-independent request hooks for AppMap recording.

    Subclasses implement ``before_request_main`` and ``after_request_main``;
    the ``*_hook`` methods decide which recorder (if any) applies to a request
    and delegate to them.
    """

    def __init__(self):
        # Requests to this path are ignored by both hooks.
        self.record_url = "/_appmap/record"

    @staticmethod
    def should_record():
        """Return a truthy value if "remote" or "requests" recording is enabled."""
        return DetectEnabled.should_enable("remote") or DetectEnabled.should_enable(
            "requests"
        )

    def before_request_hook(self, request, request_path, recording_is_running):
        """Select the recorder for a request and record its start.

        Args:
            request: the framework request, forwarded to
                ``before_request_main``.
            request_path: path of the request; compared to ``record_url``.
            recording_is_running: whether a remote recording is in progress.

        Returns:
            A ``(rec, start, call_event_id)`` tuple. All three are ``None``
            for the record URL; ``start`` and ``call_event_id`` stay ``None``
            when no enabled recorder applies.
        """
        if request_path == self.record_url:
            return None, None, None

        rec = None
        start = None
        call_event_id = None
        if DetectEnabled.should_enable("requests"):
            # a) requests
            rec = ThreadRecorder()
            Recorder.set_current(rec)
            rec.start_recording()
            # Each time an event is added for a thread_id it's also
            # added to the global Recorder(). So don't add the event
            # to the global Recorder() explicitly because that would
            # add the event in it twice.
        elif DetectEnabled.should_enable("remote") or recording_is_running:
            # b) APPMAP=true, or
            # c) remote, enabled by POST to /_appmap/record, which set
            #    recording_is_running
            rec = Recorder.get_current()

        if rec and rec.get_enabled():
            start, call_event_id = self.before_request_main(rec, request)

        return rec, start, call_event_id

    @abstractmethod
    def before_request_main(self, rec, request):
        """Record the start of ``request`` on ``rec``.

        Called by ``before_request_hook`` with both arguments; implementations
        must return a ``(start, call_event_id)`` pair.
        """

    def after_request_hook(
        self,
        request,
        request_path,
        recording_is_running,
        request_method,
        request_base_url,
        response,
        response_headers,
        start,
        call_event_id,
    ):
        """Record the end of a request and, for per-request recording, write
        the AppMap file.

        Args:
            request: the framework request (not used by this hook itself).
            request_path: path of the request; compared to ``record_url``.
            recording_is_running: whether a remote recording is in progress.
            request_method: HTTP method, forwarded to ``create_appmap_file``.
            request_base_url: forwarded to ``create_appmap_file`` to build
                the file name.
            response: the framework response; forwarded to
                ``after_request_main`` and returned.
            response_headers: mutable mapping that receives the AppMap
                headers set by ``create_appmap_file``.
            start: start value returned by ``before_request_hook``.
            call_event_id: call event id returned by ``before_request_hook``.

        Returns:
            ``response``, unchanged.
        """
        if request_path == self.record_url:
            return response

        if DetectEnabled.should_enable("requests"):
            # a) requests
            rec = Recorder.get_current()
            # Each time an event is added for a thread_id it's also
            # added to the global Recorder(). So don't add the event
            # to the global Recorder() explicitly because that would
            # add the event in it twice.
            try:
                if rec.get_enabled():
                    self.after_request_main(rec, response, start, call_event_id)

                output_dir = Env.current.output_dir / "requests"
                create_appmap_file(
                    output_dir,
                    request_method,
                    request_path,
                    request_base_url,
                    response,
                    response_headers,
                    rec,
                )
            finally:
                # Always stop the per-request recorder, even if writing failed.
                rec.stop_recording()
        elif DetectEnabled.should_enable("remote") or recording_is_running:
            # b) APPMAP=true, or
            # c) remote, enabled by POST to /_appmap/record, which set
            #    recording_is_running
            rec = Recorder.get_current()
            if rec.get_enabled():
                self.after_request_main(rec, response, start, call_event_id)

        return response

    @abstractmethod
    def after_request_main(self, rec, response, start, call_event_id):
        """Record the end of the request on ``rec``.

        Called by ``after_request_hook`` with the values produced by
        ``before_request_hook``.
        """