Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/testing-work.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
pip install pytest-cov
pip install -e .
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
if [ -f test_requirements.txt ]; then pip install -r test_requirements.txt; fi

- name: Test with pytest
run: |
Expand Down
235 changes: 231 additions & 4 deletions mdf_connect_client/mdfcc.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from datetime import datetime
from typing import Any, Tuple, Dict, List
from uuid import uuid4
import json
import os

import globus_sdk
import mdf_toolbox
from nameparser import HumanName
import requests
import urllib

from .version import __version__

Expand All @@ -23,6 +27,11 @@
"reject": ("This submission has been rejected because it does not meet the "
"appropriate standards")
}
FILE_UPLOAD_SERVICES = ["transfer", "openid",
"https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all", # funcx
"https://auth.globus.org/scopes/f10a69a9-338c-4e5b-baa1-0dc92359ab47/https", # Eagle HTTPS
"https://auth.globus.org/scopes/82f1b5c6-6e9b-11e5-ba47-22000b92c6ec/https", # NCSA HTTPS
]


class MDFConnectClient:
Expand All @@ -37,7 +46,7 @@ class MDFConnectClient:
globus_sdk.NullAuthorizer
]

def __init__(self, test=False, service_instance=None, authorizer=None):
def __init__(self, test=False, service_instance=None, authorizer=None, confidential=False, client_secret=None):
"""Create an MDF Connect Client.

Arguments:
Expand All @@ -53,6 +62,12 @@ def __init__(self, test=False, service_instance=None, authorizer=None):
authorizer (globus_sdk.GlobusAuthorizer): The authorizer to use for authentication.
This value should not normally be changed from the default.
**Default:** ``None``, to run the standard authentication flow.
confidential (bool): When ``True``, log in to Globus services as a confidential client
(a client with its own login information, i.e. NOT a human's account).
**Default:** ``False``, to run the standard authentication flow.
client_secret (str): Client secret to use when performing a confidential login.
Required when performing a confidential login.
**Default:** ``None``, because it is unnecessary otherwise.

Returns:
*MDFConnectClient*: An initialized, authenticated MDF Connect Client.
Expand Down Expand Up @@ -82,9 +97,18 @@ def __init__(self, test=False, service_instance=None, authorizer=None):
if any([isinstance(authorizer, allowed) for allowed in self.__allowed_authorizers]):
self.__authorizer = authorizer
else:
self.__authorizer = mdf_toolbox.login(services=self.__login_services,
client_id=self.__client_id,
app_name=self.__app_name).get(login_service)
perform_login = mdf_toolbox.login
login_kwargs = {"services": self.__login_services+FILE_UPLOAD_SERVICES,
"client_id": self.__client_id,
"app_name": self.__app_name}
if confidential:
if client_secret is None:
raise ValueError(f"Unable to perform confidential login without client_secret")
perform_login = mdf_toolbox.confidential_login
login_kwargs["client_secret"] = client_secret
del login_kwargs["app_name"]
self.__auths = perform_login(**login_kwargs)
self.__authorizer = self.__auths.get(login_service)
if not self.__authorizer:
raise ValueError("Unable to authenticate")

Expand All @@ -94,6 +118,7 @@ def logout(self):
"""
self.reset_submission()
self.__authorizer = None
if self.__auths: self.__auths = None
mdf_toolbox.logout(client_id=self.__client_id, app_name=self.__app_name)
return "Logged out. You must create a new MDF Connect Client to log back in."

