Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.core.execution.JobClient;
Expand Down Expand Up @@ -57,7 +56,7 @@

import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_DIR_PREFIX;
import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.fail;

/** Test utilities. */
public class TestUtils {
Expand Down Expand Up @@ -167,24 +166,6 @@ private static boolean hasMetadata(Path file) {
}
}

/**
* @deprecated please use {@link
* org.apache.flink.runtime.testutils.CommonTestUtils#waitForCheckpoint(JobID, MiniCluster,
* Deadline)} which is less prone to {@link NoSuchFileException} and IO-intensive.
*/
@Deprecated
public static void waitUntilExternalizedCheckpointCreated(File checkpointDir)
throws InterruptedException, IOException {
while (true) {
Thread.sleep(50);
Optional<File> externalizedCheckpoint =
getMostRecentCompletedCheckpointMaybe(checkpointDir);
if (externalizedCheckpoint.isPresent()) {
break;
}
}
}

public static void waitUntilJobCanceled(JobID jobId, ClusterClient<?> client)
throws ExecutionException, InterruptedException {
while (client.getJobStatus(jobId).get() != JobStatus.CANCELED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,60 +24,50 @@
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLoggerExtension;

import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Tests cases where accumulators: a) throw errors during runtime b) are not compatible with
* existing accumulator.
*/
public class AccumulatorErrorITCase extends TestLogger {
@ExtendWith(TestLoggerExtension.class)
class AccumulatorErrorITCase {
private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators";

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
new MiniClusterWithClientResource(
@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(2)
.setNumberSlotsPerTaskManager(3)
.build());

public static Configuration getConfiguration() {
Configuration config = new Configuration();
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("12m"));
return config;
}

@Test
public void testFaultyAccumulator() throws Exception {
void testFaultyAccumulator() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Test Exception forwarding with faulty Accumulator implementation
env.fromSequence(0, 10000)
.map(new FaultyAccumulatorUsingMapper())
.sinkTo(new DiscardingSink<>());

assertAccumulatorsShouldFail(env.execute());
assertAccumulatorsShouldFail(env.execute(), FaultyCloneAccumulator.class.getName());
}

@Test
public void testInvalidTypeAccumulator() throws Exception {
void testInvalidTypeAccumulator() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Test Exception forwarding with faulty Accumulator implementation
Expand All @@ -86,24 +76,21 @@ public void testInvalidTypeAccumulator() throws Exception {
.map(new IncompatibleAccumulatorTypesMapper2())
.sinkTo(new DiscardingSink<>());

try {
env.execute();
fail("Should have failed.");
} catch (JobExecutionException e) {
assertTrue(findThrowable(e, UnsupportedOperationException.class).isPresent());
}
assertThatThrownBy(env::execute)
.cause()
.hasCauseInstanceOf(UnsupportedOperationException.class);
}

@Test
public void testFaultyMergeAccumulator() throws Exception {
void testFaultyMergeAccumulator() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Test Exception forwarding with faulty Accumulator implementation
env.fromSequence(0, 10000)
.map(new FaultyMergeAccumulatorUsingMapper())
.sinkTo(new DiscardingSink<>());

assertAccumulatorsShouldFail(env.execute());
assertAccumulatorsShouldFail(env.execute(), FaultyMergeAccumulator.class.getName());
}

/* testFaultyAccumulator */
Expand All @@ -112,13 +99,13 @@ private static class FaultyAccumulatorUsingMapper extends RichMapFunction<Long,
private static final long serialVersionUID = 42;

@Override
public void open(OpenContext openContext) throws Exception {
public void open(OpenContext openContext) {
getRuntimeContext()
.addAccumulator(FAULTY_CLONE_ACCUMULATOR, new FaultyCloneAccumulator());
}

@Override
public Long map(Long value) throws Exception {
public Long map(Long value) {
return -1L;
}
}
Expand All @@ -128,7 +115,7 @@ private static class FaultyCloneAccumulator extends LongCounter {

@Override
public LongCounter clone() {
throw new CustomException();
throw new CustomException(FaultyCloneAccumulator.class.getName());
}
}

Expand All @@ -138,12 +125,12 @@ private static class IncompatibleAccumulatorTypesMapper extends RichMapFunction<
private static final long serialVersionUID = 42;

@Override
public void open(OpenContext openContext) throws Exception {
public void open(OpenContext openContext) {
getRuntimeContext().addAccumulator(INCOMPATIBLE_ACCUMULATORS_NAME, new LongCounter());
}

@Override
public Long map(Long value) throws Exception {
public Long map(Long value) {
return -1L;
}
}
Expand All @@ -152,12 +139,12 @@ private static class IncompatibleAccumulatorTypesMapper2 extends RichMapFunction
private static final long serialVersionUID = 42;

@Override
public void open(OpenContext openContext) throws Exception {
public void open(OpenContext openContext) {
getRuntimeContext().addAccumulator(INCOMPATIBLE_ACCUMULATORS_NAME, new DoubleCounter());
}

@Override
public Long map(Long value) throws Exception {
public Long map(Long value) {
return -1L;
}
}
Expand All @@ -167,13 +154,13 @@ private static class FaultyMergeAccumulatorUsingMapper extends RichMapFunction<L
private static final long serialVersionUID = 42;

@Override
public void open(OpenContext openContext) throws Exception {
public void open(OpenContext openContext) {
getRuntimeContext()
.addAccumulator(FAULTY_MERGE_ACCUMULATOR, new FaultyMergeAccumulator());
}

@Override
public Long map(Long value) throws Exception {
public Long map(Long value) {
return -1L;
}
}
Expand All @@ -183,7 +170,7 @@ private static class FaultyMergeAccumulator extends LongCounter {

@Override
public void merge(Accumulator<Long, Long> other) {
throw new CustomException();
throw new CustomException(FaultyMergeAccumulator.class.getName());
}

@Override
Expand All @@ -194,14 +181,18 @@ public LongCounter clone() {

private static class CustomException extends RuntimeException {
private static final long serialVersionUID = 42;
}

private static void assertAccumulatorsShouldFail(JobExecutionResult result) {
try {
result.getAllAccumulatorResults();
fail("Should have failed");
} catch (Exception ex) {
assertTrue(findThrowable(ex, CustomException.class).isPresent());
private CustomException(String message) {
super(message);
}
}

private static void assertAccumulatorsShouldFail(
JobExecutionResult result, String expectedMessage) {
assertThatThrownBy(result::getAllAccumulatorResults)
.cause()
.hasCauseInstanceOf(CustomException.class)
.cause()
.hasMessage(expectedMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,17 @@
import org.apache.flink.streaming.api.legacy.io.TextInputFormat;
import org.apache.flink.streaming.api.legacy.io.TextOutputFormat;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;

import org.junit.Assert;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Test for the basic functionality of accumulators. We cannot test all different kinds of plans
Expand All @@ -58,8 +57,7 @@
* <p>TODO Test conflict when different UDFs write to accumulator with same name but with different
* type. The conflict will occur in JobManager while merging.
*/
@SuppressWarnings("serial")
public class AccumulatorITCase extends JavaProgramTestBaseJUnit4 {
class AccumulatorITCase extends JavaProgramTestBase {

private static final String INPUT = "one\n" + "two two\n" + "three three three\n";
private static final String EXPECTED = "one 1\ntwo 2\nthree 3\n";
Expand All @@ -84,28 +82,24 @@ protected void postSubmit() throws Exception {
JobExecutionResult res = this.result;
System.out.println(AccumulatorHelper.getResultsFormatted(res.getAllAccumulatorResults()));

Assert.assertEquals(Integer.valueOf(3), res.getAccumulatorResult("num-lines"));
assertThat(res.<Integer>getAccumulatorResult("num-lines")).isEqualTo(3);

Assert.assertEquals(
Double.valueOf(getParallelism()), res.getAccumulatorResult("open-close-counter"));
assertThat(res.<Double>getAccumulatorResult("open-close-counter"))
.isEqualTo(getParallelism());

// Test histogram (words per line distribution)
Map<Integer, Integer> dist = new HashMap<>();
dist.put(1, 1);
dist.put(2, 1);
dist.put(3, 1);
Assert.assertEquals(dist, res.getAccumulatorResult("words-per-line"));
assertThat(res.<Map<Integer, Integer>>getAccumulatorResult("words-per-line"))
.containsExactlyInAnyOrderEntriesOf(
Map.ofEntries(Map.entry(1, 1), Map.entry(2, 1), Map.entry(3, 1)));

// Test distinct words (custom accumulator)
Set<StringValue> distinctWords = new HashSet<>();
distinctWords.add(new StringValue("one"));
distinctWords.add(new StringValue("two"));
distinctWords.add(new StringValue("three"));
Assert.assertEquals(distinctWords, res.getAccumulatorResult("distinct-words"));
assertThat(res.<Set<StringValue>>getAccumulatorResult("distinct-words"))
.containsExactlyInAnyOrder(
new StringValue("one"), new StringValue("two"), new StringValue("three"));
}

@Override
protected void testProgram() throws Exception {
protected JobExecutionResult testProgram() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> input =
Expand Down Expand Up @@ -137,6 +131,7 @@ public String map(Tuple2<String, Integer> value) throws Exception {
new TextOutputFormat<>(new Path(resultPath))));

this.result = env.execute();
return this.result;
}

private static class TokenizeLine extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
Expand Down Expand Up @@ -167,25 +162,15 @@ public void open(OpenContext openContext) {
// Create counter and test increment
IntCounter simpleCounter = getRuntimeContext().getIntCounter("simple-counter");
simpleCounter.add(1);
Assert.assertEquals(simpleCounter.getLocalValue().intValue(), 1);
assertThat(simpleCounter.getLocalValue().intValue()).isOne();

// Test if we get the same counter
IntCounter simpleCounter2 = getRuntimeContext().getIntCounter("simple-counter");
Assert.assertEquals(simpleCounter.getLocalValue(), simpleCounter2.getLocalValue());
assertThat(simpleCounter2.getLocalValue()).isEqualTo(simpleCounter.getLocalValue());

// Should fail if we request it with different type
try {
@SuppressWarnings("unused")
DoubleCounter simpleCounter3 =
getRuntimeContext().getDoubleCounter("simple-counter");
// DoubleSumAggregator longAggregator3 = (DoubleSumAggregator)
// getRuntimeContext().getAggregator("custom",
// DoubleSumAggregator.class);
Assert.fail(
"Should not be able to obtain previously created counter with different type");
} catch (UnsupportedOperationException ex) {
// expected!
}
assertThatThrownBy(() -> getRuntimeContext().getDoubleCounter("simple-counter"))
.isInstanceOf(UnsupportedOperationException.class);

// Test counter used in open() and closed()
this.openCloseCounter.add(0.5);
Expand All @@ -208,7 +193,7 @@ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
public void close() throws Exception {
// Test counter used in open and close only
this.openCloseCounter.add(0.5);
Assert.assertEquals(1, this.openCloseCounter.getLocalValue().intValue());
assertThat(this.openCloseCounter.getLocalValue().intValue()).isEqualTo(1);
}
}

Expand Down Expand Up @@ -253,7 +238,7 @@ private void reduceInternal(
}

/** Custom accumulator. */
public static class SetAccumulator<T> implements Accumulator<T, HashSet<T>> {
private static class SetAccumulator<T> implements Accumulator<T, HashSet<T>> {

private static final long serialVersionUID = 1L;

Expand Down
Loading