From ea0ac4cedfc711349f8d531d78e06dd1709e9b8e Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Wed, 8 Apr 2026 23:04:35 +0530 Subject: [PATCH] [FLINK-39111][filesystem] Alter NativeS3AccessHelper Interface to NativeS3ObjectOperations --- .../flink-s3-fs-native/README.md | 2 +- .../flink/fs/s3native/NativeS3FileSystem.java | 22 +++++++++---------- .../fs/s3native/writer/NativeS3Committer.java | 7 +++--- ...per.java => NativeS3ObjectOperations.java} | 10 ++++----- ...NativeS3RecoverableFsDataOutputStream.java | 8 +++---- .../writer/NativeS3RecoverableWriter.java | 8 +++---- 6 files changed, 29 insertions(+), 28 deletions(-) rename flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/{NativeS3AccessHelper.java => NativeS3ObjectOperations.java} (99%) diff --git a/flink-filesystems/flink-s3-fs-native/README.md b/flink-filesystems/flink-s3-fs-native/README.md index 4e2bb103ebab5..7f6cfc56bedb6 100644 --- a/flink-filesystems/flink-s3-fs-native/README.md +++ b/flink-filesystems/flink-s3-fs-native/README.md @@ -343,7 +343,7 @@ Key classes: - `NativeS3FileSystem` - Main FileSystem implementation - `NativeS3RecoverableWriter` - Exactly-once writer using multipart uploads - `S3ClientProvider` - Manages S3 client lifecycle -- `NativeS3AccessHelper` - Low-level S3 operations +- `NativeS3ObjectOperations` - Low-level S3 operations (multipart upload, put, get, delete) ## Building diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java index 81e5075ba445b..2963fdf2e28d7 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java @@ -27,7 +27,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.PathsCopyingFileSystem; import org.apache.flink.core.fs.RecoverableWriter; -import org.apache.flink.fs.s3native.writer.NativeS3AccessHelper; +import org.apache.flink.fs.s3native.writer.NativeS3ObjectOperations; import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter; import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.StringUtils; @@ -105,7 +105,7 @@ class NativeS3FileSystem extends FileSystem @Nullable private final String entropyInjectionKey; private final int entropyLength; - @Nullable private final NativeS3AccessHelper s3AccessHelper; + @Nullable private final NativeS3ObjectOperations s3AccessHelper; private final long s3uploadPartSize; private final int maxConcurrentUploadsPerStream; private final String localTmpDir; @@ -137,7 +137,7 @@ public NativeS3FileSystem( this.useAsyncOperations = useAsyncOperations; this.readBufferSize = readBufferSize; this.s3AccessHelper = - new NativeS3AccessHelper( + new NativeS3ObjectOperations( clientProvider.getS3Client(), clientProvider.getTransferManager(), bucketName, @@ -176,7 +176,7 @@ public Path getHomeDirectory() { @Override public FileStatus getFileStatus(Path path) throws IOException { checkNotClosed(); - final String key = NativeS3AccessHelper.extractKey(path); + final String key = NativeS3ObjectOperations.extractKey(path); final S3Client s3Client = clientProvider.getS3Client(); LOG.debug("Getting file status for s3://{}/{}", bucketName, key); @@ -275,7 +275,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l @Override public FSDataInputStream open(Path path, int bufferSize) throws IOException { checkNotClosed(); - final String key = NativeS3AccessHelper.extractKey(path); + final String key = NativeS3ObjectOperations.extractKey(path); final S3Client s3Client = clientProvider.getS3Client(); final long fileSize = getFileStatus(path).getLen(); return new NativeS3InputStream(s3Client, bucketName, key, fileSize, bufferSize); @@ -284,7 +284,7 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException { @Override public FSDataInputStream open(Path path) throws IOException { checkNotClosed(); - final String key = NativeS3AccessHelper.extractKey(path); + final String key = NativeS3ObjectOperations.extractKey(path); final S3Client s3Client = clientProvider.getS3Client(); final long fileSize = getFileStatus(path).getLen(); return new NativeS3InputStream(s3Client, bucketName, key, fileSize, readBufferSize); @@ -308,7 +308,7 @@ public FSDataInputStream open(Path path) throws IOException { @Override public FileStatus[] listStatus(Path path) throws IOException { checkNotClosed(); - String key = NativeS3AccessHelper.extractKey(path); + String key = NativeS3ObjectOperations.extractKey(path); if (!key.isEmpty() && !key.endsWith("/")) { key = key + "/"; } @@ -355,7 +355,7 @@ public FileStatus[] listStatus(Path path) throws IOException { @Override public boolean delete(Path path, boolean recursive) throws IOException { checkNotClosed(); - final String key = NativeS3AccessHelper.extractKey(path); + final String key = NativeS3ObjectOperations.extractKey(path); final S3Client s3Client = clientProvider.getS3Client(); try { @@ -426,7 +426,7 @@ public FSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOEx } } - final String key = NativeS3AccessHelper.extractKey(path); + final String key = NativeS3ObjectOperations.extractKey(path); return new NativeS3OutputStream( clientProvider.getS3Client(), bucketName, @@ -445,8 +445,8 @@ public FSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOEx @Override public boolean rename(Path src, Path dst) throws IOException { checkNotClosed(); - final String srcKey = NativeS3AccessHelper.extractKey(src); - final String dstKey = NativeS3AccessHelper.extractKey(dst); + final String srcKey = NativeS3ObjectOperations.extractKey(src); + final String dstKey = NativeS3ObjectOperations.extractKey(dst); final S3Client s3Client = clientProvider.getS3Client(); final FileStatus srcStatus = getFileStatus(src); diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java index 2765740088ad1..7e93b4e766e9f 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java @@ -41,10 +41,11 @@ */ class NativeS3Committer implements RecoverableFsDataOutputStream.Committer { - private final NativeS3AccessHelper s3AccessHelper; + private final NativeS3ObjectOperations s3AccessHelper; private final NativeS3Recoverable recoverable; - public NativeS3Committer(NativeS3AccessHelper s3AccessHelper, NativeS3Recoverable recoverable) { + public NativeS3Committer( + NativeS3ObjectOperations s3AccessHelper, NativeS3Recoverable recoverable) { this.s3AccessHelper = s3AccessHelper; this.recoverable = recoverable; } @@ -75,7 +76,7 @@ public void commit() throws IOException { recoverable.parts().stream() .map( part -> - new NativeS3AccessHelper.UploadPartResult( + new NativeS3ObjectOperations.UploadPartResult( part.getPartNumber(), part.getETag())) .collect(Collectors.toList()), recoverable.numBytesInParts()); diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3AccessHelper.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java similarity index 99% rename from flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3AccessHelper.java rename to flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java index c86ff67e78e4b..01bc660f3ac79 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3AccessHelper.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java @@ -94,9 +94,9 @@ * https://bucket.s3.amazonaws.com/key}) are not currently supported. */ @Internal -public class NativeS3AccessHelper { +public class NativeS3ObjectOperations { - private static final Logger LOG = LoggerFactory.getLogger(NativeS3AccessHelper.class); + private static final Logger LOG = LoggerFactory.getLogger(NativeS3ObjectOperations.class); private final S3Client s3Client; private final S3TransferManager transferManager; @@ -104,11 +104,11 @@ public class NativeS3AccessHelper { private final boolean useAsyncOperations; private final S3EncryptionConfig encryptionConfig; - public NativeS3AccessHelper(S3Client s3Client, String bucketName) { + public NativeS3ObjectOperations(S3Client s3Client, String bucketName) { this(s3Client, null, bucketName, false, null); } - public NativeS3AccessHelper( + public NativeS3ObjectOperations( S3Client s3Client, S3TransferManager transferManager, String bucketName, @@ -116,7 +116,7 @@ public NativeS3AccessHelper( this(s3Client, transferManager, bucketName, useAsyncOperations, null); } - public NativeS3AccessHelper( + public NativeS3ObjectOperations( S3Client s3Client, S3TransferManager transferManager, String bucketName, diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java index c69ff909fe368..56e7729112607 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java @@ -59,7 +59,7 @@ class NativeS3RecoverableFsDataOutputStream extends RecoverableFsDataOutputStrea private static final int BUFFER_SIZE = 64 * 1024; private final ReentrantLock lock = new ReentrantLock(); - private final NativeS3AccessHelper s3AccessHelper; + private final NativeS3ObjectOperations s3AccessHelper; private final String key; private final String uploadId; private final String localTmpDir; @@ -77,7 +77,7 @@ class NativeS3RecoverableFsDataOutputStream extends RecoverableFsDataOutputStrea private volatile boolean closed; public NativeS3RecoverableFsDataOutputStream( - NativeS3AccessHelper s3AccessHelper, + NativeS3ObjectOperations s3AccessHelper, String key, String uploadId, String localTmpDir, @@ -87,7 +87,7 @@ public NativeS3RecoverableFsDataOutputStream( } public NativeS3RecoverableFsDataOutputStream( - NativeS3AccessHelper s3AccessHelper, + NativeS3ObjectOperations s3AccessHelper, String key, String uploadId, String localTmpDir, @@ -182,7 +182,7 @@ private void uploadCurrentPart() throws IOException { currentOutputStream.close(); int partNumber = nextPartNumber++; - NativeS3AccessHelper.UploadPartResult result = + NativeS3ObjectOperations.UploadPartResult result = s3AccessHelper.uploadPart( key, uploadId, partNumber, currentTempFile, currentPartSize); diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java index 5aeedd12601a3..942b1d3e80ba0 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableWriter.java @@ -38,14 +38,14 @@ public class NativeS3RecoverableWriter implements RecoverableWriter, AutoCloseab private static final Logger LOG = LoggerFactory.getLogger(NativeS3RecoverableWriter.class); - private final NativeS3AccessHelper s3AccessHelper; + private final NativeS3ObjectOperations s3AccessHelper; private final String localTmpDir; private final long userDefinedMinPartSize; private final int maxConcurrentUploadsPerStream; private final AtomicBoolean closed = new AtomicBoolean(false); private NativeS3RecoverableWriter( - NativeS3AccessHelper s3AccessHelper, + NativeS3ObjectOperations s3AccessHelper, String localTmpDir, long userDefinedMinPartSize, int maxConcurrentUploadsPerStream) { @@ -63,7 +63,7 @@ private NativeS3RecoverableWriter( @Override public RecoverableFsDataOutputStream open(Path path) throws IOException { checkNotClosed(); - String key = NativeS3AccessHelper.extractKey(path); + String key = NativeS3ObjectOperations.extractKey(path); LOG.debug("Opening recoverable stream for key: {}", key); String uploadId = s3AccessHelper.startMultiPartUpload(key); @@ -159,7 +159,7 @@ private void checkNotClosed() { } public static NativeS3RecoverableWriter writer( - NativeS3AccessHelper s3AccessHelper, + NativeS3ObjectOperations s3AccessHelper, String localTmpDir, long userDefinedMinPartSize, int maxConcurrentUploadsPerStream) {