Expand Down Expand Up @@ -1464,3 +1489,205 @@ def reject_curation_submission(self, source_id, reason=None, prompt=True, raw=Fa
if raw is ``True``, *dict*: The full task results.
"""
return self._complete_curation_task(source_id, "reject", reason, prompt, raw)

def upload_to_endpoint(self, local_data_path: str, endpoint_id: str = "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec",
                       dest_parent: str = None, dest_child: str = None) -> Tuple[str, str]:
    """Upload a local file or folder to a Globus endpoint using HTTPS PUT requests.
    Note that the ACL rule created in this method must later be deleted after the dataset
    is submitted to MDF.

    Args:
        local_data_path (str): Path to the local dataset to publish to Foundry via HTTPS.
            Creates an HTTPS PUT request to upload the data specified to a Globus endpoint
            (default is NCSA endpoint) before it is transferred to MDF.
        endpoint_id (str): Globus endpoint ID to upload the data to. Default is NCSA endpoint.
        dest_parent (str): Parent directory on the endpoint to write under.
            Default is ``None`` (the helper falls back to "/tmp").
        dest_child (str): Child directory name to create for this upload.
            Default is ``None`` (the helper generates a random UUID).

    Returns
    -------
    (str) Globus data source URL: URL pointing to the data on the Globus endpoint
    (str) rule_id: Globus ACL rule ID for the uploaded data. Used to delete the rule after
        the dataset is submitted to MDF.
    """
    # Make the destination folder for this upload on the endpoint.
    destination = self._create_dest_folder(endpoint_id, parent_dir=dest_parent, child_dir=dest_child)
    # ACL rule creation is currently disabled; the rule-id slot is kept so the
    # return shape stays stable once _create_access_rule is re-enabled.
    acl_rule_id = ""  # self._create_access_rule(endpoint_id, destination)
    # Push the data to the endpoint and build the resulting data source link.
    data_source_url = self._https_upload(local_data_path=local_data_path,
                                         dest_path=destination,
                                         endpoint_id=endpoint_id)
    return data_source_url, acl_rule_id

def _create_dest_folder(self, endpoint_id: str, parent_dir: str = None, child_dir: str = None) -> str:
    """Create a destination folder for the data on a Globus endpoint.

    Args:
        endpoint_id (str): A UUID designating the exact Globus endpoint. Can be obtained
            via the Globus Web UI or the SDK.
        parent_dir (str): Set to "/tmp" when default is None. The parent directory that
            all publications via HTTPS will be written to.
        child_dir (str): Set to a random UUID when default is None. The child directory
            that the data will be written to.

    Returns
    -------
    (str): Path on Globus endpoint to write to
    """
    tc = self.__auths["transfer"]
    # default parent folder for all HTTPS publications
    if parent_dir is None:
        parent_dir = "/tmp"
    # each publication gets its own randomly-named child folder unless specified
    if child_dir is None:
        child_dir = uuid4()
    # NOTE: endpoint paths are expected to start and end with "/"
    dest_path = os.path.join(parent_dir, str(child_dir))

    try:
        tc.operation_mkdir(endpoint_id=endpoint_id, path=dest_path)
    except globus_sdk.TransferAPIError as e:
        raise IOError(f"Error from Globus API while creating destination folder: {e.message}") from e
    return dest_path

def _create_access_rule(self, endpoint_id: str, dest_path: str) -> str:
    """Create an ACL rule (ie permission) for the user to read/write to the given
    destination on a Globus endpoint.

    Args:
        endpoint_id (str): A UUID designating the exact Globus endpoint. Can be obtained
            via the Globus Web UI or the SDK.
        dest_path (str): The path to the existing folder on the given Globus endpoint.

    Returns
    -------
    (str): The ID for the ACL rule (necessary to delete it in the future)
    """
    tc = self.__auths["transfer"]
    ac = globus_sdk.AuthClient(authorizer=self.__auths["openid"])
    # look up the user's primary identity (based on primary email set in Globus)
    userinfo = ac.oauth2_userinfo()
    principal_id = userinfo.data["sub"]
    # data blob Globus needs to register a new read/write rule for this user/path
    rule_data = {
        "DATA_TYPE": "access",
        "principal_type": "identity",
        "principal": principal_id,
        "path": dest_path,
        "permissions": "rw",
    }
    new_rule_id = None
    try:
        response = tc.add_endpoint_acl_rule(endpoint_id, rule_data)
        # the rule id is needed to delete the rule later
        new_rule_id = response["access_id"]
    except globus_sdk.TransferAPIError:
        # NOTE: known issue where user can still write to endpoint if this fails
        pass
    return new_rule_id

def _https_upload(self, local_data_path: str, dest_path: str = "/tmp",
                  endpoint_id: str = "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec") -> str:
    """Upload a dataset via HTTPS to a Globus endpoint.

    Args:
        local_data_path (str): The path to the local data to upload. Can be relative
            or absolute.
        dest_path (str): The path to the destination folder on the Globus endpoint.
            Default is "/tmp".
        endpoint_id (str): A UUID designating the exact Globus endpoint. Can be obtained
            via the Globus Web UI or the SDK. Default is the NCSA UUID
            "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec".

    Returns
    -------
    (str): Globus data source URL (ie the URL that points to the data on a Globus endpoint)
    """
    tc = self.__auths["transfer"]
    # look up the endpoint's HTTPS server URL (NCSA endpoint by default)
    endpoint_info = tc.get_endpoint(endpoint_id)
    base_url = endpoint_info["https_server"]

    # dispatch on the kind of local data: a folder of files or a single file
    if os.path.isdir(local_data_path):
        self._upload_folder(local_data_path, base_url, dest_path, endpoint_id)
    elif os.path.isfile(local_data_path):
        self._upload_file(local_data_path, base_url, dest_path, endpoint_id)
    else:
        raise IOError(f"Data path '{local_data_path}' is of unknown type")

    # the data source URL is what gets published to MDF
    return self._make_globus_link(endpoint_id, dest_path)

def _upload_folder(self, local_data_path: str, https_base_url: str, parent_dest_path: str, endpoint_id: str) \
        -> List[Dict[str, Any]]:
    """Upload a folder to a Globus endpoint using HTTPS.

    Args:
        local_data_path (str): The path to the local data to upload. Can be relative
            or absolute.
        https_base_url (str): The URL for a given Globus endpoint.
        parent_dest_path (str): The path to the parent folder to be written to on the
            given endpoint. The contents of "local_data_path" will be written here,
            including subdirectories.
        endpoint_id (str): The UUID designating the exact Globus endpoint. Can be obtained
            via the Globus Web UI or the SDK. This must be the same endpoint pointed to
            by the https_base_url.

    Returns
    -------
    (list): A list of all the HTTPS PUT request results (dicts) from the uploads
    """
    tc = self.__auths["transfer"]
    put_results = []
    # start writing directly into the parent destination folder
    current_dest = parent_dest_path

    for dirpath, _, filenames in os.walk(local_data_path):
        # once the walk descends into a child directory, mirror that subpath
        # on the endpoint and upload into it
        if dirpath != local_data_path:
            rel_subpath = os.path.relpath(dirpath, local_data_path)
            current_dest = os.path.join(parent_dest_path, rel_subpath)
            # create the matching child directory on the endpoint
            try:
                tc.operation_mkdir(endpoint_id=endpoint_id, path=current_dest)
            except globus_sdk.TransferAPIError as e:
                raise IOError(f"Error while creating child directory {current_dest}: {e.message}") from e
        # upload every file in this directory to the current destination
        for fname in filenames:
            local_file = os.path.join(dirpath, fname)
            put_results.append(self._upload_file(local_file, https_base_url, current_dest, endpoint_id))
    return put_results

def _upload_file(self, filepath: str, https_base_url: str, dest_path: str, endpoint_id: str) -> requests.Response:
    """Upload an individual file to a Globus endpoint using HTTPS PUT.

    Args:
        filepath (str): The path to the local file to upload.
        https_base_url (str): The URL for a given Globus endpoint.
        dest_path (str): The path to the folder to be written to on the given endpoint.
        endpoint_id (str): The UUID designating the exact Globus endpoint. Can be obtained
            via the Globus Web UI or the SDK. This must be the same endpoint pointed to
            by the https_base_url.

    Returns
    -------
    (Response): The `requests` HTTPS response object from a PUT request
    """
    # per-endpoint HTTPS scope (NCSA endpoint by default)
    scope = f"https://auth.globus.org/scopes/{endpoint_id}/https"
    # build the Authorization header string for the HTTPS upload
    auth_client = globus_sdk.AuthClient(authorizer=self.__auths[scope])
    auth_header = auth_client.authorizer.get_authorization_header()

    # destination URL on the endpoint; dest_path's leading "/" must be
    # stripped for os.path.join to concatenate instead of resetting the path
    fname = os.path.split(filepath)[1]
    target_url = os.path.join(https_base_url, dest_path.lstrip("/"), fname)

    # stream the file up as arbitrary binary content
    with open(filepath, "rb") as fh:
        reply = requests.put(
            target_url,
            data=fh,
            headers={"Authorization": auth_header, "Content-Type": "application/octet-stream"}
        )
    if reply.status_code != 200:
        raise IOError(f"Error on HTTPS PUT, got response {reply.status_code}: {reply.text}")
    return reply

def _make_globus_link(self, endpoint_id: str, path: str) -> str:
"""Create the Globus data source URL for a given datapath on an endpoint
Args:
endpoint_id (str): The UUID designating the exact Globus endpoint. Can be obtained via the Globus Web UI or
the SDK.
path (str): The path to the dataset folder on the given endpoint.
Returns
-------
(str): The Globus data source URL (ie the URL that points to the data on a Globus endpoint)
"""
# make sure the path has the "/"s encoded properly for a URL
safe_path = urllib.parse.quote(path, safe="*")
link = f"https://app.globus.org/file-manager?origin_id={endpoint_id}&origin_path={safe_path}"
return link
5 changes: 4 additions & 1 deletion test_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ coveralls>=1.10.0
nbval>=0.9.4
pytest>=5.3.1
pytest-cov>=2.5.1
mdf_toolbox
mdf_toolbox
numpy
pandas
requests
62 changes: 59 additions & 3 deletions tests/test_connect_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from datetime import datetime
from filecmp import cmp
from math import floor
import json
import os
import numpy as np
import pandas as pd
import requests

from mdf_toolbox import insensitive_comparison
import mdf_toolbox
Expand All @@ -13,9 +19,9 @@
client_secret = os.getenv('CLIENT_SECRET')

auths = mdf_toolbox.confidential_login(client_id=client_id,
client_secret=client_secret,
services=["mdf_connect", "mdf_connect_dev"],
make_clients=True)
client_secret=client_secret,
services=["mdf_connect", "mdf_connect_dev"],
make_clients=True)

print(auths)

Expand Down Expand Up @@ -469,6 +475,56 @@ def test_submission():
"update": False})


def test_https_upload():
    """Unit test: Test the upload_to_endpoint() HTTPS functionality on its own,
    without publishing to MDF.
    """
    endpoint_id = "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec"  # NCSA endpoint
    dest_parent = "/tmp"
    dest_child = f"test_{floor(datetime.now().timestamp())}"
    local_path = "./data/https_test"
    filename = "test_data.json"

    mdf = MDFConnectClient(confidential=True, client_secret=client_secret)
    # create test JSON to upload (if it doesn't already exist)
    _write_test_data(local_path, filename)
    # upload via HTTPS to NCSA endpoint
    globus_data_source, _ = mdf.upload_to_endpoint(local_path, endpoint_id, dest_parent=dest_parent,
                                                   dest_child=dest_child)

    expected_data_source = f"https://app.globus.org/file-manager?origin_id=" \
                           f"82f1b5c6-6e9b-11e5-ba47-22000b92c6ec&origin_path=%2Ftmp%2F{dest_child}"
    # confirm data source link was created properly, with correct folders
    assert globus_data_source == expected_data_source

    # fetch the uploaded file back over HTTPS; the URL must end in the actual
    # uploaded filename (was previously a broken placeholder path)
    mdf_url = f"https://data.materialsdatafacility.org/tmp/{dest_child}/{filename}"
    response = requests.get(mdf_url)
    # check that we get a valid response back (note that a 200 could be a UI error, returned as HTML)
    assert response.status_code == 200
    # check that contents of response are as expected
    tmp_file = "./data/tmp_data.json"
    with open(tmp_file, "wb") as fl:
        fl.write(response.content)
    assert cmp(tmp_file, os.path.join(local_path, filename))

    # delete ACL rule for user (disabled while _create_access_rule is stubbed out)
    # if rule_id is not None:
    #     res = f.transfer_client.delete_endpoint_acl_rule(endpoint_id, rule_id)


def _write_test_data(dest_path="./data/https_test", filename="test_data.json"):
# Create random JSON data
data = pd.DataFrame(np.random.rand(100, 4), columns=list('ABCD'))
res = data.to_json(orient="records")

# Make data directory
os.makedirs(dest_path, exist_ok=True)
data_filepath = os.path.join(dest_path, filename)

# Write data to JSON file
with open(data_filepath, "w+") as f:
json.dump(res, f, indent=4)


# def test_submit_dataset():
# # TODO
# pass
Expand Down