Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.s3.endpoint;

import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
import static org.apache.hadoop.ozone.s3.util.S3Consts.QueryParams;

import java.io.IOException;
import java.time.Instant;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder;
import org.apache.hadoop.ozone.audit.S3GAction;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.hadoop.ozone.s3.util.S3StorageType;

/**
 * Handles MPU (Multipart Upload) non-POST operations for object key endpoint.
 *
 * <p>Participates in the object operation handler chain: when the request
 * carries no {@code uploadId} query parameter, the handler methods return
 * {@code null} so the next handler in the chain processes the request.
 */
class MultipartKeyHandler extends ObjectOperationHandler {

  /** S3 default for the {@code max-parts} query parameter. */
  private static final int DEFAULT_MAX_PARTS = 1000;

  /**
   * Handles GET with an {@code uploadId} query parameter as a ListParts
   * request.
   *
   * @param context per-request state (bucket, audit action, start time, perf)
   * @param keyPath the object key
   * @return the ListParts response, or {@code null} when the request has no
   *     {@code uploadId} (not an MPU request)
   * @throws IOException on failure talking to OM
   * @throws OS3Exception mapped S3 error (NoSuchUpload, AccessDenied, ...)
   */
  @Override
  Response handleGetRequest(ObjectEndpoint.ObjectRequestContext context, String keyPath)
      throws IOException, OS3Exception {

    final String uploadId = queryParams().get(QueryParams.UPLOAD_ID);
    if (uploadId == null) {
      // not MPU -> let next handler run
      return null;
    }

    context.setAction(S3GAction.LIST_PARTS);

    final int maxParts = queryParams().getInt(QueryParams.MAX_PARTS, DEFAULT_MAX_PARTS);
    final String partNumberMarker = queryParams().get(QueryParams.PART_NUMBER_MARKER);
    final AuditLogger.PerformanceStringBuilder perf = context.getPerf();

    try {
      int partMarker = parsePartNumberMarker(partNumberMarker);
      Response response = listParts(context.getBucket(), keyPath, uploadId,
          partMarker, maxParts, perf);
      long opLatencyNs = getMetrics().updateListPartsSuccessStats(context.getStartNanos());
      perf.appendOpLatencyNanos(opLatencyNs);
      return response;

    } catch (IOException | OS3Exception | RuntimeException ex) {
      // OS3Exception must be included here: listParts maps OMException into
      // OS3Exception (NoSuchUpload/AccessDenied), and since OS3Exception is a
      // checked exception those failures would otherwise escape without being
      // counted in the failure metrics.
      getMetrics().updateListPartsFailureStats(context.getStartNanos());
      throw ex;
    }
  }

  /**
   * Handles DELETE with an {@code uploadId} query parameter as an
   * AbortMultipartUpload request.
   *
   * @param context per-request state (volume, bucket name, start time)
   * @param keyPath the object key
   * @return 204 No Content on success, or {@code null} when the request has
   *     no {@code uploadId} (not an MPU request)
   * @throws IOException on failure talking to OM
   * @throws OS3Exception mapped S3 error (NoSuchUpload, NoSuchBucket)
   */
  @Override
  Response handleDeleteRequest(ObjectEndpoint.ObjectRequestContext context, String keyPath)
      throws IOException, OS3Exception {

    final String uploadId = queryParams().get(QueryParams.UPLOAD_ID);
    if (StringUtils.isEmpty(uploadId)) {
      // not MPU -> let next handler run
      return null;
    }

    context.setAction(S3GAction.ABORT_MULTIPART_UPLOAD);

    try {
      Response r = abortMultipartUpload(context.getVolume(),
          context.getBucketName(), keyPath, uploadId);

      getMetrics().updateAbortMultipartUploadSuccessStats(context.getStartNanos());
      return r;

    } catch (IOException | OS3Exception | RuntimeException ex) {
      // OS3Exception must be included here: abortMultipartUpload maps
      // OMException into OS3Exception (NoSuchUpload/NoSuchBucket), and those
      // failures must still be reflected in the failure metrics.
      getMetrics().updateAbortMultipartUploadFailureStats(context.getStartNanos());
      throw ex;
    }
  }

