Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,86 @@
*/
package org.apache.accumulo.server.manager.recovery;

import java.util.ArrayList;
import java.util.List;

import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.hadoop.fs.Path;

public class RecoveryPath {

// given a wal path, transform it to a recovery path
public static Path getRecoveryPath(Path walPath) {
if (walPath.depth() >= 3 && walPath.toUri().getScheme() != null) {
// its a fully qualified path
String uuid = walPath.getName();
// drop uuid
walPath = walPath.getParent();

// expect and drop the server component
if (walPath.getName().equals(FileType.WAL.getDirectory())) {
throw new IllegalArgumentException("Bath path " + walPath + " (missing server component)");
}
walPath = walPath.getParent();
private static int getPathStart(String walPath) {
final String schemeTest = "://";

if (walPath == null || walPath.isEmpty()) {
throw new IllegalArgumentException("Bad path " + walPath);
}

// Test for scheme
int schemeEnd = walPath.indexOf(schemeTest);
if (schemeEnd < 0) {
throw new IllegalArgumentException("Bad path " + walPath + " No scheme");
}

// expect and drop the wal component
if (!walPath.getName().equals(FileType.WAL.getDirectory())) {
throw new IllegalArgumentException(
"Bad path " + walPath + " (missing wal directory component)");
// Find the start of the path
int pathStart = walPath.indexOf("/", schemeEnd + schemeTest.length());
if (pathStart < 0) {
// Empty path after authority
throw new IllegalArgumentException("Bad path " + walPath + " No content");
}
return pathStart;
}

static String transformToRecoveryPath(String walPath) {
int pathStart = getPathStart(walPath);

// This replaces the need for Path.getParent calls
String[] segments = walPath.substring(pathStart).split("/");
// Remove any spaces
List<String> parts = new ArrayList<>();
for (String s : segments) {
if (!s.isEmpty()) {
parts.add(s);
}
walPath = walPath.getParent();
}

// create new path in recovery directory that is a sibling to the wal directory (same volume),
// without the server component
walPath = new Path(walPath, FileType.RECOVERY.getDirectory());
walPath = new Path(walPath, uuid);
// checks for minimum correct depth
if (parts.size() < 3) {
throw new IllegalArgumentException("Bad path due to length" + walPath);
}

String uuid = parts.get(parts.size() - 1);
String serverComponent = parts.get(parts.size() - 2);
String walDir = parts.get(parts.size() - 3);

if (serverComponent.equals(FileType.WAL.getDirectory())) {
throw new IllegalArgumentException("Bad path " + walPath + " (missing server component)");
}

if (!walDir.equals(FileType.WAL.getDirectory())) {
throw new IllegalArgumentException(
"Bad path " + walPath + " (missing wal directory component)");
}

String authority = walPath.substring(0, pathStart);
// Handle file scheme like Hadoop Path and drop the slashes
// This converts file:///<path> to file:/<path>
if (authority.equals("file://")) {
authority = "file:";
}

return walPath;
List<String> baseParts = parts.subList(0, parts.size() - 3);
StringBuilder recoveryPath = new StringBuilder(authority);
for (String part : baseParts) {
recoveryPath.append("/").append(part);
}
recoveryPath.append("/").append(FileType.RECOVERY.getDirectory());
recoveryPath.append("/").append(uuid);

return recoveryPath.toString();
}

throw new IllegalArgumentException("Bad path " + walPath);
public static String getRecoveryPath(String walPath) {
return transformToRecoveryPath(walPath);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.manager.recovery;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;

public class RecoveryPathTest {

private static final String UUID = "2d961760-db4f-47eb-97fe-d283331ec254";

private static void assertBothValid(String input, String expectedOutput) {
Path hadoopResult = getRecoveryPath(new Path(input));
assertEquals(expectedOutput, hadoopResult.toString(),
"getRecoveryPath mismatch for : " + input);

String stringResult = RecoveryPath.transformToRecoveryPath(input);
assertEquals(expectedOutput, stringResult, "validatePath mismatch for : " + input);
}

private static void assertBothInvalid(String input, String messageContains) {
var error = assertThrows(IllegalArgumentException.class, () -> getRecoveryPath(new Path(input)),
"getRecoveryPath should throw for: " + input);
assertTrue(error.getMessage().contains(messageContains), "getRecoveryPath message '"
+ error.getMessage() + "' does not contain '" + messageContains);
var err = assertThrows(IllegalArgumentException.class,
() -> RecoveryPath.transformToRecoveryPath(input),
"getRecoveryPath should throw for: " + input);
assertTrue(err.getMessage().contains(messageContains),
"validatePath message '" + err.getMessage() + "' does not contain '" + messageContains);
}

@Test
public void testValidHdfsPathConversions() {

assertBothValid("hdfs://nn1/accumulo/wal/localhost+9997/" + UUID,
"hdfs://nn1/accumulo/recovery/" + UUID);

// included port
assertBothValid("hdfs://nn1:9000/accumulo/wal/localhost+9997/" + UUID,
"hdfs://nn1:9000/accumulo/recovery/" + UUID);

// IP
assertBothValid("hdfs://192.168.1.1:9000/accumulo/wal/192.168.1.2+9997/" + UUID,
"hdfs://192.168.1.1:9000/accumulo/recovery/" + UUID);

// alternate server hostname
assertBothValid("hdfs://namenode/accumulo/wal/my-host.example.com+9997/" + UUID,
"hdfs://namenode/accumulo/recovery/" + UUID);

// viewfs vs hdfs
assertBothValid("viewfs://clusterX/accumulo/wal/localhost+9997/" + UUID,
"viewfs://clusterX/accumulo/recovery/" + UUID);

// Deep basePath
assertBothValid("hdfs://nn1/a/b/c/wal/localhost+9997/" + UUID,
"hdfs://nn1/a/b/c/recovery/" + UUID);

}

@Test
public void testValidFilePathConversions() {
assertBothValid("file:///tmp/accumulo/wal/localhost+9997/" + UUID,
"file:/tmp/accumulo/recovery/" + UUID);
}

@Test
public void testInvalidPaths() {
// No scheme and relative
assertBothInvalid("accumulo/wal/localhost+9997/" + UUID, "Bad path");

// No scheme and absolute
assertBothInvalid("/accumulo/wal/localhost+9997/" + UUID, "Bad path");

// Only contains uuid under authority
assertBothInvalid("hdfs://nn1/" + UUID, "Bad path");
// min depth less than 3
assertBothInvalid("hdfs://nn1/wal/" + UUID, "Bad path");

assertBothInvalid("hdfs://nn1/accumulo/wal/" + UUID, "missing server component");

assertBothInvalid("hdfs://nn1/accumulo/recovery/localhost+9997/" + UUID,
"missing wal directory component");

// Invalid WAL dir case
assertBothInvalid("hdfs://nn1/accumulo/WAL/localhost+9997/" + UUID,
"missing wal directory component");
}

@Test
public void testVerifyResultsAreIdentical() {
String[] validPaths = {"hdfs://nn1:9000/accumulo/wal/localhost+9997/" + UUID,
"file:///tmp/acc/wal/host+9997/" + UUID, "hdfs://nn1/a/b/c/wal/s+1/" + UUID,
"viewfs://c/base/wal/h+2/" + UUID,};

for (String path : validPaths) {
String hadoopResult = getRecoveryPath(new Path(path)).toString();
String stringResult = RecoveryPath.transformToRecoveryPath(path);
assertEquals(hadoopResult, stringResult, "Results diverge for path: " + path);
}
}

@Test
public void testRejectionsAreIdentical() {
String[] invalidPaths = {"/accumulo/wal/localhost+9997/" + UUID, // no scheme
"hdfs://nn1/wal/" + UUID, // too shallow
"hdfs://nn1/acc/tables/h+1/" + UUID, // wrong directory
};

for (String path : invalidPaths) {
assertThrows(IllegalArgumentException.class, () -> getRecoveryPath(new Path(path)),
"getRecoveryPath should reject: " + path);
assertThrows(IllegalArgumentException.class, () -> RecoveryPath.transformToRecoveryPath(path),
"validatePath should reject: " + path);
}
}

// given a wal path, transform it to a recovery path
static Path getRecoveryPath(Path walPath) {
if (walPath.depth() >= 3 && walPath.toUri().getScheme() != null) {
// it's a fully qualified path
String uuid = walPath.getName();
// drop uuid
walPath = walPath.getParent();

// expect and drop the server component
if (walPath.getName().equals(FileType.WAL.getDirectory())) {
throw new IllegalArgumentException("Bath path " + walPath + " (missing server component)");
}
walPath = walPath.getParent();

// expect and drop the wal component
if (!walPath.getName().equals(FileType.WAL.getDirectory())) {
throw new IllegalArgumentException(
"Bad path " + walPath + " (missing wal directory component)");
}
walPath = walPath.getParent();

// create new path in recovery directory that is a sibling to the wal directory (same volume),
// without the server component
walPath = new Path(walPath, FileType.RECOVERY.getDirectory());
walPath = new Path(walPath, uuid);

return walPath;
}

throw new IllegalArgumentException("Bad path " + walPath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ public void run() {
initiateSort(sortId, source, destination);
}
} catch (FileNotFoundException e) {
log.debug("Unable to initiate log sort for " + source + ": " + e);
log.debug("Unable to initiate log sort for {}: {}", source, e);
} catch (Exception e) {
log.warn("Failed to initiate log sort " + source, e);
log.warn("Failed to initiate log sort {}", source, e);
} finally {
if (!rescheduled) {
synchronized (RecoveryManager.this) {
Expand All @@ -135,7 +135,7 @@ private void initiateSort(String sortId, String source, final String destination
sortsQueued.add(sortId);
}

log.info("Created zookeeper entry {} with data {}", Constants.ZRECOVERY + "/" + sortId, work);
log.info("Created zookeeper entry {}{}{} with data {}", Constants.ZRECOVERY, "/", sortId, work);
}

private boolean exists(final Path path) throws IOException {
Expand Down Expand Up @@ -192,7 +192,7 @@ private boolean recoverLog(LogEntry walog) throws IOException {

String sortId = walog.getUniqueID().toString();
String filename = walog.getPath();
String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString();
String dest = RecoveryPath.getRecoveryPath(filename);

boolean sortQueued;
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static ResolvedSortedLog resolve(LogEntry logEntry, VolumeManager fs) thr

// convert the path of on unsorted logs to the expected path for the corresponding sorted log
// dir
Path sortedLogPath = RecoveryPath.getRecoveryPath(new Path(logEntry.getPath()));
Path sortedLogPath = new Path(RecoveryPath.getRecoveryPath(logEntry.getPath()));

boolean foundFinish = false;
// Path::getName compares the last component of each Path value. In this case, the last
Expand Down
3 changes: 1 addition & 2 deletions test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.accumulo.test.functional.ReadWriteIT;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Tag;
Expand Down Expand Up @@ -226,7 +225,7 @@ private boolean logSortingCompleted(AccumuloClient c, TableId tableId) throws Ex
for (LogEntry walog : tm.getLogs()) {
String sortId = walog.getUniqueID().toString();
String filename = walog.getPath();
String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString();
String dest = RecoveryPath.getRecoveryPath(filename);

if (ctx.getZooCache().get(Constants.ZRECOVERY + "/" + sortId) != null
|| !ctx.getVolumeManager().exists(SortedLogState.getFinishedMarkerPath(dest))) {
Expand Down