Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flink-filesystems/flink-s3-fs-native/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ Key classes:
- `NativeS3FileSystem` - Main FileSystem implementation
- `NativeS3RecoverableWriter` - Exactly-once writer using multipart uploads
- `S3ClientProvider` - Manages S3 client lifecycle
- `NativeS3AccessHelper` - Low-level S3 operations
- `NativeS3ObjectOperations` - Low-level S3 operations (multipart upload, put, get, delete)

## Building

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.fs.s3native.writer.NativeS3AccessHelper;
import org.apache.flink.fs.s3native.writer.NativeS3ObjectOperations;
import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.StringUtils;
Expand Down Expand Up @@ -105,7 +105,7 @@ class NativeS3FileSystem extends FileSystem
@Nullable private final String entropyInjectionKey;
private final int entropyLength;

@Nullable private final NativeS3AccessHelper s3AccessHelper;
@Nullable private final NativeS3ObjectOperations s3AccessHelper;
private final long s3uploadPartSize;
private final int maxConcurrentUploadsPerStream;
private final String localTmpDir;
Expand Down Expand Up @@ -137,7 +137,7 @@ public NativeS3FileSystem(
this.useAsyncOperations = useAsyncOperations;
this.readBufferSize = readBufferSize;
this.s3AccessHelper =
new NativeS3AccessHelper(
new NativeS3ObjectOperations(
clientProvider.getS3Client(),
clientProvider.getTransferManager(),
bucketName,
Expand Down Expand Up @@ -176,7 +176,7 @@ public Path getHomeDirectory() {
@Override
public FileStatus getFileStatus(Path path) throws IOException {
checkNotClosed();
final String key = NativeS3AccessHelper.extractKey(path);
final String key = NativeS3ObjectOperations.extractKey(path);
final S3Client s3Client = clientProvider.getS3Client();

LOG.debug("Getting file status for s3://{}/{}", bucketName, key);
Expand Down Expand Up @@ -275,7 +275,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
checkNotClosed();
final String key = NativeS3AccessHelper.extractKey(path);
final String key = NativeS3ObjectOperations.extractKey(path);
final S3Client s3Client = clientProvider.getS3Client();
final long fileSize = getFileStatus(path).getLen();
return new NativeS3InputStream(s3Client, bucketName, key, fileSize, bufferSize);
Expand All @@ -284,7 +284,7 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
@Override
public FSDataInputStream open(Path path) throws IOException {
checkNotClosed();
final String key = NativeS3AccessHelper.extractKey(path);
final String key = NativeS3ObjectOperations.extractKey(path);
final S3Client s3Client = clientProvider.getS3Client();
final long fileSize = getFileStatus(path).getLen();
return new NativeS3InputStream(s3Client, bucketName, key, fileSize, readBufferSize);
Expand All @@ -308,7 +308,7 @@ public FSDataInputStream open(Path path) throws IOException {
@Override
public FileStatus[] listStatus(Path path) throws IOException {
checkNotClosed();
String key = NativeS3AccessHelper.extractKey(path);
String key = NativeS3ObjectOperations.extractKey(path);
if (!key.isEmpty() && !key.endsWith("/")) {
key = key + "/";
}
Expand Down Expand Up @@ -355,7 +355,7 @@ public FileStatus[] listStatus(Path path) throws IOException {
@Override
public boolean delete(Path path, boolean recursive) throws IOException {
checkNotClosed();
final String key = NativeS3AccessHelper.extractKey(path);
final String key = NativeS3ObjectOperations.extractKey(path);
final S3Client s3Client = clientProvider.getS3Client();

try {
Expand Down Expand Up @@ -426,7 +426,7 @@ public FSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOEx
}
}

final String key = NativeS3AccessHelper.extractKey(path);
final String key = NativeS3ObjectOperations.extractKey(path);
return new NativeS3OutputStream(
clientProvider.getS3Client(),
bucketName,
Expand All @@ -445,8 +445,8 @@ public FSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOEx
@Override
public boolean rename(Path src, Path dst) throws IOException {
checkNotClosed();
final String srcKey = NativeS3AccessHelper.extractKey(src);
final String dstKey = NativeS3AccessHelper.extractKey(dst);
final String srcKey = NativeS3ObjectOperations.extractKey(src);
final String dstKey = NativeS3ObjectOperations.extractKey(dst);
final S3Client s3Client = clientProvider.getS3Client();

final FileStatus srcStatus = getFileStatus(src);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@
*/
class NativeS3Committer implements RecoverableFsDataOutputStream.Committer {

private final NativeS3AccessHelper s3AccessHelper;
private final NativeS3ObjectOperations s3AccessHelper;
private final NativeS3Recoverable recoverable;

public NativeS3Committer(NativeS3AccessHelper s3AccessHelper, NativeS3Recoverable recoverable) {
public NativeS3Committer(
NativeS3ObjectOperations s3AccessHelper, NativeS3Recoverable recoverable) {
this.s3AccessHelper = s3AccessHelper;
this.recoverable = recoverable;
}
Expand Down Expand Up @@ -75,7 +76,7 @@ public void commit() throws IOException {
recoverable.parts().stream()
.map(
part ->
new NativeS3AccessHelper.UploadPartResult(
new NativeS3ObjectOperations.UploadPartResult(
part.getPartNumber(), part.getETag()))
.collect(Collectors.toList()),
recoverable.numBytesInParts());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,29 +94,29 @@
* https://bucket.s3.amazonaws.com/key}) are not currently supported.
*/
@Internal
public class NativeS3AccessHelper {
public class NativeS3ObjectOperations {

private static final Logger LOG = LoggerFactory.getLogger(NativeS3AccessHelper.class);
private static final Logger LOG = LoggerFactory.getLogger(NativeS3ObjectOperations.class);

private final S3Client s3Client;
private final S3TransferManager transferManager;
private final String bucketName;
private final boolean useAsyncOperations;
private final S3EncryptionConfig encryptionConfig;

public NativeS3AccessHelper(S3Client s3Client, String bucketName) {
public NativeS3ObjectOperations(S3Client s3Client, String bucketName) {
this(s3Client, null, bucketName, false, null);
}

public NativeS3AccessHelper(
public NativeS3ObjectOperations(
S3Client s3Client,
S3TransferManager transferManager,
String bucketName,
boolean useAsyncOperations) {
this(s3Client, transferManager, bucketName, useAsyncOperations, null);
}

public NativeS3AccessHelper(
public NativeS3ObjectOperations(
S3Client s3Client,
S3TransferManager transferManager,
String bucketName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class NativeS3RecoverableFsDataOutputStream extends RecoverableFsDataOutputStrea
private static final int BUFFER_SIZE = 64 * 1024;
private final ReentrantLock lock = new ReentrantLock();

private final NativeS3AccessHelper s3AccessHelper;
private final NativeS3ObjectOperations s3AccessHelper;
private final String key;
private final String uploadId;
private final String localTmpDir;
Expand All @@ -77,7 +77,7 @@ class NativeS3RecoverableFsDataOutputStream extends RecoverableFsDataOutputStrea
private volatile boolean closed;

public NativeS3RecoverableFsDataOutputStream(
NativeS3AccessHelper s3AccessHelper,
NativeS3ObjectOperations s3AccessHelper,
String key,
String uploadId,
String localTmpDir,
Expand All @@ -87,7 +87,7 @@ public NativeS3RecoverableFsDataOutputStream(
}

public NativeS3RecoverableFsDataOutputStream(
NativeS3AccessHelper s3AccessHelper,
NativeS3ObjectOperations s3AccessHelper,
String key,
String uploadId,
String localTmpDir,
Expand Down Expand Up @@ -182,7 +182,7 @@ private void uploadCurrentPart() throws IOException {
currentOutputStream.close();

int partNumber = nextPartNumber++;
NativeS3AccessHelper.UploadPartResult result =
NativeS3ObjectOperations.UploadPartResult result =
s3AccessHelper.uploadPart(
key, uploadId, partNumber, currentTempFile, currentPartSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public class NativeS3RecoverableWriter implements RecoverableWriter, AutoCloseab

private static final Logger LOG = LoggerFactory.getLogger(NativeS3RecoverableWriter.class);

private final NativeS3AccessHelper s3AccessHelper;
private final NativeS3ObjectOperations s3AccessHelper;
private final String localTmpDir;
private final long userDefinedMinPartSize;
private final int maxConcurrentUploadsPerStream;
private final AtomicBoolean closed = new AtomicBoolean(false);

private NativeS3RecoverableWriter(
NativeS3AccessHelper s3AccessHelper,
NativeS3ObjectOperations s3AccessHelper,
String localTmpDir,
long userDefinedMinPartSize,
int maxConcurrentUploadsPerStream) {
Expand All @@ -63,7 +63,7 @@ private NativeS3RecoverableWriter(
@Override
public RecoverableFsDataOutputStream open(Path path) throws IOException {
checkNotClosed();
String key = NativeS3AccessHelper.extractKey(path);
String key = NativeS3ObjectOperations.extractKey(path);
LOG.debug("Opening recoverable stream for key: {}", key);

String uploadId = s3AccessHelper.startMultiPartUpload(key);
Expand Down Expand Up @@ -159,7 +159,7 @@ private void checkNotClosed() {
}

public static NativeS3RecoverableWriter writer(
NativeS3AccessHelper s3AccessHelper,
NativeS3ObjectOperations s3AccessHelper,
String localTmpDir,
long userDefinedMinPartSize,
int maxConcurrentUploadsPerStream) {
Expand Down