  /**
   * Aborts an in-progress multipart upload, discarding any parts uploaded so
   * far.
   *
   * @param volume volume that holds the bucket
   * @param bucket bucket name
   * @param key object key of the multipart upload
   * @param uploadId id of the multipart upload to abort
   * @return 204 No Content response
   * @throws IOException on failure talking to OM
   * @throws OS3Exception NoSuchUpload if the upload id is unknown,
   *     NoSuchBucket if the bucket does not exist
   */
  private Response abortMultipartUpload(OzoneVolume volume, String bucket,
      String key, String uploadId) throws IOException, OS3Exception {
    try {
      getClientProtocol().abortMultipartUpload(volume.getName(), bucket, key, uploadId);
    } catch (OMException ex) {
      if (ex.getResult() == ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
        throw newError(S3ErrorTable.NO_SUCH_UPLOAD, uploadId, ex);
      } else if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND) {
        throw newError(S3ErrorTable.NO_SUCH_BUCKET, bucket, ex);
      }
      throw ex;
    }
    return Response.status(Status.NO_CONTENT).build();
  }

  /**
   * Builds the response for a ListParts request.
   * See: https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadListParts.html
   *
   * @param ozoneBucket bucket that holds the multipart upload
   * @param key object key of the multipart upload
   * @param uploadId id of the multipart upload
   * @param partNumberMarker only parts with a part number greater than this
   *     value are listed
   * @param maxParts maximum number of parts to return
   * @param perf performance audit builder; the returned part count is appended
   * @return 200 OK response wrapping a {@link ListPartsResponse}
   * @throws IOException on failure talking to OM
   * @throws OS3Exception NoSuchUpload if the upload id is unknown,
   *     AccessDenied if the caller may not read the upload
   */
  private Response listParts(OzoneBucket ozoneBucket, String key, String uploadId,
      int partNumberMarker, int maxParts,
      PerformanceStringBuilder perf)
      throws IOException, OS3Exception {

    ListPartsResponse resp = new ListPartsResponse();
    String bucketName = ozoneBucket.getName();

    try {
      OzoneMultipartUploadPartListParts parts =
          ozoneBucket.listParts(key, uploadId, partNumberMarker, maxParts);

      resp.setBucket(bucketName);
      resp.setKey(key);
      resp.setUploadID(uploadId);
      resp.setMaxParts(maxParts);
      resp.setPartNumberMarker(partNumberMarker);
      resp.setTruncated(false);

      resp.setStorageClass(S3StorageType.fromReplicationConfig(
          parts.getReplicationConfig()).toString());

      if (parts.isTruncated()) {
        resp.setTruncated(true);
        resp.setNextPartNumberMarker(parts.getNextPartNumberMarker());
      }

      parts.getPartInfoList().forEach(p -> {
        ListPartsResponse.Part part = new ListPartsResponse.Part();
        part.setPartNumber(p.getPartNumber());
        // Fall back to the MPU part name when no ETag was recorded, for
        // backward compatibility with uploads created before ETag support.
        part.setETag(StringUtils.isNotEmpty(p.getETag()) ? p.getETag() : p.getPartName());
        part.setSize(p.getSize());
        part.setLastModified(Instant.ofEpochMilli(p.getModificationTime()));
        resp.addPart(part);
      });

    } catch (OMException ex) {
      if (ex.getResult() == ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
        throw newError(NO_SUCH_UPLOAD, uploadId, ex);
      } else if (isAccessDenied(ex)) {
        throw newError(S3ErrorTable.ACCESS_DENIED,
            bucketName + "/" + key + "/" + uploadId, ex);
      }
      throw ex;
    }

    perf.appendCount(resp.getPartList().size());
    return Response.status(Status.OK).entity(resp).build();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
Expand Down Expand Up @@ -151,6 +150,7 @@ protected void init() {
ObjectOperationHandler chain = ObjectOperationHandlerChain.newBuilder(this)
.add(new ObjectAclHandler())
.add(new ObjectTaggingHandler())
.add(new MultipartKeyHandler())
.add(this)
.build();
handler = new AuditingObjectOperationHandler(chain);
Expand Down Expand Up @@ -390,28 +390,13 @@ public Response get(
Response handleGetRequest(ObjectRequestContext context, String keyPath)
throws IOException, OS3Exception {

final int maxParts = queryParams().getInt(QueryParams.MAX_PARTS, 1000);
final int partNumber = queryParams().getInt(QueryParams.PART_NUMBER, 0);
final String partNumberMarker = queryParams().get(QueryParams.PART_NUMBER_MARKER);
final String uploadId = queryParams().get(QueryParams.UPLOAD_ID);

final long startNanos = context.getStartNanos();
final PerformanceStringBuilder perf = context.getPerf();

try {
final String bucketName = context.getBucketName();
final OzoneBucket bucket = context.getBucket();

if (uploadId != null) {
// list parts
context.setAction(S3GAction.LIST_PARTS);

int partMarker = parsePartNumberMarker(partNumberMarker);
Response response = listParts(bucket, keyPath, uploadId,
partMarker, maxParts, perf);

return response;
}

context.setAction(S3GAction.GET_KEY);

Expand Down Expand Up @@ -514,9 +499,7 @@ Response handleGetRequest(ObjectRequestContext context, String keyPath)
return responseBuilder.build();

} catch (IOException | RuntimeException ex) {
if (uploadId == null) {
getMetrics().updateGetKeyFailureStats(startNanos);
}
getMetrics().updateGetKeyFailureStats(startNanos);
throw ex;
}
}
Expand Down Expand Up @@ -628,37 +611,6 @@ Example of such app is Trino (through Hive connector).
}
}

/**
 * Aborts an in-progress multipart upload (DELETE with an uploadId query
 * parameter), discarding any parts uploaded so far.
 *
 * @param volume volume that holds the bucket
 * @param bucket bucket name
 * @param key object key of the multipart upload
 * @param uploadId id of the multipart upload to abort
 * @return 204 No Content response
 * @throws IOException on failure talking to OM
 * @throws OS3Exception NoSuchUpload if the upload id is unknown,
 *     NoSuchBucket if the bucket does not exist
 */
private Response abortMultipartUpload(OzoneVolume volume, String bucket,
    String key, String uploadId)
    throws IOException, OS3Exception {
  final long abortStartNanos = Time.monotonicNowNanos();
  try {
    getClientProtocol().abortMultipartUpload(volume.getName(), bucket,
        key, uploadId);
  } catch (OMException ex) {
    // Translate known OM failures into their S3 error equivalents.
    ResultCodes code = ex.getResult();
    if (code == ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
      throw newError(S3ErrorTable.NO_SUCH_UPLOAD, uploadId, ex);
    }
    if (code == ResultCodes.BUCKET_NOT_FOUND) {
      throw newError(S3ErrorTable.NO_SUCH_BUCKET, bucket, ex);
    }
    throw ex;
  }
  getMetrics().updateAbortMultipartUploadSuccessStats(abortStartNanos);
  return Response.status(Status.NO_CONTENT).build();
}


/**
* Delete a specific object from a bucket, if query param uploadId is
* specified, this request is for abort multipart upload.
Expand Down Expand Up @@ -704,30 +656,18 @@ public Response delete(
Response handleDeleteRequest(ObjectRequestContext context, String keyPath)
throws IOException, OS3Exception {

final String bucketName = context.getBucketName();
final long startNanos = context.startNanos;
final String uploadId = queryParams().get(QueryParams.UPLOAD_ID);

try {
OzoneVolume volume = context.getVolume();

if (uploadId != null && !uploadId.isEmpty()) {
context.setAction(S3GAction.ABORT_MULTIPART_UPLOAD);

return abortMultipartUpload(volume, bucketName, keyPath, uploadId);
}

getClientProtocol().deleteKey(volume.getName(), context.getBucketName(), keyPath, false);

getMetrics().updateDeleteKeySuccessStats(startNanos);
return Response.status(Status.NO_CONTENT).build();

} catch (Exception ex) {
if (uploadId != null && !uploadId.isEmpty()) {
getMetrics().updateAbortMultipartUploadFailureStats(startNanos);
} else {
getMetrics().updateDeleteKeyFailureStats(startNanos);
}
getMetrics().updateDeleteKeyFailureStats(startNanos);
throw ex;
}
}
Expand Down Expand Up @@ -1038,75 +978,6 @@ private Response createMultipartKey(OzoneVolume volume, OzoneBucket ozoneBucket,
}
}

/**
 * Builds the response for a ListParts request.
 * See: https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadListParts.html
 *
 * @param ozoneBucket bucket that holds the multipart upload
 * @param key object key of the multipart upload
 * @param uploadID id of the multipart upload
 * @param partNumberMarker only parts with a part number greater than this
 *     value are listed
 * @param maxParts maximum number of parts to return
 * @param perf performance audit builder; part count and latency are appended
 * @return 200 OK response wrapping a {@link ListPartsResponse}
 * @throws IOException on failure talking to OM
 * @throws OS3Exception NoSuchUpload if the upload id is unknown,
 *     AccessDenied if the caller may not read the upload
 */
private Response listParts(OzoneBucket ozoneBucket, String key, String uploadID,
    int partNumberMarker, int maxParts, PerformanceStringBuilder perf)
    throws IOException, OS3Exception {
  final long opStart = Time.monotonicNowNanos();
  final ListPartsResponse result = new ListPartsResponse();
  final String bucketName = ozoneBucket.getName();
  try {
    OzoneMultipartUploadPartListParts partList =
        ozoneBucket.listParts(key, uploadID, partNumberMarker, maxParts);

    result.setBucket(bucketName);
    result.setKey(key);
    result.setUploadID(uploadID);
    result.setMaxParts(maxParts);
    result.setPartNumberMarker(partNumberMarker);
    result.setTruncated(partList.isTruncated());
    result.setStorageClass(S3StorageType.fromReplicationConfig(
        partList.getReplicationConfig()).toString());

    // Only a truncated listing carries a marker for the next page.
    if (partList.isTruncated()) {
      result.setNextPartNumberMarker(partList.getNextPartNumberMarker());
    }

    partList.getPartInfoList().forEach(info -> {
      ListPartsResponse.Part p = new ListPartsResponse.Part();
      p.setPartNumber(info.getPartNumber());
      // Fall back to the MPU part name when no ETag was recorded, for
      // backward compatibility with uploads created before ETag support.
      String eTag = info.getETag();
      p.setETag(StringUtils.isEmpty(eTag) ? info.getPartName() : eTag);
      p.setSize(info.getSize());
      p.setLastModified(Instant.ofEpochMilli(info.getModificationTime()));
      result.addPart(p);
    });
  } catch (OMException ex) {
    getMetrics().updateListPartsFailureStats(opStart);
    // Translate known OM failures into their S3 error equivalents.
    if (ex.getResult() == ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
      throw newError(NO_SUCH_UPLOAD, uploadID, ex);
    }
    if (isAccessDenied(ex)) {
      throw newError(S3ErrorTable.ACCESS_DENIED,
          bucketName + "/" + key + "/" + uploadID, ex);
    }
    throw ex;
  } catch (IOException | RuntimeException ex) {
    getMetrics().updateListPartsFailureStats(opStart);
    throw ex;
  }
  long opLatencyNs = getMetrics().updateListPartsSuccessStats(opStart);
  perf.appendCount(result.getPartList().size());
  perf.appendOpLatencyNanos(opLatencyNs);
  return Response.status(Status.OK).entity(result).build();
}

@SuppressWarnings("checkstyle:ParameterNumber")
void copy(OzoneVolume volume, DigestInputStream src, long srcKeyLen,
String destKey, String destBucket,
Expand Down