-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPipeline.py
More file actions
297 lines (247 loc) · 11.7 KB
/
Pipeline.py
File metadata and controls
297 lines (247 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
import os
from multiprocessing import Pool
import pandas as pd
from pyecore.resources import ResourceSet, URI
from AstToEcoreConverter import ProjectEcoreGraph
from DataformatUtils import convert_edge_dim, convert_list_to_float_tensor, convert_list_to_long_tensor, \
convert_hashed_names_to_float
from EcoreToMatrixConverter import EcoreToMatrixConverter
'''in this file are the pipeline components put into reusable functions'''
# Module-level state that the functions below rebind through `global` statements.
# Everything starts out as None, i.e. "nothing computed yet".

# type graph of a single repository; assigned in prepare_dataset
ecore_graph = None
# node feature matrix; assigned in create_matrix_structure and prepare_dataset
node_features = None
# adjacency list; assigned in create_matrix_structure and prepare_dataset
adj_list = None
# encoded edge attributes; assigned in create_matrix_structure
edge_attr = None
# list of argument tuples (one per repository) built in prepare_dataset
repo_multiprocess = None
# edge attributes as handed back to the caller; assigned in prepare_dataset
edge_attribute = None
def create_output_folders(directory):
    """
    Create the directory structure for output files.

    Ensures the main directory exists and contains the subdirectories
    `xmi_files` (converter output) and `csv_files` (dataset). Directories
    that already exist are left untouched, so the call is idempotent.

    Args:
        directory (str): Path to the main output directory.

    Raises:
        OSError: If one of the directories cannot be created.
        TypeError: If `directory` is not a path (e.g. None).
    """
    # exist_ok=True replaces the separate "exists?" check, which could fail
    # when another process created the directory between check and creation.
    os.makedirs(directory, exist_ok=True)
    # create sub folders for converter output and dataset
    for sub_folder in ('xmi_files', 'csv_files'):
        os.makedirs(os.path.join(directory, sub_folder), exist_ok=True)
def download_repositories(repository_directory, repository_list):
    """
    Clone Git repositories listed in an Excel/ODS file into a specified directory.

    The file is read with `pandas.read_excel`; every non-empty value of the
    column `html_url` is passed to `git clone`, which runs inside
    `repository_directory`. A clone that fails is reported and skipped so the
    remaining repositories are still downloaded.

    Args:
        repository_directory: Directory where repositories will be cloned.
            It is created if it does not exist.
        repository_list: Path to the Excel/ODS file containing repository URLs
            with a header named 'html_url'.

    Raises:
        ValueError: If the 'html_url' header is missing in the input file.
        OSError: If the directory cannot be created or `git` cannot be started.
    """
    # imported here so this function does not rely on a file-level import
    import subprocess

    # load labeled repository from Excel/ods file
    # requirements for format: no empty rows in between and header name html_url
    resource = pd.read_excel(repository_list)
    if 'html_url' not in resource.columns:
        raise ValueError(
            f"Column 'html_url' is missing in {repository_list}")

    # create directory for cloning if it does not exist
    os.makedirs(repository_directory, exist_ok=True)

    # retrieve urls and clone repositories
    for url in resource['html_url']:
        if pd.isna(url):
            # empty cell, nothing to clone
            continue
        url = str(url).strip()
        if not url:
            continue
        # An argument list (no shell) keeps a crafted URL from injecting
        # commands; '--' stops git from reading the URL as an option.
        # cwd= replaces os.chdir, so the working directory of this process
        # is never changed and cannot be left in the wrong place on errors.
        result = subprocess.run(
            ['git', 'clone', '--', url], cwd=repository_directory, check=False)
        if result.returncode != 0:
            print(f'Cloning {url} failed (exit code {result.returncode}). Skipping.')
def create_ecore_graphs(repository, write_in_file, output_directory=None):
    """
    Convert a repository into an Ecore graph.

    The conversion is attempted once. If it fails with the message
    'inconsistent use of tabs and spaces in indentation', every `.py` file of
    the repository is reformatted in place with `autopep8` and the conversion
    is attempted a second time. Any other failure, or a failing second
    attempt, is printed and the repository is skipped.

    Args:
        repository: The path to the repository to be converted into an Ecore graph.
        write_in_file: A boolean indicating whether to write the graph to a file.
        output_directory: Optional; the directory where the output file will be written.

    Returns:
        The generated Ecore graph if `write_in_file` is False and the
        conversion succeeded; otherwise None.
    """
    # imported here so this function does not rely on a file-level import
    import subprocess

    if not os.path.isdir(repository):
        print(f'Problem with repository {repository}. Skipping.')
        return None

    resource_set = ResourceSet()
    try:
        return _build_ecore_graph(
            resource_set, repository, write_in_file, output_directory)
    except Exception as first_error:
        print(first_error)
        indentation_problem = (
            'inconsistent use of tabs and spaces in indentation' in str(first_error))

    if indentation_problem:
        # format repository files using autopep8
        python_files = [os.path.join(root, file) for root, _, files in os.walk(
            repository) for file in files if file.endswith('.py')]
        for file_path in python_files:
            try:
                # argument list instead of a shell string: paths containing
                # spaces or shell metacharacters are passed through verbatim
                subprocess.run(
                    ['autopep8', '--in-place', file_path], check=False)
            except OSError as launch_error:
                # autopep8 could not be started; the remaining files would
                # fail the same way, so stop formatting and just retry
                print(launch_error)
                break
        try:
            return _build_ecore_graph(
                resource_set, repository, write_in_file, output_directory)
        except Exception as retry_error:
            print(retry_error)

    print(f'Problem with repository {repository}. Skipping.')
    return None


def _build_ecore_graph(resource_set, repository, write_in_file, output_directory):
    """Run the converter once; return the graph only when nothing is written to file."""
    graph = ProjectEcoreGraph(
        resource_set, repository, write_in_file, output_directory)
    if write_in_file is False:
        return graph.get_graph()
    return None
def create_matrix_structure(write_in_file, xmi_file=None, local_ecore_graph=None, output_directory=None):
    """
    Convert an Ecore graph or XMI file into three matrices: node features, adjacency list,
    and edge attributes.

    With `write_in_file` True the XMI file `<output_directory>/xmi_files/<xmi_file>`
    is loaded (using the metamodel 'Basic.ecore') and the converter writes its
    output to `<output_directory>/csv_files`. With `write_in_file` False the
    given Ecore graph is converted and the matrices are returned.

    Args:
        write_in_file: A boolean indicating whether to write the matrices to files.
        xmi_file: Optional; the name of the XMI file to be processed.
        local_ecore_graph: Optional; the Ecore graph to be converted into matrices.
        output_directory: The directory where output files will be written.

    Returns:
        A tuple (node features, adjacency list, edge attributes) if
        `write_in_file` is False. If the conversion fails, or if
        `write_in_file` is not False, returns (None, None, None).

    Raises:
        Exception: Problems loading the metamodel 'Basic.ecore' are propagated.
            Problems with an individual XMI file or graph are printed and
            the item is skipped.
    """
    # The results are still published in the module-level names, as before.
    global node_features, adj_list, edge_attr

    if write_in_file is True:
        resource_set = ResourceSet()
        resource = resource_set.get_resource(URI('Basic.ecore'))
        mm_root = resource.contents[0]
        resource_set.metamodel_registry[mm_root.nsURI] = mm_root
        try:
            # loading is part of the guarded section, so an unreadable XMI
            # file is skipped as well instead of raising out of the function
            resource = resource_set.get_resource(
                URI(f'{output_directory}/xmi_files/{xmi_file}'))
            EcoreToMatrixConverter(
                resource, write_in_file, f'{output_directory}/csv_files')
        except Exception as e:
            print(e)
            print(f'Problem with xmi file {xmi_file}. Skipping')

    if write_in_file is False:
        # start from a clean slate: without this a failed conversion would
        # return whatever an earlier call left in the module-level names
        node_features = adj_list = edge_attr = None
        try:
            matrix = EcoreToMatrixConverter(local_ecore_graph, write_in_file)
            node_features = matrix.get_node_features()
            adj_list = matrix.get_adjacency_list()
            edge_attr = matrix.get_encoded_edge_attributes()
        except Exception as e:
            print(e)
            # do not hand out a partially filled result
            node_features = adj_list = edge_attr = None
        return node_features, adj_list, edge_attr

    return None, None, None
def parallel_processing(func, repository_list):
    """
    Run `func` once per argument tuple, distributed over a process pool.

    Args:
        func: The function to call. It is invoked as `func(*arguments)` for
            every tuple in `repository_list`.
        repository_list: An iterable of tuples; each tuple holds the
            positional arguments for one call of `func`.

    Returns:
        The list of return values collected by `Pool.starmap`.

    Raises:
        Exception: Exceptions raised by `func` in a worker are propagated
            to the caller.
    """
    # Pool() without arguments uses os.cpu_count() worker processes
    with Pool() as workers:
        return workers.starmap(func, repository_list)
def prepare_dataset(repository_directory, output_directory=None, repository_list=None):
    """
    Prepare a dataset by processing repositories into type graphs and converting
    those graphs into matrices suitable for a Graph Convolutional Network (GCN).

    If `repository_list` is given, the listed repositories are cloned first.
    The number of entries in `repository_directory` selects the mode:

    * exactly one entry: the repository is converted in this process and the
      matrices are returned in the format needed by the GCN.
    * more than one entry: repositories and the resulting XMI files are
      converted in parallel and everything is written below
      `output_directory`; nothing is returned.

    Args:
        repository_directory: The path to the directory containing the repositories.
        output_directory: The path where output files will be saved.
            Required if multiple repositories are processed.
        repository_list: Optional; path of the file listing the repositories
            to clone. If provided, these repositories will be downloaded.

    Returns:
        tuple: (node_features, adj_list, edge_attribute) for a single
        repository; (None, None, None) if several repositories were
        processed or the single repository could not be converted.

    Raises:
        Exception: If no repositories are found.
        SystemExit: If the output directory cannot be created when
            processing multiple repositories.
    """
    global repo_multiprocess, ecore_graph
    global node_features, adj_list, edge_attribute

    # Forget the results of earlier calls. Without this, a run over several
    # repositories would return (and convert again) the matrices of a
    # previous single-repository run.
    ecore_graph = None
    node_features = adj_list = edge_attribute = None

    # clone repositories for the dataset
    if repository_list is not None:
        download_repositories(repository_directory, repository_list)

    # NOTE: os.listdir returns every entry, plain files included
    repositories = os.listdir(repository_directory)
    if not repositories:
        raise Exception("No repositories found")
    # must be a real bool: the converter functions compare it with `is`
    write_in_file = len(repositories) > 1

    # create output directory
    if write_in_file:
        try:
            create_output_folders(output_directory)
        except Exception as e:
            print(e)
            # exit program because of missing output directory; raising
            # SystemExit directly is what exit() does, without depending
            # on the helper that the site module injects
            raise SystemExit('output directory is required!')

    # argument tuples for multiprocessing/parallelization
    repo_multiprocess = [
        (os.path.join(repository_directory, repository),
         write_in_file, output_directory)
        for repository in repositories
    ]

    print('---convert repositories into type graphs---')
    # convert repositories into type graphs
    if write_in_file:
        parallel_processing(create_ecore_graphs, repo_multiprocess)
    else:
        single_directory = os.path.join(repository_directory, repositories[0])
        ecore_graph = create_ecore_graphs(single_directory, write_in_file)

    print('---convert type graphs into three matrices---')
    # load xmi instance and convert them to a matrix structure for the gcn
    if write_in_file:
        xmi_multiprocess = [
            (write_in_file, xmi_file, None, output_directory)
            for xmi_file in os.listdir(f'{output_directory}/xmi_files')
        ]
        parallel_processing(create_matrix_structure, xmi_multiprocess)
    else:
        node_features, adj_list, edge_attribute = create_matrix_structure(
            write_in_file, None, ecore_graph)
        # if only one repository is converted for classification, adjust data format needed by the gcn
        if node_features is not None and adj_list is not None and edge_attribute is not None:
            node_features = convert_hashed_names_to_float(node_features)
            adj_list = convert_list_to_long_tensor(adj_list)
            adj_list = convert_edge_dim(adj_list)
            edge_attribute = convert_list_to_float_tensor(edge_attribute)

    return node_features, adj_list, edge_attribute