-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_case_runner.py
More file actions
324 lines (254 loc) · 16.3 KB
/
test_case_runner.py
File metadata and controls
324 lines (254 loc) · 16.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
import os
import json
import shutil
import re
from tqdm import tqdm
from bs4 import BeautifulSoup
import subprocess
class TestCaseRunner():
    """Write generated Java test cases into a Maven project, run them, and read JaCoCo coverage.

    Test case paths are expected to contain ``/src/test/java/``: the part before
    it is used as the Maven working directory and the part after it identifies
    the test class. Coverage is read from the HTML files under
    ``<project>/target/site/jacoco/``.
    """

    # Accepted values of ``is_ref``; they also name the ``cur_<is_ref>_log_name`` attributes.
    _REFERENCE_KINDS = ('no_ref', 'human_ref', 'rag_ref')
    # Limit, in seconds, for each Maven call made by compile_and_execute_test_case().
    _MVN_TIMEOUT_SECONDS = 120
    # Matches an innermost "<...>" group; applied repeatedly to peel nested generics.
    _ANGLE_BRACKET_GROUP = re.compile(r"<[^<>]*>")

    def __init__(self, configs, test_case_run_log_dir):
        """Store the configuration and make sure the log directory exists.

        Args:
            configs: Object providing ``project_with_test_workspace`` (read by
                run_all_test_cases() and get_focal_file_coverage()).
            test_case_run_log_dir: Directory that receives one ``.log`` file per
                run_test_case() call; created if missing.
        """
        self.configs = configs
        self.test_case_run_log_dir = test_case_run_log_dir
        os.makedirs(self.test_case_run_log_dir, exist_ok=True)
        # Base name (without ".log") of the most recent log per reference kind.
        self.cur_no_ref_log_name = None
        self.cur_human_ref_log_name = None
        self.cur_rag_ref_log_name = None
        self.focal_file_coverage = dict()  # e.g., {'Base64_1_no_ref': cov_no_ref, 'Base64_1_with_rag_ref': cov_with_rag_ref}

    def run_all_test_cases(self, test_cases, is_ref):
        """Run every generated test case and attach its log path and coverage.

        Args:
            test_cases: Iterable of dicts with the keys ``focal_file_path``,
                ``test_case_path``, ``generated_test_case`` and
                ``focal_method_name`` (formatted ``<prefix>::::<name(params)>``).
            is_ref: One of ``'no_ref'``, ``'human_ref'``, ``'rag_ref'``.

        Returns:
            A list holding the same dict objects, each updated in place with
            ``log_path_<is_ref>``, ``coverage_focal_file`` and
            ``coverage_focal_method``.
        """
        test_case_with_log_coverage = []
        for each_test_case in tqdm(test_cases, ncols=80, desc='Running test cases'):
            focal_file_path = each_test_case['focal_file_path']
            tc_path = f"{self.configs.project_with_test_workspace}/{each_test_case['test_case_path']}"
            fm_name_param = each_test_case['focal_method_name'].split('::::')[1]
            log_path, focal_file_coverage, fm_cov_statistic_by_jacoco = self.run_test_case_and_get_coverage(
                each_test_case['generated_test_case'], tc_path, focal_file_path, fm_name_param, is_ref=is_ref)
            each_test_case[f'log_path_{is_ref}'] = log_path
            each_test_case['coverage_focal_file'] = focal_file_coverage  # used for analyze_coverage_with_target_coverage()
            each_test_case['coverage_focal_method'] = fm_cov_statistic_by_jacoco  # used for analyze_coverage_with_target_focal_method()
            test_case_with_log_coverage.append(each_test_case)
        return test_case_with_log_coverage

    def save_log_coverage(self, log_coverage, save_path):
        """Dump ``log_coverage`` as indented JSON to ``save_path``.

        The parent directory is created when ``save_path`` has one; a bare file
        name is written to the current working directory.
        """
        parent_dir = os.path.dirname(save_path)
        if parent_dir:  # os.makedirs('') raises, so skip it for bare file names
            os.makedirs(parent_dir, exist_ok=True)
        with open(save_path, 'w') as f:
            json.dump(log_coverage, f, indent=4)
        print(f"Saved the generated test cases' log and coverage to {save_path}")

    def run_test_case(self, test_case_path, focal_file_path, is_ref):
        """Run ``mvn clean verify`` for one test class and capture its output in a log file.

        The log is named ``<focal file stem>_<index>_<is_ref>.log`` using the
        first index that does not exist yet in the log directory. The base name
        is also stored on ``self.cur_<is_ref>_log_name``.

        Args:
            test_case_path: Path of the test source file (contains ``/src/test/java/``).
            focal_file_path: Path of the focal source file; only its file stem is used.
            is_ref: One of ``'no_ref'``, ``'human_ref'``, ``'rag_ref'``.

        Returns:
            The path of the log file.

        Raises:
            ValueError: If ``is_ref`` is not one of the accepted values.
        """
        if is_ref not in self._REFERENCE_KINDS:
            raise ValueError(f'is_ref must be one of {self._REFERENCE_KINDS}, got {is_ref!r}')
        test_case_relative_path = self.get_test_case_relative_path(test_case_path)
        focal_file_stem = focal_file_path.split('/')[-1].split('.')[0]

        index = 1
        log_name = f'{focal_file_stem}_{index}_{is_ref}'
        while os.path.exists(f'{self.test_case_run_log_dir}/{log_name}.log'):
            index += 1
            log_name = f'{focal_file_stem}_{index}_{is_ref}'
        log_file_path = f'{self.test_case_run_log_dir}/{log_name}.log'
        # ``is_ref`` already ends in "_ref", so the attribute is cur_<is_ref>_log_name.
        setattr(self, f'cur_{is_ref}_log_name', log_name)

        cwd_path = test_case_path.split('/src/test/')[0]
        mvn_cmd = ['mvn', 'clean', 'verify', f'-Dtest={test_case_relative_path}',
                   '-Dcheckstyle.skip=true', '-DskipTests=false']
        print(f"cd {cwd_path} && {' '.join(mvn_cmd)} > '{log_file_path}' 2>&1")
        # An argument list with ``cwd`` avoids shell quoting problems in the paths.
        with open(log_file_path, 'w') as log_file:
            try:
                subprocess.run(mvn_cmd, cwd=cwd_path, stdout=log_file, stderr=subprocess.STDOUT)
            except OSError as err:
                # Maven or the working directory is missing: record it in the log
                # instead of raising, so a failed launch is reported like a failed build.
                log_file.write(f'[ERROR] Failed to launch Maven: {err}\n')
        return log_file_path

    def run_test_case_and_get_coverage(self, test_case, test_case_path, focal_file_path, focal_method_name_parameter, is_ref):
        """Write the test case to disk, run it, collect coverage, then delete the file.

        Args:
            test_case: Source text of the test class.
            test_case_path: Where to write it (contains ``/src/test/java/``).
            focal_file_path: Focal file path relative to the workspace.
            focal_method_name_parameter: Focal method as ``name(param types)``.
            is_ref: One of ``'no_ref'``, ``'human_ref'``, ``'rag_ref'``.

        Returns:
            ``(log path, annotated focal file text or None, focal-method stats or None)``.
        """
        print(f'Running the test case with = {is_ref} = reference...')
        os.makedirs(os.path.dirname(test_case_path), exist_ok=True)
        with open(test_case_path, 'w') as f:
            f.write(test_case)
        try:
            tc_run_log_path = self.run_test_case(test_case_path, focal_file_path, is_ref)
            focal_file_coverage, fm_cov_statistic_by_jacoco = self.get_coverage_jacoco(
                test_case_path, focal_file_path, focal_method_name_parameter)
        finally:
            # Never leave the generated test in the project, even if the run failed.
            if os.path.exists(test_case_path):
                os.remove(test_case_path)
        return tc_run_log_path, focal_file_coverage, fm_cov_statistic_by_jacoco

    def get_coverage_jacoco(self, test_case_path, focal_file_path, focal_method_name_parameter):
        """Return ``(annotated focal file as one string or None, focal-method stats)``."""
        focal_file_lines, fm_cov_statistic_by_jacoco = self.get_focal_file_coverage(
            focal_file_path, test_case_path, focal_method_name_parameter)  # used for analyze_coverage_with_target_coverage()
        focal_file_coverage = ''.join(focal_file_lines) if focal_file_lines is not None else None
        return focal_file_coverage, fm_cov_statistic_by_jacoco

    def _run_maven_with_timeout(self, mvn_cmd, cwd_path):
        """Run a Maven command capturing its output; return None if it times out."""
        try:
            return subprocess.run(mvn_cmd, cwd=cwd_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  universal_newlines=True, timeout=self._MVN_TIMEOUT_SECONDS)
        except subprocess.TimeoutExpired:
            return None

    def compile_and_execute_test_case(self, test_case, test_case_path):
        """Compile a test case and, if that succeeds, execute it with ``mvn clean verify``.

        The test file is written before the run and removed afterwards. Success
        is detected by the text ``BUILD SUCCESS`` in Maven's output.

        Args:
            test_case: Source text of the test class.
            test_case_path: Where to write it (contains ``/src/test/java/``).

        Returns:
            ``(compile_log, test_log, compile_success, execute_success)``. On a
            timeout the corresponding log holds a short timeout message.
        """
        compile_success, execute_success = False, False
        compile_log, test_log = '', ''
        os.makedirs(os.path.dirname(test_case_path), exist_ok=True)
        with open(test_case_path, 'w') as f:
            f.write(test_case)
        try:
            test_case_relative_path = self.get_test_case_relative_path(test_case_path)
            cwd_path = test_case_path.split('/src/test/')[0]

            mvn_compile_cmd = ['mvn', 'clean', f'-Dtest={test_case_relative_path}', 'test-compile', '-Dcheckstyle.skip=true']
            compile_result = self._run_maven_with_timeout(mvn_compile_cmd, cwd_path)
            if compile_result is None:
                compile_log = f"Compilation of test case {test_case_relative_path} timed out after {self._MVN_TIMEOUT_SECONDS} seconds."
                return compile_log, test_log, compile_success, execute_success
            compile_log = f'{compile_result.stdout}\n\n{compile_result.stderr}\n\n'

            if "BUILD SUCCESS" in compile_log:
                compile_success = True
                mvn_test_cmd = ['mvn', 'clean', 'verify', f'-Dtest={test_case_relative_path}', '-Dcheckstyle.skip=true']  # test and get the coverage
                test_result = self._run_maven_with_timeout(mvn_test_cmd, cwd_path)
                if test_result is None:
                    test_log = f"Execution of test case {test_case_relative_path} timed out after {self._MVN_TIMEOUT_SECONDS} seconds."
                    return compile_log, test_log, compile_success, execute_success
                test_log = f'{test_result.stdout}\n\n{test_result.stderr}'
                execute_success = "BUILD SUCCESS" in test_log
        finally:
            # Single cleanup point for every exit path (success, failure, timeout).
            if os.path.exists(test_case_path):
                os.remove(test_case_path)
            else:
                print(f'[WARNING] file "{test_case_path}" does not exist. The log is shown as follows:')
                print(f'{compile_log}\n\n')
                print(f'{test_log}\n\n')
        return compile_log, test_log, compile_success, execute_success

    def get_test_case_relative_path(self, test_case_path):
        """Turn a test file path into the dotted class name passed to ``-Dtest``.

        Takes the part after ``/src/test/java/``, drops its first directory,
        removes the trailing ``.java`` and replaces ``/`` with ``.``; e.g.
        ``.../src/test/java/org/apache/FooTest.java`` becomes ``apache.FooTest``.

        Raises:
            IndexError: If the path does not contain ``/src/test/java/``.
        """
        path_in_test_root = test_case_path.split('/src/test/java/')[1]
        without_first_dir = '/'.join(path_in_test_root.split('/')[1:])
        if without_first_dir.endswith('.java'):
            # Strip only the extension, not a ".java" that occurs inside a directory name.
            without_first_dir = without_first_dir[:-len('.java')]
        return without_first_dir.replace('/', '.')

    def get_focal_file_coverage(self, focal_file_path, test_case_path, focal_method_name_parameter):
        """Read the JaCoCo reports produced for one test case.

        Args:
            focal_file_path: Focal file path relative to
                ``configs.project_with_test_workspace``.
            test_case_path: Path of the test file; used to locate the project
                root and the report files.
            focal_method_name_parameter: Focal method as ``name(param types)``.

        Returns:
            ``(focal_file_lines, fm_cov_statistic_by_jacoco)`` where the first
            item is the focal file's lines with ``<COVER>`` prefixed to every
            covered line other than a lone ``}``. ``(None, None)`` when either
            JaCoCo report file is missing.
        """
        base_path = test_case_path.split('/src/test/java/')[0]
        org_name = test_case_path.split('/src/test/java/')[1].split('/')[0]
        test_suffix = 'Test'
        test_case_relative_path = self.get_test_case_relative_path(test_case_path)

        # The "<Class>.java.html" report contains the source lines wrapped in coverage tags.
        jacoco_java_html_report_path = self.get_jacoco_java_html_report_path(base_path, test_case_relative_path, org_name, test_suffix)
        if not os.path.exists(jacoco_java_html_report_path):
            print(f'[WARNING] Jacoco report not found: {jacoco_java_html_report_path}')
            return None, None

        # Used by analyze_coverage_with_target_coverage(): mark the covered lines.
        cov_lines, _ = self.get_lines_coverage(jacoco_java_html_report_path)
        with open(f'{self.configs.project_with_test_workspace}/{focal_file_path}', 'r') as f:
            focal_file = f.readlines()
        for line in cov_lines:
            # Ignore line numbers that fall outside the focal file read from disk.
            if 1 <= line <= len(focal_file) and focal_file[line - 1].strip() != '}':
                focal_file[line - 1] = "<COVER>" + focal_file[line - 1]

        # Used by analyze_coverage_with_target_focal_method(): per-method numbers from "<Class>.html".
        jacoco_html_report_path = jacoco_java_html_report_path.replace('.java.html', '.html')
        if not os.path.exists(jacoco_html_report_path):
            print(f'[WARNING] Jacoco report not found: {jacoco_html_report_path}. But Jacoco java.html report is found: {jacoco_java_html_report_path}')
            return None, None
        fm_cov_statistic_by_jacoco = self.get_focal_method_coverage_statistic_by_jacoco(focal_method_name_parameter, jacoco_html_report_path)
        return focal_file, fm_cov_statistic_by_jacoco

    def get_jacoco_java_html_report_path(self, base_path, test_class_name, org_name, test_suffix):
        """Build the path of the ``<FocalClass>.java.html`` JaCoCo report.

        Args:
            base_path: Project root (the directory containing ``target/``).
            test_class_name: Dotted test class name as returned by
                get_test_case_relative_path().
            org_name: First directory under ``src/test/java`` (prepended to the package).
            test_suffix: Suffix whose length is cut from the end of the test
                class name to get the focal class name (e.g. ``'Test'``). The
                cut is by length only; the name is not checked to end with it.

        Returns:
            ``<base_path>/target/site/jacoco/<package>/<FocalClass>.java.html``.
        """
        *package_parts, simple_name = test_class_name.split(".")
        package_dir = '.'.join([org_name] + package_parts) + '/'
        # Guard the empty suffix: name[:-0] would be the empty string.
        focal_class_name = simple_name[:-len(test_suffix)] if test_suffix else simple_name
        return base_path + "/target/site/jacoco/" + package_dir + focal_class_name + ".java.html"

    def get_lines_coverage(self, jacoco_java_html_report_path):
        """Return ``(covered line numbers, uncovered line numbers)`` from a ``.java.html`` report.

        Every ``<span>`` carrying one of the classes ``fc``, ``pc``, ``bpc`` or
        ``nc`` is inspected; the line number is the span's ``id`` without its
        first character. A span whose first class is ``nc`` counts as uncovered,
        any other as covered.
        """
        with open(jacoco_java_html_report_path) as f:
            soup = BeautifulSoup(f, 'html.parser')
        cov_lines = []
        uncov_lines = []
        for span in soup.find_all('span', class_=['fc', 'pc', 'bpc', 'nc']):
            line_number = int(span['id'][1:])
            if span['class'][0] == 'nc':
                uncov_lines.append(line_number)
            else:
                cov_lines.append(line_number)
        return cov_lines, uncov_lines

    @staticmethod
    def _parse_count(cell):
        """Parse an integer table cell, tolerating ``,`` thousands separators."""
        return int(cell.text.strip().replace(',', ''))

    def get_focal_method_coverage_statistic_by_jacoco(self, focal_method_name_param, jacoco_html_report_path):
        """Extract the focal method's row from the class-level JaCoCo HTML report.

        The target's parameter types are simplified before matching: generic
        arguments are removed and package prefixes dropped, e.g.
        ``filter(java.util.Map<K, V>,cn.hutool.core.lang.Filter<java.util.Map.Entry<K, V>>)``
        is matched against ``filter(Map, Filter)``.

        Args:
            focal_method_name_param: Focal method as ``name(param types)``.
            jacoco_html_report_path: Path of the ``<Class>.html`` report.

        Returns:
            A dict with ``number_of_lines`` (column 8), ``number_of_branches``
            (column 6 minus 1), ``line_coverage`` (column 2 without ``%``) and
            ``branch_coverage`` (column 4 as float, or ``'n/a'``). When no row
            matches, a dict with only ``raw_html`` (the whole report) is returned.
        """
        # NOTE(review): in JaCoCo's standard table column 2 appears to be the
        # instruction-coverage percentage and column 6 the cyclomatic complexity;
        # confirm that these are the metrics intended for the keys above.
        with open(jacoco_html_report_path) as f:
            soup = BeautifulSoup(f, 'html.parser')

        signature = focal_method_name_param.strip()
        target_fm_name = signature.split('(')[0]
        target_fm_params_str = self.remove_angle_brackets_substrings(signature.split('(')[1][:-1])
        target_fm_params = [each_param.strip().split('.')[-1] for each_param in target_fm_params_str.split(',')]

        # Collect every row whose method name (text before "(") equals the target's.
        table = soup.find('tbody')
        rows = table.find_all('tr') if table is not None else []
        candidates = []
        for each_tr in rows:
            candidate_all_column = each_tr.find_all('td')
            if candidate_all_column and candidate_all_column[0].text.strip().split('(')[0] == target_fm_name:
                candidates.append(candidate_all_column)

        if not candidates:
            all_column = None
        elif len(candidates) == 1:
            all_column = candidates[0]
        else:
            # Overloads: disambiguate by parameter list.
            all_column = self.select_focal_method_coverage_statistic_by_jacoco(target_fm_params, candidates)

        cov_stat = {}
        if all_column is None:
            print('[WARNING] Cannot find the focal method in the jacoco report. Need manual check\n', f'focal_method_name: {focal_method_name_param}\n\n')
            cov_stat['raw_html'] = str(soup)
            return cov_stat

        cov_stat['number_of_lines'] = self._parse_count(all_column[8])
        cov_stat['number_of_branches'] = self._parse_count(all_column[6]) - 1
        cov_stat['line_coverage'] = float(all_column[2].text.strip()[:-1])  # remove the '%'
        branch_cov = all_column[4].text.strip()
        cov_stat['branch_coverage'] = float(branch_cov[:-1]) if branch_cov != 'n/a' else branch_cov
        return cov_stat

    @staticmethod
    def _candidate_params(candidate):
        """Return the stripped parameter names shown in a candidate row's first cell."""
        signature = candidate[0].text.strip()
        return [each_param.strip() for each_param in signature.split('(')[1][:-1].split(',')]

    def select_focal_method_coverage_statistic_by_jacoco(self, target_fm_params, candidates):
        """Pick the overload whose parameter list matches ``target_fm_params``.

        Selection proceeds in three steps over the candidates that have the
        same number of parameters as the target:

        1. if only one such candidate exists, it is returned without comparing types;
        2. otherwise the first candidate with identical parameter names is returned;
        3. otherwise the first candidate that matches once every letter of a
           mismatching target parameter is replaced by ``Object`` is returned,
           which lets a one-letter type variable such as ``K[]`` match ``Object[]``.

        Args:
            target_fm_params: Simplified parameter type names of the focal method.
            candidates: Rows (lists of cells with a ``.text`` attribute) whose
                first cell holds ``name(Type, Type)``.

        Returns:
            The selected row, or ``None`` if nothing matches.
        """
        target_fm_params = list(target_fm_params)

        # Step 1: filter by the number of parameters.
        filter_candidates = [each_candidate for each_candidate in candidates
                             if len(self._candidate_params(each_candidate)) == len(target_fm_params)]
        if len(filter_candidates) == 1:
            return filter_candidates[0]

        # Step 2: exact match on the parameter names.
        for each_candidate in filter_candidates:
            if self._candidate_params(each_candidate) == target_fm_params:
                return each_candidate

        # Step 3: corner case such as valuesOfKeys(java.util.Map<K, V>,K[]) vs
        # valuesOfKeys(Map, Object[]): transform K to Object before comparing.
        for each_candidate in filter_candidates:
            candidate_fm_params = self._candidate_params(each_candidate)
            if all(target == shown or re.sub(r'[A-Za-z]', 'Object', target) == shown
                   for target, shown in zip(target_fm_params, candidate_fm_params)):
                return each_candidate
        return None

    def remove_angle_brackets_substrings(self, input_string):
        """Remove every ``<...>`` group, including nested ones, from ``input_string``.

        Innermost groups are removed first and the pass is repeated until none
        is left, so ``Filter<Map.Entry<K, V>>`` becomes ``Filter``.
        """
        while True:
            input_string, count = self._ANGLE_BRACKET_GROUP.subn('', input_string)
            if count == 0:
                break
        return input_string